this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5from io import BytesIO
6import json
7import os
8import sqlite3
9import sys
10
11from atproto import CAR
12import redis
13import dag_cbor
14import websockets
15
16app_bsky_allowlist = set([
17 'app.bsky.actor.profile',
18 'app.bsky.feed.generator',
19 'app.bsky.feed.like',
20 'app.bsky.feed.post',
21 'app.bsky.feed.postgate',
22 'app.bsky.feed.repost',
23 'app.bsky.feed.threadgate',
24 'app.bsky.graph.block',
25 'app.bsky.graph.follow',
26 'app.bsky.graph.list',
27 'app.bsky.graph.listblock',
28 'app.bsky.graph.listitem',
29 'app.bsky.graph.starterpack',
30 'app.bsky.labeler.service',
31 'chat.bsky.actor.declaration',
32])
33
34async def bsky_activity():
35 relay_url = 'ws://localhost:6008/subscribe'
36
37 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
38 sys.stdout.flush()
39
40 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
41 while True:
42 payload = BytesIO(await firehose.recv())
43
44 yield json.load(payload)
45
46async def main():
47 redis_cnx = redis.Redis()
48 redis_pipe = redis_cnx.pipeline()
49
50 if os.path.exists('/opt/muninsky/users.db'):
51 db_fname = '/opt/muninsky/users.db'
52 else:
53 db_fname = 'users.db'
54
55 db_cnx = sqlite3.connect(db_fname)
56 with db_cnx:
57 db_cnx.executescript("""
58 PRAGMA journal_mode = WAL;
59 PRAGMA synchronous = off;
60 CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
61 CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
62 CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
63 """)
64
65 sys.stdout.write('starting up\n')
66 sys.stdout.flush()
67
68 op_count = 0
69 async for payload in bsky_activity():
70 if payload['opType'] != 'c':
71 continue
72
73 collection = payload['collection']
74 if collection not in app_bsky_allowlist:
75 continue
76
77 repo_did = payload['did']
78 repo_update_time = datetime.now(timezone.utc)
79 db_cnx.execute(
80 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
81 {'did': repo_did, 'ts': repo_update_time.timestamp()}
82 )
83
84 redis_pipe \
85 .incr(collection) \
86 .incr('dev.edavis.muninsky.ops')
87
88 op_count += 1
89 if op_count % 500 == 0:
90 now = datetime.now(timezone.utc)
91 payload_seq = payload['seq']
92 payload_lag = now - repo_update_time
93
94 sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
95 redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
96 redis_pipe.execute()
97 db_cnx.commit()
98 sys.stdout.flush()
99
100if __name__ == '__main__':
101 asyncio.run(main())