this repo has no description
1#!/usr/bin/env python3 2 3import asyncio 4from io import BytesIO 5import json 6import logging 7import signal 8 9from atproto import CAR 10import dag_cbor 11import websockets 12 13from feed_manager import feed_manager 14 15logging.basicConfig( 16 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s', 17 level=logging.DEBUG 18) 19logging.getLogger('').setLevel(logging.WARNING) 20logging.getLogger('feeds').setLevel(logging.DEBUG) 21logging.getLogger('firehose').setLevel(logging.DEBUG) 22logging.getLogger('feedgen').setLevel(logging.DEBUG) 23 24logger = logging.getLogger('feedgen') 25 26async def firehose_events(): 27 relay_url = 'ws://localhost:6008/subscribe' 28 29 logger = logging.getLogger('feeds.events') 30 logger.info(f'opening websocket connection to {relay_url}') 31 32 async with websockets.connect(relay_url, ping_timeout=60) as firehose: 33 while True: 34 payload = BytesIO(await firehose.recv()) 35 yield json.load(payload) 36 37async def main(): 38 event_count = 0 39 40 async for commit in firehose_events(): 41 feed_manager.process_commit(commit) 42 event_count += 1 43 if event_count % 2500 == 0: 44 feed_manager.commit_changes() 45 46def handle_exception(loop, context): 47 msg = context.get("exception", context["message"]) 48 logger.error(f"Caught exception: {msg}") 49 logger.info("Shutting down...") 50 asyncio.create_task(shutdown(loop)) 51 52async def shutdown(loop, signal=None): 53 if signal: 54 logger.info(f'received exit signal {signal.name}') 55 feed_manager.stop_all() 56 tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] 57 [task.cancel() for task in tasks] 58 logger.info(f'cancelling {len(tasks)} outstanding tasks') 59 await asyncio.gather(*tasks, return_exceptions=True) 60 loop.stop() 61 62if __name__ == '__main__': 63 loop = asyncio.get_event_loop() 64 catch_signals = (signal.SIGTERM, signal.SIGINT) 65 for sig in catch_signals: 66 loop.add_signal_handler( 67 sig, 68 lambda s=sig: asyncio.create_task(shutdown(loop, signal=s)) 69 ) 70 loop.set_exception_handler(handle_exception) 71 72 try: 73 loop.create_task(main()) 74 loop.run_forever() 75 finally: 76 loop.close()