this repo has no description
#!/usr/bin/env python3
"""Stream repo commit events from the relay firehose into the feed manager.

Connects to the ``com.atproto.sync.subscribeRepos`` websocket endpoint,
decodes each commit frame, and hands one event per repo operation to
``feed_manager``.  The sequence number of the event being processed is
saved through ``FirehoseManager`` every ``CHECKPOINT_INTERVAL`` events.
"""

import asyncio
from io import BytesIO
import logging

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import feed_manager
from firehose_manager import FirehoseManager

# Websocket endpoint the firehose is read from.
RELAY_URL = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'

# Number of processed events between two checkpoints in main().
CHECKPOINT_INTERVAL = 2000

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# Root logger stays at WARNING; only the 'feeds' hierarchy logs at DEBUG.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)


async def firehose_events(firehose_manager):
    """Yield one event dict per repo operation received from the firehose.

    Args:
        firehose_manager: object exposing ``get_sequence_number()``.  A
            truthy return value is appended to the relay URL as the
            ``cursor`` query parameter.

    Yields:
        dict: the decoded commit payload without its ``blocks`` and ``ops``
        entries, with ``commit`` base32-encoded, plus an ``op`` entry
        holding a copy of a single operation.  When the operation has a
        CID, the copy carries the base32-encoded ``cid`` and a ``record``
        entry looked up in the commit's CAR blocks.  Every yielded dict is
        a distinct object, so callers may keep references to it.
    """
    relay_url = RELAY_URL
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', relay_url)

    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            # Each websocket message holds two concatenated DAG-CBOR values:
            # a header followed by the payload.  allow_concat lets the first
            # decode stop after the header so the second reads the payload.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            if payload['tooBig']:
                continue

            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)

            # Fields shared by every event produced from this commit.
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    # NOTE(review): the lookup uses the base32 string, so it
                    # relies on the CAR block keys matching that form.
                    repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])

                # Build a fresh dict per operation instead of mutating and
                # re-yielding one shared dict, so an event a consumer has
                # kept does not change when the next operation is emitted.
                yield {**message, 'op': repo_op}


async def main():
    """Feed every firehose event to the feed manager, with checkpoints.

    After every ``CHECKPOINT_INTERVAL`` processed events the feed manager's
    changes are committed and the ``seq`` of the current event is stored
    through the firehose manager.
    """
    firehose_manager = FirehoseManager()
    event_count = 0

    async for commit in firehose_events(firehose_manager):
        feed_manager.process_commit(commit)
        event_count += 1
        if event_count % CHECKPOINT_INTERVAL == 0:
            feed_manager.commit_changes()
            firehose_manager.set_sequence_number(commit['seq'])


if __name__ == '__main__':
    asyncio.run(main())