this repo has no description
#!/usr/bin/env python3
"""Read commit events from a `subscribeRepos` websocket and feed them to the registered feeds."""

import asyncio
import dag_cbor
import sys
import websockets
from atproto import CAR
from io import BytesIO

from feeds import Manager
from feeds.rapidfire import RapidFireFeed
from feeds.popular import PopularFeed

from firehose_utils import FirehoseManager


async def firehose_events():
    """Yield one event dict per repo operation found in `#commit` frames.

    Opens a websocket to the relay's `com.atproto.sync.subscribeRepos`
    endpoint, appending `?cursor=<seq>` when `FirehoseManager` reports a
    truthy sequence number. Each binary frame is two concatenated DAG-CBOR
    values: a header and a payload. Frames whose header is not
    `op == 1` / `t == '#commit'`, and payloads flagged `tooBig`, are skipped.

    Yields:
        dict: a shallow copy of the commit payload without `blocks` and
        `ops`, with `commit` rendered as a base32 string, plus an `op` key
        holding a copy of one operation. When the operation has a CID, that
        CID is rendered as base32 and the matching CAR block is attached
        under `record`.

    The loop never terminates on its own; it ends only when the websocket
    or one of the decoders raises.
    """
    firehose_manager = FirehoseManager()

    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            frame = BytesIO(await firehose.recv())
            # allow_concat lets the first decode stop after the header and
            # leave the stream positioned at the payload.
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            # .get(): a payload without the flag is treated as "not too big"
            # rather than crashing the whole stream with a KeyError.
            if payload.get('tooBig'):
                continue

            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    # NOTE(review): the lookup uses the base32 string, so it
                    # relies on the CAR block keys matching that form.
                    repo_op['record'] = car_parsed.blocks[repo_op['cid']]
                # Yield a fresh dict per op so a consumer that keeps an event
                # does not see its 'op' replaced by the next iteration.
                yield {**message, 'op': repo_op}


async def main():
    """Register the feeds and pass every firehose event to the feed manager."""
    # NOTE(review): this instance is not used below; it is kept in case
    # constructing FirehoseManager has side effects — confirm and remove.
    firehose_manager = FirehoseManager()

    # The manager class is imported from `feeds` as `Manager`; the name
    # `FeedManager` is not defined anywhere in this module.
    feed_manager = Manager()
    feed_manager.register(RapidFireFeed)
    feed_manager.register(PopularFeed)

    async for commit in firehose_events():
        feed_manager.process(commit)


if __name__ == '__main__':
    asyncio.run(main())