this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5from io import BytesIO
6import logging
7
8from atproto import CAR
9import dag_cbor
10import websockets
11
12from feed_manager import FeedManager
13from feeds.rapidfire import RapidFireFeed
14from feeds.popular import PopularFeed
15from firehose_manager import FirehoseManager
16
17logging.basicConfig(
18 format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
19 level=logging.DEBUG
20)
21logging.getLogger('').setLevel(logging.WARNING)
22logging.getLogger('feeds').setLevel(logging.DEBUG)
23
24async def firehose_events(firehose_manager):
25 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
26 seq = firehose_manager.get_sequence_number()
27 if seq:
28 relay_url += f'?cursor={seq}'
29
30 logger = logging.getLogger('feeds.events')
31 logger.info(f'opening websocket connection to {relay_url}')
32
33 async with websockets.connect(relay_url, ping_timeout=None) as firehose:
34 while True:
35 frame = BytesIO(await firehose.recv())
36 header = dag_cbor.decode(frame, allow_concat=True)
37 if header['op'] != 1 or header['t'] != '#commit':
38 continue
39
40 payload = dag_cbor.decode(frame)
41 if payload['tooBig']:
42 continue
43
44 blocks = payload.pop('blocks')
45 car_parsed = CAR.from_bytes(blocks)
46 message = payload.copy()
47 del message['ops']
48 message['commit'] = message['commit'].encode('base32')
49
50 for op in payload['ops']:
51 repo_op = op.copy()
52 if op['cid'] is not None:
53 repo_op['cid'] = repo_op['cid'].encode('base32')
54 repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])
55
56 message['op'] = repo_op
57 yield message
58
59async def main():
60 firehose_manager = FirehoseManager()
61
62 feed_manager = FeedManager()
63 feed_manager.register(RapidFireFeed)
64 # feed_manager.register(PopularFeed)
65
66 current_minute = None
67 async for commit in firehose_events(firehose_manager):
68 feed_manager.process_commit(commit)
69
70 now = datetime.now(timezone.utc)
71 if now.minute != current_minute:
72 current_minute = now.minute
73 feed_manager.run_tasks_minute()
74 firehose_manager.set_sequence_number(commit['seq'])
75
76if __name__ == '__main__':
77 asyncio.run(main())