this repo has no description
#!/usr/bin/env python3
"""Consume the AT Protocol firehose and hand each repo operation to the feeds.

The script subscribes to the relay's ``subscribeRepos`` stream, turns every
commit frame into one event per repo operation and passes those events to
``feed_manager``. Progress (the stream sequence number) is persisted through
``FirehoseManager`` so a later run can resume from the stored cursor.
"""

import asyncio
from io import BytesIO
import logging

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import feed_manager
from firehose_manager import FirehoseManager

# Websocket endpoint of the repo subscription stream.
RELAY_URL = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'

# Number of processed events between two checkpoints in ``main``.
CHECKPOINT_INTERVAL = 2500

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# Root logger only reports warnings; the 'feeds' and 'firehose' logger
# hierarchies are kept at DEBUG.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)


async def firehose_events(firehose_manager):
    """Yield one event dict per repo operation received from the firehose.

    Args:
        firehose_manager: object whose ``get_sequence_number()`` returns the
            sequence number to resume from, or ``None`` to connect without a
            cursor.

    Yields:
        A new dict for every operation of every ``#commit`` frame. It holds
        the commit payload without its ``blocks`` and ``ops`` entries, with
        ``commit`` replaced by its base32 encoding, plus an ``op`` entry
        describing the single operation. When the operation has a CID, the
        op's ``cid`` is base32-encoded and ``record`` holds the matching
        block from the commit's CAR data (``None`` if it is not present).

    Any exception raised by the websocket or the decoders propagates to the
    caller; the generator never finishes on its own.
    """
    relay_url = RELAY_URL
    seq = firehose_manager.get_sequence_number()
    if seq is not None:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', relay_url)

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            # Each frame carries two concatenated DAG-CBOR objects: the
            # header first, then the payload, both read from one buffer.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            # .get() so a payload without the flag is processed instead of
            # ending the stream with a KeyError.
            if payload.get('tooBig'):
                continue

            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)

            # Commit-level fields shared by every event of this frame.
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])

                # Yield a fresh dict per operation: reusing one dict would
                # overwrite 'op' in events the consumer may still hold.
                yield {**message, 'op': repo_op}


async def main():
    """Feed firehose events to ``feed_manager`` and checkpoint periodically.

    Every ``CHECKPOINT_INTERVAL`` processed events the feed changes are
    committed and the ``seq`` of the most recent event is stored through the
    ``FirehoseManager``.
    """
    firehose_manager = FirehoseManager()
    event_count = 0

    async for commit in firehose_events(firehose_manager):
        feed_manager.process_commit(commit)
        event_count += 1
        if event_count % CHECKPOINT_INTERVAL == 0:
            feed_manager.commit_changes()
            firehose_manager.set_sequence_number(commit['seq'])


if __name__ == '__main__':
    asyncio.run(main())