this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import logging
6
7from atproto import CAR
8import dag_cbor
9import websockets
10
11from feed_manager import feed_manager
12from firehose_manager import FirehoseManager
13
# Configure the root handler and the shared log line layout. basicConfig is
# a no-op when the root logger already has handlers, so the levels are set
# explicitly below rather than relying on its `level` argument.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)
# Default every logger to WARNING, then re-enable DEBUG output for the
# 'feeds' logger and its children (e.g. 'feeds.events').
logging.getLogger().setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
20
async def firehose_events(firehose_manager):
    """Yield one message per repo operation from the relay's commit stream.

    Opens a websocket to the ``com.atproto.sync.subscribeRepos`` endpoint on
    ``bsky.network``. If ``firehose_manager.get_sequence_number()`` returns a
    truthy value it is appended to the URL as the ``cursor`` query parameter.

    Frames whose header is not an ``op == 1`` / ``#commit`` message are
    skipped, as are commits whose ``tooBig`` flag is set.

    Each yielded dict is the decoded commit payload without its ``blocks``
    and ``ops`` entries, with ``commit`` converted to a base32 string, plus
    an ``op`` entry holding a copy of a single operation. When the operation
    has a CID, ``op['cid']`` is its base32 string form and ``op['record']``
    is the value stored under that key in the commit's parsed CAR blocks
    (``None`` if there is no such entry).

    A new dict is yielded for every operation, so consumers may safely keep
    references to earlier messages. The copy is shallow: nested values of
    the commit payload are shared between messages of the same commit.

    This generator runs until the connection fails or the consumer stops
    iterating; it does not reconnect on its own.
    """
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    logger.info(f'opening websocket connection to {relay_url}')

    # ping_timeout=None: per the websockets keepalive settings, do not close
    # the connection when a pong is late.
    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            # A frame is two concatenated DAG-CBOR objects: header, then
            # payload. allow_concat lets the first decode stop after the
            # header and leaves the stream positioned at the payload.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            if payload['tooBig']:
                continue

            # Strip the bulky / per-op parts out of the payload; what is left
            # becomes the commit-level fields shared by every yielded message.
            car_parsed = CAR.from_bytes(payload.pop('blocks'))
            ops = payload.pop('ops')
            payload['commit'] = payload['commit'].encode('base32')

            for op in ops:
                repo_op = op.copy()
                if repo_op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])

                # Build a fresh dict per operation instead of mutating and
                # re-yielding one shared dict, so earlier messages are not
                # overwritten by later operations of the same commit.
                yield {**payload, 'op': repo_op}
55
async def main():
    """Feed firehose events into ``feed_manager`` and checkpoint periodically.

    Every event yielded by ``firehose_events`` is handed to
    ``feed_manager.process_commit``. After each 2000th event,
    ``feed_manager.commit_changes()`` is called and the ``seq`` of the event
    just processed is stored through
    ``FirehoseManager.set_sequence_number``.
    """
    flush_interval = 2000
    cursor_store = FirehoseManager()

    processed = 0
    async for event in firehose_events(cursor_store):
        feed_manager.process_commit(event)
        processed += 1
        if processed % flush_interval:
            # Not at a checkpoint boundary yet.
            continue

        # NOTE(review): presumably commit_changes() persists pending feed
        # writes; the cursor is only advanced after it returns.
        feed_manager.commit_changes()
        cursor_store.set_sequence_number(event['seq'])
66
# Script entry point: run the ingestion loop under asyncio.run() only when
# this file is executed directly, not when it is imported as a module.
if __name__ == '__main__':
    asyncio.run(main())