this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import json
6import logging
7
8from atproto import CAR
9import dag_cbor
10import websockets
11
12from feed_manager import feed_manager
13from firehose_manager import FirehoseManager
14
15logging.basicConfig(
16 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
17 level=logging.DEBUG
18)
19logging.getLogger('').setLevel(logging.WARNING)
20logging.getLogger('feeds').setLevel(logging.DEBUG)
21logging.getLogger('firehose').setLevel(logging.DEBUG)
22
23async def firehose_events(firehose_manager):
24 relay_url = 'ws://localhost:6008/subscribe'
25
26 logger = logging.getLogger('feeds.events')
27 logger.info(f'opening websocket connection to {relay_url}')
28
29 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
30 while True:
31 payload = BytesIO(await firehose.recv())
32 yield json.load(payload)
33
34async def main():
35 firehose_manager = FirehoseManager()
36 event_count = 0
37
38 async for commit in firehose_events(firehose_manager):
39 feed_manager.process_commit(commit)
40 event_count += 1
41 if event_count % 2500 == 0:
42 feed_manager.commit_changes()
43 firehose_manager.set_sequence_number(commit['seq'])
44
45if __name__ == '__main__':
46 asyncio.run(main())