this repo has no description
#!/usr/bin/env python3
"""Read commit events from the subscribeRepos websocket and hand them to the feeds."""

import asyncio
from datetime import datetime, timezone
from io import BytesIO
import logging

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import FeedManager
from feeds.rapidfire import RapidFireFeed
from feeds.popular import PopularFeed
from firehose_manager import FirehoseManager

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# The root logger is lowered to WARNING right after basicConfig, so only the
# 'feeds' logger hierarchy emits DEBUG/INFO records.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)


async def firehose_events(firehose_manager):
    """Yield one message per repo operation found in '#commit' frames.

    Args:
        firehose_manager: Object whose ``get_sequence_number()`` supplies the
            cursor appended to the relay URL; a falsy value means the
            connection is opened without a cursor.

    Yields:
        dict: The decoded commit payload minus ``blocks`` and ``ops``, with
        ``commit`` base32-encoded and ``op`` set to a copy of a single
        operation. When the operation has a CID, ``op['cid']`` is
        base32-encoded and ``op['record']`` is the result of looking that CID
        up in the commit's CAR blocks. Operations without a CID carry no
        ``record`` key.
    """
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', relay_url)

    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            # Each frame holds two concatenated DAG-CBOR values: header, then payload.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            if payload['tooBig']:
                continue

            # Remove the raw CAR bytes before copying so they are not carried
            # along in every yielded message.
            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    # NOTE(review): nesting was lost in the collapsed source;
                    # the lookup is assumed to belong to this branch - confirm.
                    repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])

                # Yield a fresh dict per operation. Re-yielding one shared
                # dict would let a later operation overwrite 'op' on messages
                # the consumer still holds a reference to.
                yield {**message, 'op': repo_op}


async def main():
    """Run the firehose loop: process every message and run the per-minute tasks."""
    firehose_manager = FirehoseManager()

    feed_manager = FeedManager()
    feed_manager.register(RapidFireFeed)
    # feed_manager.register(PopularFeed)

    # None guarantees the per-minute branch fires on the very first message.
    current_minute = None
    async for commit in firehose_events(firehose_manager):
        feed_manager.process_commit(commit)

        now = datetime.now(timezone.utc)
        if now.minute != current_minute:
            current_minute = now.minute
            feed_manager.run_tasks_minute()
            # NOTE(review): nesting was lost in the collapsed source; the
            # cursor is assumed to be saved once per minute - confirm.
            firehose_manager.set_sequence_number(commit['seq'])


if __name__ == '__main__':
    asyncio.run(main())