This repository has no description.
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5from io import BytesIO
6import os
7import sqlite3
8import sys
9
10from atproto import CAR
11import redis
12import dag_cbor
13import websockets
14
# Record collection names whose `create` ops main() counts; ops in any other
# collection are skipped. Despite the variable name, the set also contains a
# chat.bsky collection (chat.bsky.actor.declaration).
app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.labeler.service',
    'chat.bsky.actor.declaration',
}
30
async def bsky_activity():
    """Stream repo operations from the bsky.network subscribeRepos firehose.

    If Redis holds a value under ``dev.edavis.muninsky.seq``, it is passed to
    the relay as ``cursor`` so the stream resumes from there.

    Only frames whose header has ``op == 1`` and ``t == '#commit'`` are
    handled; commits flagged ``tooBig`` are skipped. For each remaining commit,
    one ``(message, op)`` pair is yielded per entry in its ``ops`` list:

    * ``message`` is the commit payload without ``blocks`` and ``ops``, with
      ``commit`` re-encoded as a base32 string. The same dict object is shared
      by every pair from one commit.
    * ``op`` is a copy of the operation, with a non-None ``cid`` re-encoded as
      base32 and ``record`` set to the matching entry of the commit's CAR
      blocks (``None`` when absent).

    NOTE(review): the record lookup uses the base32 *string* form of the CID.
    Confirm that ``CAR.blocks`` keys compare equal to strings, otherwise
    ``record`` is always None.
    NOTE(review): no reconnect logic is visible here. Presumably the generator
    ends with an exception when the websocket closes; verify.
    """
    store = redis.Redis()
    endpoint = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    saved_seq = store.get('dev.edavis.muninsky.seq')
    if saved_seq:
        endpoint = endpoint + f'?cursor={saved_seq.decode()}'

    sys.stdout.write(f'opening websocket connection to {endpoint}\n')
    sys.stdout.flush()

    async with websockets.connect(endpoint) as ws:
        while True:
            # Each frame is two concatenated DAG-CBOR objects: header, then body.
            stream = BytesIO(await ws.recv())
            header = dag_cbor.decode(stream, allow_concat=True)
            is_commit = header['op'] == 1 and header['t'] == '#commit'
            if not is_commit:
                continue

            body = dag_cbor.decode(stream)
            if body['tooBig']:
                # TODO(ejd): figure out how to get blocks out-of-band
                continue

            # TODO(ejd): figure out how to validate blocks
            car = CAR.from_bytes(body.pop('blocks'))

            message = dict(body)
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for raw_op in body['ops']:
                op = dict(raw_op)
                cid = op['cid']
                if cid is not None:
                    op['cid'] = cid.encode('base32')
                op['record'] = car.blocks.get(op['cid'])
                yield message, op
68
def _open_users_db():
    """Open the users SQLite database, creating its schema if needed.

    Uses ``/opt/muninsky/users.db`` when that file exists, otherwise
    ``users.db`` relative to the working directory. Enables WAL journaling with
    ``synchronous = off``, and ensures the ``users`` table exists together with
    a unique index on ``did`` and an index on ``ts``.
    """
    if os.path.exists('/opt/muninsky/users.db'):
        db_fname = '/opt/muninsky/users.db'
    else:
        db_fname = 'users.db'

    db_cnx = sqlite3.connect(db_fname)
    with db_cnx:
        # synchronous = off favours write throughput over durability on an OS
        # crash or power loss.
        db_cnx.executescript("""
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous = off;
        CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
        CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
        CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
        """)
    return db_cnx


def _parse_commit_time(value):
    """Parse a commit ``time`` string into a timezone-aware UTC datetime.

    Accepts ``YYYY-MM-DDTHH:MM:SS[.fraction]Z``. The fractional part is
    optional and may have any number of digits. It is truncated to
    microseconds because ``strptime``'s ``%f`` accepts at most six digits.

    Raises:
        ValueError: if ``value`` does not match that shape.
    """
    if not value.endswith('Z'):
        raise ValueError(f'unsupported commit time: {value!r}')
    whole, dot, fraction = value[:-1].partition('.')
    if dot and not fraction.isdigit():
        raise ValueError(f'unsupported commit time: {value!r}')
    parsed = datetime.strptime(whole, '%Y-%m-%dT%H:%M:%S')
    micros = int(fraction[:6].ljust(6, '0')) if fraction else 0
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc)


async def main():
    """Count allowlisted record creations from the firehose and track DIDs.

    For every ``create`` op whose collection is in ``app_bsky_allowlist``:

    * upserts the committing repo's DID and the commit timestamp (epoch
      seconds) into the ``users`` table;
    * queues Redis increments for the collection's counter and for
      ``dev.edavis.muninsky.ops``.

    Every 500 counted ops it writes a checkpoint. It prints the current seq
    and lag, stores the seq under ``dev.edavis.muninsky.seq`` (the cursor
    ``bsky_activity`` resumes from), executes the buffered Redis pipeline and
    commits the SQLite transaction.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()
    db_cnx = _open_users_db()

    sys.stdout.write('starting up\n')
    sys.stdout.flush()

    op_count = 0
    async for commit, op in bsky_activity():
        if op['action'] != 'create':
            continue

        # Paths look like "<collection>/<rkey>". partition() avoids the
        # unpack ValueError that would otherwise stop the whole stream on an
        # unexpected path shape.
        collection = op['path'].partition('/')[0]
        if collection not in app_bsky_allowlist:
            continue

        repo_did = commit['repo']
        repo_update_time = _parse_commit_time(commit['time'])
        # One row per DID, holding the latest-seen commit time.
        db_cnx.execute(
            'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
            {'did': repo_did, 'ts': repo_update_time.timestamp()},
        )

        # Increments stay buffered in the pipeline until the next checkpoint.
        redis_pipe \
            .incr(collection) \
            .incr('dev.edavis.muninsky.ops')

        op_count += 1
        if op_count % 500 == 0:
            # Checkpoint: the cursor, the buffered counters and the DB rows are
            # all persisted here, so a restart replays from the last seq
            # flushed here.
            now = datetime.now(timezone.utc)
            payload_seq = commit['seq']
            payload_lag = now - repo_update_time

            sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
            redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
            redis_pipe.execute()
            db_cnx.commit()
            sys.stdout.flush()
122
if __name__ == '__main__':
    # Script entry point: run the async consumer on a fresh event loop.
    entrypoint = main()
    asyncio.run(entrypoint)