this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4import dag_cbor
5import logging
6import sys
7import websockets
8
9from atproto import CAR
10from io import BytesIO
11from datetime import datetime, timezone
12
13from feeds import FeedManager
14from feeds.rapidfire import RapidFireFeed
15from feeds.popular import PopularFeed
16from firehose_manager import FirehoseManager
17
# Install the root handler with the shared line format. basicConfig sets the
# root level to DEBUG here, but the next statement lowers it to WARNING, so
# only the 'feeds' logger hierarchy stays at DEBUG.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)
logging.getLogger().setLevel(logging.WARNING)
logging.getLogger('feeds').setLevel(logging.DEBUG)
24
async def firehose_events(firehose_manager):
    """Yield one message per repo operation from the relay's commit stream.

    Opens a websocket to the ``com.atproto.sync.subscribeRepos`` endpoint,
    resuming from ``firehose_manager.get_sequence_number()`` when that value
    is truthy. Frames whose header is not an ``op == 1`` / ``#commit``
    message are skipped, as are commits whose ``tooBig`` flag is truthy.

    Args:
        firehose_manager: Object exposing ``get_sequence_number()``; a truthy
            result is appended to the relay URL as the ``cursor`` query
            parameter.

    Yields:
        dict: The commit payload without its ``blocks`` and ``ops`` entries,
        with ``commit`` base32-encoded and an ``op`` key holding a copy of
        one operation. When the operation has a CID, that CID is
        base32-encoded and the matching CAR block is attached as
        ``op['record']``. Every yielded dict is a distinct object, so a
        consumer may keep a reference to it without later operations
        overwriting its ``op`` entry (nested values are still shared, as the
        copy is shallow).
    """
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            frame = BytesIO(await firehose.recv())

            # A frame holds two concatenated DAG-CBOR objects: the header is
            # decoded first (allow_concat leaves the rest of the stream
            # unread), then the payload is decoded from the same stream.
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            if payload['tooBig']:
                continue

            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)

            # Commit-level fields shared by every operation of this commit.
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    repo_op['cid'] = repo_op['cid'].encode('base32')
                    repo_op['record'] = car_parsed.blocks[repo_op['cid']]

                # Build a new dict for each operation instead of mutating and
                # re-yielding the shared one; otherwise a message retained by
                # the consumer would have its 'op' replaced by the next one.
                yield {**message, 'op': repo_op}
58
async def main():
    """Pump firehose events into the registered feeds.

    Every event from ``firehose_events`` is handed to the feed manager. The
    first event seen in each new UTC minute additionally triggers the feed
    manager's per-minute tasks and stores that event's ``seq`` through the
    firehose manager.
    """
    cursor_store = FirehoseManager()

    feeds = FeedManager()
    feeds.register(RapidFireFeed)
    # PopularFeed registration is currently disabled:
    # feeds.register(PopularFeed)

    last_minute = None
    async for commit in firehose_events(cursor_store):
        feeds.process_commit(commit)

        minute = datetime.now(timezone.utc).minute
        if minute == last_minute:
            # Still inside the minute that was already handled.
            continue

        last_minute = minute
        feeds.run_tasks_minute()
        cursor_store.set_sequence_number(commit['seq'])
75
# Script entry point: run the ingestion coroutine on a fresh event loop.
if __name__ == '__main__':
    entry_coro = main()
    asyncio.run(entry_coro)