this repo has no description
#!/usr/bin/env python3
"""Consume JSON events from a relay websocket and hand them to feed_manager.

Run as a script. SIGTERM / SIGINT trigger a graceful shutdown; an unexpected
failure of the consumer task is logged and stops the event loop instead of
leaving the process running with nothing consuming events.
"""

import asyncio
from io import BytesIO
import json
import logging
import signal

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import feed_manager

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# Quiet everything by default, then opt this project's loggers back in.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)
logging.getLogger('feedgen').setLevel(logging.DEBUG)

logger = logging.getLogger('feedgen')

# Websocket endpoint the events are read from.
DEFAULT_RELAY_URL = 'ws://localhost:6008/subscribe'
# Number of processed events between calls to feed_manager.commit_changes().
COMMIT_INTERVAL = 2500


async def firehose_events(relay_url=DEFAULT_RELAY_URL):
    """Yield each message received from the relay websocket, decoded as JSON.

    Args:
        relay_url: Websocket URL to connect to.

    Yields:
        The object produced by JSON-decoding each received frame.

    Raises:
        Whatever ``websockets`` raises when the connection fails or closes,
        and ``json.JSONDecodeError`` for a frame that is not valid JSON.
    """
    events_logger = logging.getLogger('feeds.events')
    events_logger.info(f'opening websocket connection to {relay_url}')

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            # recv() returns str for text frames and bytes for binary frames;
            # json.loads accepts both, whereas wrapping in BytesIO rejects str.
            yield json.loads(await firehose.recv())


async def main():
    """Feed every relay event to feed_manager, committing in batches."""
    event_count = 0

    async for commit in firehose_events():
        feed_manager.process_commit(commit)
        event_count += 1
        if event_count % COMMIT_INTERVAL == 0:
            feed_manager.commit_changes()


async def shutdown(signal, loop):
    """Stop feed_manager, cancel all other tasks, then stop the loop.

    Args:
        signal: The ``signal.Signals`` member that triggered the shutdown.
            (The name shadows the ``signal`` module inside this function; it
            is kept so existing callers are unaffected.)
        loop: The event loop to stop once outstanding tasks have finished.
    """
    logger.info(f'received exit signal {signal.name}')
    feed_manager.stop_all()
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    logger.info(f'cancelling {len(tasks)} outstanding tasks')
    # return_exceptions=True so the CancelledError of each task is collected
    # here instead of propagating out of shutdown().
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()


def _stop_loop_when_main_ends(task, loop):
    """Done-callback for the main task: log a failure and stop the loop."""
    if task.cancelled():
        # Cancellation comes from shutdown(), which stops the loop itself.
        return
    error = task.exception()
    if error is not None:
        logger.error('event consumer terminated unexpectedly', exc_info=error)
    loop.stop()


if __name__ == '__main__':
    # asyncio.get_event_loop() without a running loop is deprecated and fails
    # on current Python releases, so create and install the loop explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    catch_signals = (signal.SIGTERM, signal.SIGINT)
    for sig in catch_signals:
        loop.add_signal_handler(
            sig,
            # Bind sig as a default so each handler keeps its own signal.
            lambda s=sig: loop.create_task(shutdown(s, loop))
        )

    try:
        main_task = loop.create_task(main())
        main_task.add_done_callback(
            lambda task: _stop_loop_when_main_ends(task, loop)
        )
        loop.run_forever()
    finally:
        loop.close()