this repo has no description
1#!/usr/bin/env python3 2 3import asyncio 4import dag_cbor 5import logging 6import sys 7import websockets 8 9from atproto import CAR 10from io import BytesIO 11from datetime import datetime, timezone 12 13from feeds import FeedManager 14from feeds.rapidfire import RapidFireFeed 15from feeds.popular import PopularFeed 16from firehose_manager import FirehoseManager 17 18logging.basicConfig( 19 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s', 20 level=logging.DEBUG 21) 22logging.getLogger('').setLevel(logging.WARNING) 23logging.getLogger('feeds').setLevel(logging.DEBUG) 24 25async def firehose_events(firehose_manager): 26 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 27 seq = firehose_manager.get_sequence_number() 28 if seq: 29 relay_url += f'?cursor={seq}' 30 31 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 32 sys.stdout.flush() 33 34 async with websockets.connect(relay_url, ping_timeout=None) as firehose: 35 while True: 36 frame = BytesIO(await firehose.recv()) 37 header = dag_cbor.decode(frame, allow_concat=True) 38 if header['op'] != 1 or header['t'] != '#commit': 39 continue 40 41 payload = dag_cbor.decode(frame) 42 if payload['tooBig']: 43 continue 44 45 blocks = payload.pop('blocks') 46 car_parsed = CAR.from_bytes(blocks) 47 message = payload.copy() 48 del message['ops'] 49 message['commit'] = message['commit'].encode('base32') 50 51 for op in payload['ops']: 52 repo_op = op.copy() 53 if op['cid'] is not None: 54 repo_op['cid'] = repo_op['cid'].encode('base32') 55 repo_op['record'] = car_parsed.blocks[repo_op['cid']] 56 message['op'] = repo_op 57 yield message 58 59async def main(): 60 firehose_manager = FirehoseManager() 61 62 feed_manager = FeedManager() 63 feed_manager.register(RapidFireFeed) 64 # feed_manager.register(PopularFeed) 65 66 current_minute = None 67 async for commit in firehose_events(firehose_manager): 68 feed_manager.process_commit(commit) 69 70 now = datetime.now(timezone.utc) 71 if now.minute != current_minute: 72 current_minute = now.minute 73 feed_manager.run_tasks_minute() 74 firehose_manager.set_sequence_number(commit['seq']) 75 76if __name__ == '__main__': 77 asyncio.run(main())