this repo has no description
import apsw
import dag_cbor
import redis
import sys
import websockets
from atproto import CAR
from io import BytesIO


async def bsky_activity():
    """Yield ``(message, op)`` pairs for each commit operation on the firehose.

    Opens a websocket to the ``com.atproto.sync.subscribeRepos`` endpoint on
    ``bsky.network``. If Redis holds a value under ``dev.edavis.muninsky.seq``
    it is appended to the URL as the ``cursor`` query parameter. This function
    only reads that key; it never writes it.

    Each received frame is two concatenated DAG-CBOR values: a header followed
    by a payload. Frames whose header is not ``op == 1`` with ``t == '#commit'``
    are skipped, as are payloads flagged ``tooBig``.

    Yields:
        tuple: ``(message, op)`` where

        * ``message`` is a copy of the payload without its ``blocks`` and
          ``ops`` entries and with ``commit`` base32-encoded. The same dict
          object is yielded for every op of a frame.
        * ``op`` is a copy of one entry of the payload's ``ops``. When its
          ``cid`` is not ``None`` the cid is base32-encoded and a ``record``
          entry is added from the parsed CAR blocks (``None`` if the lookup
          misses).
    """
    store = redis.Redis()
    endpoint = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    saved_seq = store.get('dev.edavis.muninsky.seq')
    if saved_seq:
        # Redis hands back bytes; decode before splicing into the URL.
        endpoint = endpoint + f'?cursor={saved_seq.decode()}'

    sys.stdout.write(f'opening websocket connection to {endpoint}\n')
    sys.stdout.flush()

    async with websockets.connect(endpoint, ping_timeout=None) as socket:
        while True:
            stream = BytesIO(await socket.recv())

            # First CBOR value in the frame is the header; allow_concat leaves
            # the stream positioned at the payload that follows it.
            header = dag_cbor.decode(stream, allow_concat=True)
            if not (header['op'] == 1 and header['t'] == '#commit'):
                continue

            body = dag_cbor.decode(stream)
            if body['tooBig']:
                # TODO(ejd): figure out how to get blocks out-of-band
                continue

            # TODO(ejd): figure out how to validate blocks
            # 'blocks' is removed from the payload here, so it is absent from
            # the message copy built below.
            car = CAR.from_bytes(body.pop('blocks'))

            message = dict(body)
            message.pop('ops')
            message['commit'] = message['commit'].encode('base32')

            for raw_op in body['ops']:
                op = dict(raw_op)
                cid = op['cid']
                if cid is not None:
                    cid_text = cid.encode('base32')
                    op['cid'] = cid_text
                    # NOTE(review): the lookup key is the base32 string, not
                    # the CID object -- confirm CAR.blocks keys match strings.
                    op['record'] = car.blocks.get(cid_text)

                yield message, op