this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4import dag_cbor
5import sys
6import websockets
7from atproto import CAR
8from io import BytesIO
9
10from feeds import Manager
11from feeds.rapidfire import RapidFireFeed
12from feeds.popular import PopularFeed
13
14from firehose_utils import FirehoseManager
15
16async def firehose_events():
17 firehose_manager = FirehoseManager()
18
19 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
20 seq = firehose_manager.get_sequence_number()
21 if seq:
22 relay_url += f'?cursor={seq}'
23
24 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
25 sys.stdout.flush()
26
27 async with websockets.connect(relay_url, ping_timeout=None) as firehose:
28 while True:
29 frame = BytesIO(await firehose.recv())
30 header = dag_cbor.decode(frame, allow_concat=True)
31 if header['op'] != 1 or header['t'] != '#commit':
32 continue
33
34 payload = dag_cbor.decode(frame)
35 if payload['tooBig']:
36 continue
37
38 blocks = payload.pop('blocks')
39 car_parsed = CAR.from_bytes(blocks)
40 message = payload.copy()
41 del message['ops']
42 message['commit'] = message['commit'].encode('base32')
43
44 for op in payload['ops']:
45 repo_op = op.copy()
46 if op['cid'] is not None:
47 repo_op['cid'] = repo_op['cid'].encode('base32')
48 repo_op['record'] = car_parsed.blocks[repo_op['cid']]
49 message['op'] = repo_op
50 yield message
51
52async def main():
53 firehose_manager = FirehoseManager()
54
55 feed_manager = FeedManager()
56 feed_manager.register(RapidFireFeed)
57 feed_manager.register(PopularFeed)
58
59 async for commit in firehose_events():
60 feed_manager.process(commit)
61
62if __name__ == '__main__':
63 asyncio.run(main())