this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import json
6import logging
7
8from atproto import CAR
9import dag_cbor
10import websockets
11
12from feed_manager import feed_manager
13
14logging.basicConfig(
15 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
16 level=logging.DEBUG
17)
18logging.getLogger('').setLevel(logging.WARNING)
19logging.getLogger('feeds').setLevel(logging.DEBUG)
20logging.getLogger('firehose').setLevel(logging.DEBUG)
21
22async def firehose_events():
23 relay_url = 'ws://localhost:6008/subscribe'
24
25 logger = logging.getLogger('feeds.events')
26 logger.info(f'opening websocket connection to {relay_url}')
27
28 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
29 while True:
30 payload = BytesIO(await firehose.recv())
31 yield json.load(payload)
32
33async def main():
34 event_count = 0
35
36 async for commit in firehose_events():
37 feed_manager.process_commit(commit)
38 event_count += 1
39 if event_count % 2500 == 0:
40 feed_manager.commit_changes()
41
42if __name__ == '__main__':
43 asyncio.run(main())