this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5import json
6import os
7import sqlite3
8import sys
9
10import redis
11import websockets
12
13app_bsky_allowlist = set([
14 'app.bsky.actor.profile',
15 'app.bsky.feed.generator',
16 'app.bsky.feed.like',
17 'app.bsky.feed.post',
18 'app.bsky.feed.postgate',
19 'app.bsky.feed.repost',
20 'app.bsky.feed.threadgate',
21 'app.bsky.graph.block',
22 'app.bsky.graph.follow',
23 'app.bsky.graph.list',
24 'app.bsky.graph.listblock',
25 'app.bsky.graph.listitem',
26 'app.bsky.graph.starterpack',
27 'app.bsky.labeler.service',
28 'chat.bsky.actor.declaration',
29])
30
31async def bsky_activity():
32 relay_url = 'ws://localhost:6008/subscribe'
33
34 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
35 sys.stdout.flush()
36
37 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
38 while True:
39 yield json.loads(await firehose.recv())
40
41async def main():
42 redis_cnx = redis.Redis()
43 redis_pipe = redis_cnx.pipeline()
44
45 if os.path.exists('/opt/muninsky/users.db'):
46 db_fname = '/opt/muninsky/users.db'
47 else:
48 db_fname = 'users.db'
49
50 db_cnx = sqlite3.connect(db_fname)
51 with db_cnx:
52 db_cnx.executescript("""
53 PRAGMA journal_mode = WAL;
54 PRAGMA synchronous = off;
55 CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
56 CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
57 CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
58 """)
59
60 sys.stdout.write('starting up\n')
61 sys.stdout.flush()
62
63 op_count = 0
64 async for event in bsky_activity():
65 if event['type'] != 'com':
66 continue
67
68 payload = event.get('commit')
69 if payload is None:
70 continue
71
72 if payload['type'] != 'c':
73 continue
74
75 collection = payload['collection']
76 if collection not in app_bsky_allowlist:
77 continue
78
79 repo_did = event['did']
80 repo_update_time = datetime.now(timezone.utc)
81 db_cnx.execute(
82 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
83 {'did': repo_did, 'ts': repo_update_time.timestamp()}
84 )
85
86 if collection == 'app.bsky.feed.post':
87 embed = payload['record'].get('embed')
88 if embed is not None and embed.get('$type', ''):
89 embed_type = embed['$type']
90 redis_pipe.incr(f'app.bsky.feed.post:embed:{embed_type}')
91
92 redis_pipe \
93 .incr(collection) \
94 .incr('dev.edavis.muninsky.ops')
95
96 op_count += 1
97 if op_count % 500 == 0:
98 current_time_ms = datetime.now(timezone.utc).timestamp()
99 event_time_ms = event['time_us'] / 1_000_000
100 current_lag = current_time_ms - event_time_ms
101 sys.stdout.write(f'lag: {current_lag:.2f}\n')
102 redis_pipe.execute()
103 db_cnx.commit()
104 sys.stdout.flush()
105
106if __name__ == '__main__':
107 asyncio.run(main())