this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import json
6import logging
7import signal
8
9from atproto import CAR
10import dag_cbor
11import websockets
12
13from feed_manager import feed_manager
14
# Logging layout: one shared line format for every record. The root logger is
# held at WARNING, while the three logger hierarchies named below are opened
# up to DEBUG.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)
# basicConfig() just set the root level to DEBUG; override it to WARNING.
logging.getLogger().setLevel(logging.WARNING)

# Module-level logger used by the shutdown / exception-handling code below.
logger = logging.getLogger('feedgen')
logger.setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)
logging.getLogger('feeds').setLevel(logging.DEBUG)
25
async def firehose_events(relay_url='ws://localhost:6008/subscribe'):
    """Yield JSON-decoded events received from the relay websocket.

    Opens a websocket connection to ``relay_url`` and then loops forever,
    yielding each received message parsed as JSON.

    Args:
        relay_url: Websocket endpoint to subscribe to. Defaults to the
            local relay address this script previously hard-coded.

    Yields:
        The object produced by JSON-decoding each received message.

    Raises:
        json.JSONDecodeError: If a received message is not valid JSON.

    Errors raised by ``websockets`` (for example when the connection cannot
    be opened or is closed) are not caught here and propagate to the
    consumer of the generator.
    """
    # Dedicated logger so these records are attributed to 'feeds.events'
    # without shadowing the module-level ``logger``.
    events_logger = logging.getLogger('feeds.events')
    events_logger.info(f'opening websocket connection to {relay_url}')

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            message = await firehose.recv()
            # recv() yields str for text frames and bytes for binary frames.
            # json.loads() accepts both, whereas wrapping the message in
            # BytesIO raised TypeError for text frames.
            yield json.loads(message)
36
async def main():
    """Pump firehose events into the feed manager, committing in batches.

    Every event yielded by ``firehose_events()`` is handed to
    ``feed_manager.process_commit``; after each 2500th processed event
    ``feed_manager.commit_changes()`` is called.
    """
    processed = 0

    async for event in firehose_events():
        feed_manager.process_commit(event)
        processed += 1
        if processed % 2500:
            # Not at a batch boundary yet; keep consuming.
            continue
        feed_manager.commit_changes()
45
def handle_exception(loop, context):
    """Event-loop exception handler: log the failure and start shutdown.

    Intended to be installed with ``loop.set_exception_handler``.

    Args:
        loop: The event loop that reported the problem.
        context: The asyncio exception context dict. The ``"exception"``
            entry is preferred for the log message; ``"message"`` is used
            when no exception object is present.
    """
    exception = context.get("exception")
    msg = exception if exception is not None else context.get("message")
    # exc_info attaches the traceback when an exception object is available;
    # passing None simply logs the message on its own.
    logger.error(f"Caught exception: {msg}", exc_info=exception)
    logger.info("Shutting down...")

    if loop.is_closed():
        # Nothing can be scheduled on a closed loop; creating the coroutine
        # would only produce a "never awaited" warning.
        return
    # Schedule on the loop we were given rather than asyncio.create_task(),
    # which raises RuntimeError when no loop is currently running.
    loop.create_task(shutdown(loop))
51
async def shutdown(loop, signal=None):
    """Stop the feed manager, cancel all other tasks, then stop the loop.

    Args:
        loop: The event loop to stop once the outstanding tasks have been
            cancelled and awaited.
        signal: Optional signal object that triggered the shutdown; only its
            ``name`` attribute is used, for logging. Note that inside this
            function the parameter shadows the ``signal`` module.
    """
    if signal:
        logger.info(f'received exit signal {signal.name}')
    feed_manager.stop_all()

    # Every task except the one running this coroutine.
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]

    logger.info(f'cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()

    # return_exceptions=True collects each task's CancelledError (or other
    # failure) as a result instead of raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
61
if __name__ == '__main__':
    # Create and install the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no current loop is set and is an error on recent
    # Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    catch_signals = (signal.SIGTERM, signal.SIGINT)
    for sig in catch_signals:
        # ``s=sig`` binds the signal at definition time, so each handler
        # reports its own signal rather than the loop variable's last value.
        loop.add_signal_handler(
            sig,
            lambda s=sig: loop.create_task(shutdown(loop, signal=s)),
        )
    loop.set_exception_handler(handle_exception)

    try:
        # NOTE(review): the task reference is not kept; presumably a failure
        # in main() is meant to reach handle_exception through asyncio's
        # unretrieved-exception reporting - confirm before storing it.
        loop.create_task(main())
        # Runs until shutdown() calls loop.stop().
        loop.run_forever()
    finally:
        loop.close()