this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import json
6import logging
7import signal
8
9from atproto import CAR
10import dag_cbor
11import websockets
12
13from feed_manager import feed_manager
14
15logging.basicConfig(
16 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
17 level=logging.DEBUG
18)
19logging.getLogger('').setLevel(logging.WARNING)
20logging.getLogger('feeds').setLevel(logging.DEBUG)
21logging.getLogger('firehose').setLevel(logging.DEBUG)
22logging.getLogger('feedgen').setLevel(logging.DEBUG)
23
24logger = logging.getLogger('feedgen')
25
26async def firehose_events():
27 relay_url = 'ws://localhost:6008/subscribe'
28
29 logger = logging.getLogger('feeds.events')
30 logger.info(f'opening websocket connection to {relay_url}')
31
32 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
33 while True:
34 payload = BytesIO(await firehose.recv())
35 yield json.load(payload)
36
37async def main():
38 event_count = 0
39
40 async for commit in firehose_events():
41 feed_manager.process_commit(commit)
42 event_count += 1
43 if event_count % 2500 == 0:
44 feed_manager.commit_changes()
45
46async def shutdown(signal, loop):
47 logger.info(f'received exit signal {signal.name}')
48 feed_manager.stop_all()
49 tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
50 [task.cancel() for task in tasks]
51 logger.info(f'cancelling {len(tasks)} outstanding tasks')
52 await asyncio.gather(*tasks, return_exceptions=True)
53 loop.stop()
54
55if __name__ == '__main__':
56 loop = asyncio.get_event_loop()
57 catch_signals = (signal.SIGTERM, signal.SIGINT)
58 for sig in catch_signals:
59 loop.add_signal_handler(
60 sig,
61 lambda s=sig: asyncio.create_task(shutdown(s, loop))
62 )
63
64 try:
65 loop.create_task(main())
66 loop.run_forever()
67 finally:
68 loop.close()