this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from io import BytesIO
5import logging
6
7from atproto import CAR
8import dag_cbor
9import websockets
10
11from feed_manager import feed_manager
12from firehose_manager import FirehoseManager
13
# Install a root stream handler with a timestamped, aligned format. The root
# level given here is immediately overridden to WARNING below.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)
# Quiet third-party output by default, then re-enable DEBUG for this app's
# own 'feeds' and 'firehose' logger hierarchies (e.g. 'feeds.events').
for _logger_name, _logger_level in (
    ('', logging.WARNING),
    ('feeds', logging.DEBUG),
    ('firehose', logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
del _logger_name, _logger_level
21
async def firehose_events(
    firehose_manager,
    relay_url='wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos',
):
    """Yield one message per repo operation from the relay's commit stream.

    Opens a websocket to ``relay_url``. If ``firehose_manager`` has a stored
    sequence number, the connection resumes from it via a ``cursor`` query
    parameter. Each binary frame is decoded as a DAG-CBOR header followed by
    a DAG-CBOR body. Frames other than op-1 ``#commit`` messages are skipped,
    as are commits flagged ``tooBig``.

    Each yielded message is a *fresh* dict containing the commit payload's
    fields (without ``blocks`` and ``ops``), with two changes:

    * ``commit`` is re-encoded as a base32 CID string.
    * An ``op`` entry holds a copy of one operation. When that operation has
      a CID, ``op['cid']`` becomes its base32 string and ``op['record']`` is
      the matching block from the commit's CAR (``None`` if it is missing).

    Args:
        firehose_manager: object exposing ``get_sequence_number()``. A
            non-None result is appended to the URL as ``?cursor=<seq>``.
        relay_url: subscribeRepos websocket endpoint to connect to.

    There is no reconnect logic; any error from the connection or from
    decoding propagates to the caller and ends the generator.
    """
    seq = firehose_manager.get_sequence_number()
    if seq is not None:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    logger.info('opening websocket connection to %s', relay_url)

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            frame = BytesIO(await firehose.recv())
            # The frame holds two back-to-back DAG-CBOR objects: header, then body.
            header = dag_cbor.decode(frame, allow_concat=True)
            if header.get('op') != 1 or header.get('t') != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            # .get(): tolerate the tooBig field being absent rather than
            # raising KeyError (NOTE(review): it is deprecated in newer
            # protocol revisions - confirm against the relay in use).
            if payload.get('tooBig'):
                continue

            car_parsed = CAR.from_bytes(payload.pop('blocks'))
            ops = payload.pop('ops')
            base = dict(payload)
            base['commit'] = base['commit'].encode('base32')

            for op in ops:
                repo_op = dict(op)
                if op['cid'] is not None:
                    repo_op['cid'] = op['cid'].encode('base32')
                    # NOTE(review): looking up by the base32 string assumes
                    # CAR.blocks is keyed by CID strings - confirm for the
                    # installed atproto version.
                    repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])

                # Build a new dict per op: previously one dict was mutated and
                # re-yielded, so a consumer that kept a message saw it change.
                yield {**base, 'op': repo_op}
56
async def main(checkpoint_every=2500):
    """Consume the firehose, hand each op to ``feed_manager``, and checkpoint.

    Every message from ``firehose_events`` goes to
    ``feed_manager.process_commit``. After at least ``checkpoint_every``
    messages have been processed since the last checkpoint, the next
    checkpoint commits feed changes (``feed_manager.commit_changes()``) and
    persists the relay sequence number via ``FirehoseManager``.

    ``firehose_events`` yields one message per op, and all ops of a commit
    share that commit's ``seq``. Checkpoints are therefore taken only at
    commit boundaries: when the first op of a new ``seq`` arrives, the
    *previous* (fully processed) ``seq`` is saved before the new commit is
    processed. Saving a ``seq`` mid-commit could skip that commit's
    remaining ops on restart if the relay treats the cursor as exclusive.

    Work processed after the last checkpoint is not committed on exit.

    Args:
        checkpoint_every: minimum number of processed messages between
            checkpoints.
    """
    firehose_manager = FirehoseManager()
    pending = 0
    last_seq = None

    async for commit in firehose_events(firehose_manager):
        seq = commit['seq']
        # A changed seq means every op of last_seq has been processed.
        if last_seq is not None and seq != last_seq and pending >= checkpoint_every:
            feed_manager.commit_changes()
            firehose_manager.set_sequence_number(last_seq)
            pending = 0

        feed_manager.process_commit(commit)
        last_seq = seq
        pending += 1


if __name__ == '__main__':
    asyncio.run(main())