this repo has no description
#!/usr/bin/env python3
"""Count Bluesky firehose record creations into Redis counters.

The script subscribes to a Jetstream websocket endpoint, keeps only ``create``
commits whose collection is on an allowlist, and increments Redis counters:
one per collection, one per post embed type, and one global operation counter.
Increments are buffered in a Redis pipeline and sent in batches.
"""

import asyncio
from datetime import datetime, timezone
import json
import os
import sqlite3
import sys

import redis
import websockets

# Websocket endpoint the events are read from.
RELAY_URL = 'wss://jetstream1.us-west.bsky.network/subscribe'

# Number of counted operations between pipeline flushes / lag reports.
FLUSH_EVERY = 500

# Key of the counter that is bumped once for every counted operation.
OPS_COUNTER_KEY = 'dev.edavis.muninsky.ops'

app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.postgate',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.graph.starterpack',
    'app.bsky.labeler.service',
    'chat.bsky.actor.declaration',
}

other_allowlist = {
    'social.psky.feed.post',
    'social.psky.chat.message',
    'blue.zio.atfile.upload',
}

# Built once at import time so the per-event membership test does not have to
# re-create the union of both allowlists for every message on the stream.
ALLOWED_COLLECTIONS = frozenset(app_bsky_allowlist | other_allowlist)


async def bsky_activity(relay_url=RELAY_URL):
    """Yield decoded JSON events from the websocket at *relay_url*.

    The generator runs until the connection fails or is closed, at which
    point the exception raised by ``websockets`` propagates to the consumer.
    """
    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            yield json.loads(await firehose.recv())


def _counter_keys(event):
    """Return the Redis keys to increment for *event*, or ``[]`` to skip it.

    An event is counted only when it is a ``commit`` whose operation is
    ``create`` and whose collection is in ``ALLOWED_COLLECTIONS``. Events
    with missing or unexpectedly-typed fields are skipped rather than raising,
    because the payload comes straight from the network.
    """
    if event.get('kind') != 'commit':
        return []

    payload = event.get('commit')
    if not isinstance(payload, dict):
        return []

    if payload.get('operation') != 'create':
        return []

    collection = payload.get('collection')
    if not isinstance(collection, str) or collection not in ALLOWED_COLLECTIONS:
        return []

    keys = []
    if collection == 'app.bsky.feed.post':
        record = payload.get('record')
        embed = record.get('embed') if isinstance(record, dict) else None
        # Posts additionally get a counter per embed type, when one is set.
        if isinstance(embed, dict) and embed.get('$type'):
            keys.append(f"app.bsky.feed.post:embed:{embed['$type']}")

    keys.append(collection)
    keys.append(OPS_COUNTER_KEY)
    return keys


async def main():
    """Consume the event stream and keep the Redis counters up to date.

    Increments are queued on a pipeline and executed every ``FLUSH_EVERY``
    counted operations, together with a ``lag: <seconds>`` line on stdout.
    Whatever is still queued when the stream ends or raises is flushed before
    the exception propagates, so a partial batch is not silently dropped.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()

    sys.stdout.write('starting up\n')
    sys.stdout.flush()

    op_count = 0
    try:
        async for event in bsky_activity():
            keys = _counter_keys(event)
            if not keys:
                continue

            for key in keys:
                redis_pipe.incr(key)

            op_count += 1
            if op_count % FLUSH_EVERY == 0:
                # Both values are in seconds: 'time_us' is divided by one
                # million (the name suggests microseconds) before comparing
                # it with the current UTC timestamp.
                now_s = datetime.now(timezone.utc).timestamp()
                event_time_s = event['time_us'] / 1_000_000
                current_lag = now_s - event_time_s
                sys.stdout.write(f'lag: {current_lag:.2f}\n')
                redis_pipe.execute()
                sys.stdout.flush()
    finally:
        # Send any increments queued since the last periodic flush.
        redis_pipe.execute()


if __name__ == '__main__':
    asyncio.run(main())