this repo has no description
#!/usr/bin/env python3
"""Stream commit events from the relay firehose into the registered feeds."""

import asyncio
import dag_cbor
import sys
import websockets

from atproto import CAR
from io import BytesIO
from datetime import datetime, timezone

from feeds import FeedManager
from feeds.rapidfire import RapidFireFeed
from feeds.popular import PopularFeed
from firehose_manager import FirehoseManager


def _commit_messages(raw_frame):
    """Decode one websocket frame and yield one message dict per repo op.

    A frame holds two concatenated DAG-CBOR objects: a header followed by a
    payload. Nothing is yielded unless the header has ``op == 1`` and
    ``t == '#commit'``, or when the payload is flagged ``tooBig``.

    Each yielded dict carries the payload fields (minus ``blocks`` and
    ``ops``), with ``commit`` base32-encoded, plus an ``op`` entry for a
    single operation. When the operation has a cid, the cid is
    base32-encoded and the matching block from the CAR is attached as
    ``record``.
    """
    frame = BytesIO(raw_frame)

    # allow_concat lets the first decode stop after the header so that the
    # second decode can read the payload from the same stream.
    header = dag_cbor.decode(frame, allow_concat=True)
    if header['op'] != 1 or header['t'] != '#commit':
        return

    payload = dag_cbor.decode(frame)
    if payload['tooBig']:
        return

    car_parsed = CAR.from_bytes(payload.pop('blocks'))
    ops = payload.pop('ops')
    payload['commit'] = payload['commit'].encode('base32')

    for op in ops:
        repo_op = op.copy()
        if op['cid'] is not None:
            repo_op['cid'] = repo_op['cid'].encode('base32')
            repo_op['record'] = car_parsed.blocks[repo_op['cid']]
        # Build a fresh dict for every operation. Reusing one dict and
        # overwriting its 'op' key would change messages that a consumer
        # has already received and kept.
        yield {**payload, 'op': repo_op}


async def firehose_events(firehose_manager):
    """Asynchronously yield one message per repo operation from the firehose.

    Connects to the ``com.atproto.sync.subscribeRepos`` websocket endpoint.
    If ``firehose_manager.get_sequence_number()`` returns a truthy value it
    is sent as the ``cursor`` query parameter. The generator runs until the
    connection raises.
    """
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            for message in _commit_messages(await firehose.recv()):
                yield message


async def main():
    """Feed every firehose message to the feed manager.

    Whenever the UTC wall-clock minute differs from the one last seen, the
    feed manager's per-minute tasks are run and the sequence number of the
    current commit is stored through the firehose manager.
    """
    firehose_manager = FirehoseManager()

    feed_manager = FeedManager()
    feed_manager.register(RapidFireFeed)
    # feed_manager.register(PopularFeed)

    current_minute = None
    async for commit in firehose_events(firehose_manager):
        feed_manager.process_commit(commit)

        now = datetime.now(timezone.utc)
        if now.minute != current_minute:
            current_minute = now.minute
            feed_manager.run_tasks_minute()
            firehose_manager.set_sequence_number(commit['seq'])


if __name__ == '__main__':
    asyncio.run(main())