this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4import os
5import redis
6import sqlite3
7import sys
8from datetime import datetime, timezone
9from firehose_utils import bsky_activity
10
# Record collections (NSIDs) whose `create` operations are counted as user
# activity; operations on any other collection are skipped by main().
app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.labeler.service',
}
25
async def main() -> None:
    """Consume the firehose and record per-repo activity.

    For every ``create`` operation whose collection is in
    ``app_bsky_allowlist`` this upserts the repo DID together with the commit
    timestamp into the ``users`` SQLite table, and queues Redis ``INCR``
    commands for the collection key and for ``dev.edavis.muninsky.ops``.

    Every 500 accepted operations the queued Redis commands are executed, the
    SQLite transaction is committed, and the current sequence number and lag
    are written to stdout. Whatever is still pending when the stream ends or
    fails is flushed on the way out, and the SQLite connection is closed.

    The database file is ``/opt/muninsky/users.db`` when that path exists,
    otherwise ``users.db`` relative to the working directory.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()

    if os.path.exists('/opt/muninsky/users.db'):
        db_fname = '/opt/muninsky/users.db'
    else:
        db_fname = 'users.db'

    db_cnx = sqlite3.connect(db_fname)
    try:
        # `with db_cnx` only scopes a transaction; it does not close the
        # connection, hence the outer try/finally.
        with db_cnx:
            db_cnx.executescript("""
                PRAGMA journal_mode = WAL;
                PRAGMA synchronous = off;
                CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
                CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
                CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
            """)

        sys.stdout.write('starting up\n')
        sys.stdout.flush()

        op_count = 0
        last_seq = None  # seq of the most recently accepted operation
        try:
            async for commit, op in bsky_activity():
                if op['action'] != 'create':
                    continue

                # partition() never raises: a path without exactly one '/'
                # no longer aborts the whole consumer with a ValueError.
                collection, _, _ = op['path'].partition('/')
                if collection not in app_bsky_allowlist:
                    continue

                repo_did = commit['repo']
                repo_update_time = datetime.strptime(
                    commit['time'], '%Y-%m-%dT%H:%M:%S.%fZ'
                ).replace(tzinfo=timezone.utc)
                db_cnx.execute(
                    'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
                    {'did': repo_did, 'ts': repo_update_time.timestamp()}
                )

                redis_pipe \
                    .incr(collection) \
                    .incr('dev.edavis.muninsky.ops')

                last_seq = commit['seq']
                op_count += 1
                if op_count % 500 == 0:
                    now = datetime.now(timezone.utc)
                    payload_lag = now - repo_update_time

                    sys.stdout.write(f'seq: {last_seq}, lag: {payload_lag.total_seconds()}\n')
                    redis_pipe.set('dev.edavis.muninsky.seq', last_seq)
                    redis_pipe.execute()
                    db_cnx.commit()
                    sys.stdout.flush()
        finally:
            # Flush the partial batch (up to 499 operations) that would
            # otherwise be dropped when the stream ends, raises, or the task
            # is cancelled. Both calls are harmless when nothing is pending.
            if last_seq is not None:
                redis_pipe.set('dev.edavis.muninsky.seq', last_seq)
            redis_pipe.execute()
            db_cnx.commit()
    finally:
        db_cnx.close()
79
if __name__ == "__main__":
    # Script entry point: run the firehose consumer on a fresh event loop
    # until it finishes or the process is interrupted.
    consumer = main()
    asyncio.run(consumer)