this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5from io import BytesIO
6import os
7import sqlite3
8import sys
9
10from atproto import CAR
11import redis
12import dag_cbor
13import websockets
14
# Collection NSIDs whose `create` ops main() records; ops in any other
# collection are skipped. A set literal avoids building a throwaway list.
app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.labeler.service',
}
29
async def bsky_activity():
    """Yield ``(message, op)`` for every op in the relay firehose's ``#commit`` frames.

    Connects to the Bluesky relay's ``com.atproto.sync.subscribeRepos``
    endpoint. If Redis holds a sequence number under
    ``dev.edavis.muninsky.seq``, it is passed as ``?cursor=`` to resume.

    Each yielded pair is:

    * ``message``: the commit payload without ``blocks`` and ``ops``, with
      ``commit`` re-encoded as a base32 CID string. The same dict object is
      yielded for every op of a commit, so callers must not mutate it.
    * ``op``: a copy of one entry of the payload's ``ops``. A non-None
      ``cid`` is re-encoded as base32, and ``record`` is set to the
      matching entry of the commit's CAR blocks (``None`` if absent).

    Frames that are not ``#commit`` messages are skipped. Error frames
    (``op == -1``) are also written to stdout. Commits flagged ``tooBig``
    are skipped.

    There is no reconnect logic: any exception from the websocket
    (including connection close) propagates to the caller.
    """
    redis_cnx = redis.Redis()
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    # Resume from the last seq that main() checkpointed, if there is one.
    firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
    if firehose_seq:
        relay_url += f'?cursor={firehose_seq.decode()}'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    # websockets closes the connection (and so ends this generator) on any
    # incoming message larger than ``max_size``, which defaults to 1 MiB.
    # A #commit frame carries a CAR diff in ``blocks`` that can be close to
    # or above that size, so allow generous headroom.
    max_frame_bytes = 5 * 1024 * 1024

    async with websockets.connect(relay_url, max_size=max_frame_bytes) as firehose:
        while True:
            # A frame is two concatenated DAG-CBOR objects: header, then body.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)

            if header['op'] == -1:
                # Error frame: surface it instead of dropping it silently so a
                # subsequent disconnect is diagnosable.
                try:
                    error_body = dag_cbor.decode(frame)
                except Exception as exc:  # diagnostics only; never fail on the report itself
                    error_body = f'<undecodable error body: {exc!r}>'
                sys.stdout.write(f'firehose error frame: {error_body}\n')
                sys.stdout.flush()
                continue

            if header['op'] != 1 or header['t'] != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            # .get(): treat a missing flag as False instead of raising KeyError.
            if payload.get('tooBig'):
                # TODO(ejd): figure out how to get blocks out-of-band
                continue

            # TODO(ejd): figure out how to validate blocks
            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)

            # Per-commit metadata, shared by every op yielded below.
            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for commit_op in payload['ops']:
                op = commit_op.copy()
                if op['cid'] is not None:
                    op['cid'] = op['cid'].encode('base32')
                # NOTE(review): the lookup key is the base32 *string* (or None),
                # not a CID object; whether it matches depends on how
                # CAR.blocks keys compare with str. Confirm records are found.
                op['record'] = car_parsed.blocks.get(op['cid'])

                yield message, op
67
def _parse_commit_time(value):
    """Parse a firehose commit ``time`` string into a timezone-aware UTC datetime.

    The fast path is the exact format this script originally required,
    ``YYYY-MM-DDTHH:MM:SS.<fraction>Z``. Its ``%f`` accepts only one to six
    fractional digits, and the literal ``Z`` rejects numeric offsets.

    Anything else goes through a lenient fallback that accepts:

    * no fractional part;
    * more than six fractional digits (truncated to microseconds);
    * ``+HH:MM`` / ``-HH:MM`` offsets.

    A timestamp with no offset at all is taken to be UTC.

    Raises:
        ValueError: if ``value`` does not look like an RFC 3339 timestamp.
    """
    try:
        return datetime.strptime(value, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
    except ValueError:
        pass

    # Imported lazily: only reached for timestamps the fast path rejects.
    import re
    from datetime import timedelta

    match = re.fullmatch(
        r'(\d{4}-\d{2}-\d{2})[Tt ](\d{2}:\d{2}:\d{2})(?:\.(\d+))?([Zz]|[+-]\d{2}:\d{2})?',
        value.strip(),
    )
    if match is None:
        raise ValueError(f'unrecognised commit time: {value!r}')
    date_part, time_part, fraction, offset = match.groups()

    parsed = datetime.strptime(f'{date_part}T{time_part}', '%Y-%m-%dT%H:%M:%S')
    if fraction:
        # Keep microsecond precision: pad short fractions, truncate long ones.
        parsed = parsed.replace(microsecond=int(fraction[:6].ljust(6, '0')))

    if offset is None or offset in ('Z', 'z'):
        return parsed.replace(tzinfo=timezone.utc)
    sign = -1 if offset[0] == '-' else 1
    delta = timedelta(hours=int(offset[1:3]), minutes=int(offset[4:6]))
    return parsed.replace(tzinfo=timezone(sign * delta)).astimezone(timezone.utc)


async def main():
    """Consume the firehose and record per-user activity and per-collection counts.

    For every ``create`` op whose collection is in ``app_bsky_allowlist``:

    * upsert ``(did, ts)`` into the SQLite ``users`` table, where ``ts`` is
      the commit's ``time`` as a POSIX timestamp (or the receive time if
      that string cannot be parsed);
    * queue ``INCR <collection>`` and ``INCR dev.edavis.muninsky.ops`` on a
      Redis pipeline.

    Every 500 recorded ops it:

    * prints the current seq and lag;
    * queues the seq under ``dev.edavis.muninsky.seq``, the resume cursor
      read by ``bsky_activity``;
    * executes the Redis pipeline and commits SQLite.

    The database is ``/opt/muninsky/users.db`` when that file exists,
    otherwise ``users.db`` in the working directory. Runs until
    ``bsky_activity()`` stops or raises.
    """
    redis_cnx = redis.Redis()
    # Counter increments and the seq checkpoint are queued here and flushed
    # together every 500 ops.
    redis_pipe = redis_cnx.pipeline()

    if os.path.exists('/opt/muninsky/users.db'):
        db_fname = '/opt/muninsky/users.db'
    else:
        db_fname = 'users.db'

    db_cnx = sqlite3.connect(db_fname)
    with db_cnx:
        db_cnx.executescript("""
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = off;
            CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
            CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
            CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
        """)

    sys.stdout.write('starting up\n')
    sys.stdout.flush()

    op_count = 0
    async for commit, op in bsky_activity():
        if op['action'] != 'create':
            continue

        # Op paths look like '<collection>/<rkey>'; partition never raises on
        # an unexpected shape, unlike unpacking split('/').
        collection = op['path'].partition('/')[0]
        if collection not in app_bsky_allowlist:
            continue

        repo_did = commit['repo']
        try:
            repo_update_time = _parse_commit_time(commit['time'])
        except ValueError:
            # A single malformed timestamp must not take down the whole
            # consumer; fall back to the time the op was received.
            repo_update_time = datetime.now(timezone.utc)
        db_cnx.execute(
            'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
            {'did': repo_did, 'ts': repo_update_time.timestamp()}
        )

        redis_pipe.incr(collection)
        redis_pipe.incr('dev.edavis.muninsky.ops')

        op_count += 1
        if op_count % 500 == 0:
            now = datetime.now(timezone.utc)
            payload_seq = commit['seq']
            payload_lag = now - repo_update_time

            sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
            redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
            redis_pipe.execute()
            db_cnx.commit()
            sys.stdout.flush()


if __name__ == '__main__':
    asyncio.run(main())