this repo has no description
1import apsw
2import dag_cbor
3import redis
4import sys
5import websockets
6from atproto import CAR
7from io import BytesIO
8
9async def bsky_activity():
10 redis_cnx = redis.Redis()
11 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
12 firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
13 if firehose_seq:
14 relay_url += f'?cursor={firehose_seq.decode()}'
15
16 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
17 sys.stdout.flush()
18
19 async with websockets.connect(relay_url, ping_timeout=None) as firehose:
20 while True:
21 frame = BytesIO(await firehose.recv())
22 header = dag_cbor.decode(frame, allow_concat=True)
23 if header['op'] != 1 or header['t'] != '#commit':
24 continue
25
26 payload = dag_cbor.decode(frame)
27 if payload['tooBig']:
28 # TODO(ejd): figure out how to get blocks out-of-band
29 continue
30
31 # TODO(ejd): figure out how to validate blocks
32 blocks = payload.pop('blocks')
33 car_parsed = CAR.from_bytes(blocks)
34
35 message = payload.copy()
36 del message['ops']
37 message['commit'] = message['commit'].encode('base32')
38
39 for commit_op in payload['ops']:
40 op = commit_op.copy()
41 if op['cid'] is not None:
42 op['cid'] = op['cid'].encode('base32')
43 op['record'] = car_parsed.blocks.get(op['cid'])
44
45 yield message, op