this repo has no description
1import apsw
2import dag_cbor
3import redis
4import sys
5import websockets
6from atproto import CAR
7from io import BytesIO
8
9class FirehoseManager:
10 def __init__(self, fname='firehose.db'):
11 self.db_cnx = apsw.Connection(fname)
12 with self.db_cnx:
13 self.db_cnx.execute("create table if not exists firehose(key text unique, value text)")
14
15 def get_sequence_number(self):
16 cur = self.db_cnx.execute("select * from firehose where key = 'seq'")
17 row = cur.fetchone()
18 if row is None:
19 return None
20 (key, value) = row
21 return int(value)
22
23 def set_sequence_number(self, value):
24 with self.db_cnx:
25 self.db_cnx.execute(
26 "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value",
27 dict(value=value)
28 )
29
30async def bsky_activity():
31 redis_cnx = redis.Redis()
32 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
33 firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
34 if firehose_seq:
35 relay_url += f'?cursor={firehose_seq.decode()}'
36
37 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
38 sys.stdout.flush()
39
40 async with websockets.connect(relay_url, ping_timeout=None) as firehose:
41 while True:
42 frame = BytesIO(await firehose.recv())
43 header = dag_cbor.decode(frame, allow_concat=True)
44 if header['op'] != 1 or header['t'] != '#commit':
45 continue
46
47 payload = dag_cbor.decode(frame)
48 if payload['tooBig']:
49 # TODO(ejd): figure out how to get blocks out-of-band
50 continue
51
52 # TODO(ejd): figure out how to validate blocks
53 blocks = payload.pop('blocks')
54 car_parsed = CAR.from_bytes(blocks)
55
56 message = payload.copy()
57 del message['ops']
58 message['commit'] = message['commit'].encode('base32')
59
60 for commit_op in payload['ops']:
61 op = commit_op.copy()
62 if op['cid'] is not None:
63 op['cid'] = op['cid'].encode('base32')
64 op['record'] = car_parsed.blocks.get(op['cid'])
65
66 yield message, op