this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4import dag_cbor
5import sys
6import websockets
7
8from atproto import CAR
9from io import BytesIO
10from datetime import datetime, timezone
11
12from feeds import FeedManager
13from feeds.rapidfire import RapidFireFeed
14from feeds.popular import PopularFeed
15from firehose_manager import FirehoseManager
16
17async def firehose_events(firehose_manager):
18 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
19 seq = firehose_manager.get_sequence_number()
20 if seq:
21 relay_url += f'?cursor={seq}'
22
23 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
24 sys.stdout.flush()
25
26 async with websockets.connect(relay_url, ping_timeout=None) as firehose:
27 while True:
28 frame = BytesIO(await firehose.recv())
29 header = dag_cbor.decode(frame, allow_concat=True)
30 if header['op'] != 1 or header['t'] != '#commit':
31 continue
32
33 payload = dag_cbor.decode(frame)
34 if payload['tooBig']:
35 continue
36
37 blocks = payload.pop('blocks')
38 car_parsed = CAR.from_bytes(blocks)
39 message = payload.copy()
40 del message['ops']
41 message['commit'] = message['commit'].encode('base32')
42
43 for op in payload['ops']:
44 repo_op = op.copy()
45 if op['cid'] is not None:
46 repo_op['cid'] = repo_op['cid'].encode('base32')
47 repo_op['record'] = car_parsed.blocks[repo_op['cid']]
48 message['op'] = repo_op
49 yield message
50
51async def main():
52 firehose_manager = FirehoseManager()
53
54 feed_manager = FeedManager()
55 feed_manager.register(RapidFireFeed)
56 # feed_manager.register(PopularFeed)
57
58 current_minute = None
59 async for commit in firehose_events(firehose_manager):
60 feed_manager.process_commit(commit)
61
62 now = datetime.now(timezone.utc)
63 if now.minute != current_minute:
64 current_minute = now.minute
65 feed_manager.run_tasks_minute()
66 firehose_manager.set_sequence_number(commit['seq'])
67
68if __name__ == '__main__':
69 asyncio.run(main())