# This repo has no description.
#!/usr/bin/env python3
"""Read JSON events from a relay websocket and pass them to ``feed_manager``.

Each websocket message is decoded as JSON and handed to
``feed_manager.process_commit``; ``feed_manager.commit_changes`` is called
once every ``COMMIT_INTERVAL`` events.
"""

import asyncio
from io import BytesIO
import json
import logging

from atproto import CAR
import dag_cbor
import websockets

from feed_manager import feed_manager

# Websocket endpoint the event stream is read from.
RELAY_URL = 'ws://localhost:6008/subscribe'

# Number of processed events between calls to feed_manager.commit_changes().
COMMIT_INTERVAL = 2500

logging.basicConfig(
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
    level=logging.DEBUG
)
# Root logger is limited to warnings; only the loggers named below emit
# debug output.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)


async def firehose_events(relay_url=RELAY_URL):
    """Yield each message received from the relay, decoded as JSON.

    Args:
        relay_url: Websocket URL to connect to. Defaults to ``RELAY_URL``.

    Yields:
        The object produced by JSON-decoding one websocket message.

    The generator loops until ``recv()`` or the JSON decoder raises; such
    exceptions propagate to the consumer.
    """
    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', relay_url)

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            message = await firehose.recv()
            # recv() returns str for text frames and bytes for binary frames;
            # json.loads accepts both, so either framing is decoded.
            yield json.loads(message)


async def main():
    """Process every event from the firehose, committing periodically."""
    event_count = 0

    # NOTE(review): events processed after the last periodic commit are not
    # committed if the stream ends or raises -- confirm this is intended.
    async for commit in firehose_events():
        feed_manager.process_commit(commit)
        event_count += 1
        if event_count % COMMIT_INTERVAL == 0:
            feed_manager.commit_changes()


if __name__ == '__main__':
    asyncio.run(main())