this repo has no description
1import apsw 2import dag_cbor 3import redis 4import sys 5import websockets 6from atproto import CAR 7from io import BytesIO 8 9class FirehoseManager: 10 def __init__(self, fname='firehose.db'): 11 self.db_cnx = apsw.Connection(fname) 12 with self.db_cnx: 13 self.db_cnx.execute("create table if not exists firehose(key text unique, value text)") 14 15 def get_sequence_number(self): 16 cur = self.db_cnx.execute("select * from firehose where key = 'seq'") 17 row = cur.fetchone() 18 if row is None: 19 return None 20 (key, value) = row 21 return int(value) 22 23 def set_sequence_number(self, value): 24 with self.db_cnx: 25 self.db_cnx.execute( 26 "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value", 27 dict(value=value) 28 ) 29 30async def bsky_activity(): 31 redis_cnx = redis.Redis() 32 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 33 firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq') 34 if firehose_seq: 35 relay_url += f'?cursor={firehose_seq.decode()}' 36 37 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 38 sys.stdout.flush() 39 40 async with websockets.connect(relay_url, ping_timeout=None) as firehose: 41 while True: 42 frame = BytesIO(await firehose.recv()) 43 header = dag_cbor.decode(frame, allow_concat=True) 44 if header['op'] != 1 or header['t'] != '#commit': 45 continue 46 47 payload = dag_cbor.decode(frame) 48 if payload['tooBig']: 49 # TODO(ejd): figure out how to get blocks out-of-band 50 continue 51 52 # TODO(ejd): figure out how to validate blocks 53 blocks = payload.pop('blocks') 54 car_parsed = CAR.from_bytes(blocks) 55 56 message = payload.copy() 57 del message['ops'] 58 message['commit'] = message['commit'].encode('base32') 59 60 for commit_op in payload['ops']: 61 op = commit_op.copy() 62 if op['cid'] is not None: 63 op['cid'] = op['cid'].encode('base32') 64 op['record'] = car_parsed.blocks.get(op['cid']) 65 66 yield message, op