this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5from io import BytesIO
6import logging
7
8from atproto import CAR
9import dag_cbor
10import websockets
11
12from feed_manager import manager as feed_manager
13from firehose_manager import FirehoseManager
14
# Root handler layout: timestamp, padded level name, padded logger name, message.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)

# Quiet by default: the root logger only lets WARNING and above through ...
logging.getLogger('').setLevel(logging.WARNING)
# ... while loggers under the 'feeds' namespace stay at DEBUG.
logging.getLogger('feeds').setLevel(logging.DEBUG)
21
async def firehose_events(firehose_manager):
    """Yield one message per repo operation from the relay's commit stream.

    Opens a websocket to the ``com.atproto.sync.subscribeRepos`` endpoint on
    ``bsky.network``. When ``firehose_manager.get_sequence_number()`` returns
    a truthy value it is sent as the ``cursor`` query parameter. Only frames
    whose header has ``op == 1`` and ``t == '#commit'`` are processed, and
    commits flagged ``tooBig`` are skipped.

    Args:
        firehose_manager: object exposing ``get_sequence_number()``.

    Yields:
        dict: the commit payload without its ``blocks`` and ``ops`` entries,
        with ``commit`` rendered as a base32 string and an added ``op`` entry
        holding a copy of one operation. When that operation has a CID, the
        CID is rendered as a base32 string and ``record`` holds the block
        looked up under it in the commit's CAR data (``None`` when no such
        block exists). Every yielded dict is a fresh top-level object, so a
        consumer may keep it without later ops overwriting its ``op`` entry.
    """
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    seq = firehose_manager.get_sequence_number()
    if seq:
        relay_url += f'?cursor={seq}'

    logger = logging.getLogger('feeds.events')
    # Lazy %-formatting: the string is only built if the record is emitted.
    logger.info('opening websocket connection to %s', relay_url)

    # NOTE(review): ping_timeout=None presumably disables the keepalive
    # timeout so a missing pong never closes the stream -- confirm against the
    # installed websockets version.
    async with websockets.connect(relay_url, ping_timeout=None) as firehose:
        while True:
            # A frame holds two concatenated DAG-CBOR values: header, then
            # payload. allow_concat lets the first decode stop after the
            # header so the second decode reads the payload from the same
            # stream.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            # .get() tolerates frames that omit the flag entirely.
            if payload.get('tooBig'):
                continue

            car_parsed = CAR.from_bytes(payload.pop('blocks'))

            # Fields shared by every op of this commit.
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for op in payload['ops']:
                repo_op = op.copy()
                if op['cid'] is not None:
                    cid = op['cid'].encode('base32')
                    repo_op['cid'] = cid
                    repo_op['record'] = car_parsed.blocks.get(cid)

                # Build a new dict per op instead of mutating and re-yielding
                # one shared dict, which would alias every yielded item.
                yield {**message, 'op': repo_op}
56
async def main():
    """Feed every firehose event to the feed manager, with per-minute upkeep.

    Each item produced by ``firehose_events`` is passed to
    ``feed_manager.process_commit``. The first event seen in a new UTC
    wall-clock minute additionally triggers ``feed_manager.run_tasks_minute()``
    and hands that event's ``seq`` to ``set_sequence_number`` on the
    ``FirehoseManager`` instance.
    """
    cursor_store = FirehoseManager()
    last_minute = None  # no minute observed yet, so the first event triggers upkeep

    async for event in firehose_events(cursor_store):
        feed_manager.process_commit(event)

        minute_now = datetime.now(timezone.utc).minute
        if minute_now == last_minute:
            # Still inside the minute already handled; nothing more to do.
            continue

        last_minute = minute_now
        feed_manager.run_tasks_minute()
        # NOTE(review): presumably this is the cursor that firehose_events
        # resumes from on the next start -- confirm in FirehoseManager.
        cursor_store.set_sequence_number(event['seq'])
69
if __name__ == '__main__':
    # Run the consumer only when executed as a script, not when imported.
    asyncio.run(main())