this repo has no description
#!/usr/bin/env python3
"""Read JSON events from a relay websocket and hand them to the feed manager.

Each received event is passed to ``feed_manager.process_commit``; every
``COMMIT_INTERVAL`` events the feed manager's changes are committed and the
``seq`` of the latest event is stored through ``FirehoseManager``.
"""

import asyncio
from io import BytesIO
import json
import logging

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import feed_manager
from firehose_manager import FirehoseManager

# Websocket endpoint the events are read from.
RELAY_URL = 'ws://localhost:6008/subscribe'

# Number of processed events between two checkpoints in main().
COMMIT_INTERVAL = 2500

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# Raise the root logger to WARNING so that only the 'feeds' and 'firehose'
# logger hierarchies, re-enabled below, emit DEBUG/INFO records.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)


async def firehose_events(firehose_manager):
    """Yield decoded JSON events from the relay websocket, one per frame.

    Args:
        firehose_manager: Accepted for interface compatibility; it is not
            used by this generator at present.

    Yields:
        The object obtained by JSON-decoding each received websocket frame.

    Raises:
        json.JSONDecodeError: If a frame does not contain valid JSON.
        Any exception raised by the websocket connection propagates to the
        consumer (the loop itself never terminates normally).
    """
    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', RELAY_URL)

    async with websockets.connect(RELAY_URL, ping_timeout=60) as firehose:
        while True:
            # recv() returns str for text frames and bytes for binary frames;
            # json.loads accepts both, whereas wrapping the frame in BytesIO
            # would raise TypeError on a text frame.
            yield json.loads(await firehose.recv())


async def main():
    """Process relay events forever, checkpointing every COMMIT_INTERVAL events."""
    firehose_manager = FirehoseManager()
    event_count = 0

    async for commit in firehose_events(firehose_manager):
        feed_manager.process_commit(commit)
        event_count += 1
        if event_count % COMMIT_INTERVAL == 0:
            # Commit the feed changes first, then record the sequence number
            # of the event that completed this batch.
            feed_manager.commit_changes()
            firehose_manager.set_sequence_number(commit['seq'])
    # NOTE(review): events processed after the last checkpoint are not
    # committed if the stream ends or the process is interrupted; confirm
    # whether a final commit is wanted.


if __name__ == '__main__':
    asyncio.run(main())