this repo has no description
1#!/usr/bin/env python3 2 3import asyncio 4import os 5import redis 6import sqlite3 7import sys 8from datetime import datetime, timezone 9from firehose_utils import bsky_activity 10 11app_bsky_allowlist = set([ 12 'app.bsky.actor.profile', 13 'app.bsky.feed.generator', 14 'app.bsky.feed.like', 15 'app.bsky.feed.post', 16 'app.bsky.feed.repost', 17 'app.bsky.feed.threadgate', 18 'app.bsky.graph.block', 19 'app.bsky.graph.follow', 20 'app.bsky.graph.list', 21 'app.bsky.graph.listblock', 22 'app.bsky.graph.listitem', 23 'app.bsky.labeler.service', 24]) 25 26async def main(): 27 redis_cnx = redis.Redis() 28 redis_pipe = redis_cnx.pipeline() 29 30 if os.path.exists('/opt/muninsky/users.db'): 31 db_fname = '/opt/muninsky/users.db' 32 else: 33 db_fname = 'users.db' 34 35 db_cnx = sqlite3.connect(db_fname) 36 with db_cnx: 37 db_cnx.executescript(""" 38 PRAGMA journal_mode = WAL; 39 PRAGMA synchronous = off; 40 CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP); 41 CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did); 42 CREATE INDEX IF NOT EXISTS ts_idx on users(ts); 43 """) 44 45 sys.stdout.write('starting up\n') 46 sys.stdout.flush() 47 48 op_count = 0 49 async for commit, op in bsky_activity(): 50 if op['action'] != 'create': 51 continue 52 53 collection, _ = op['path'].split('/') 54 if collection not in app_bsky_allowlist: 55 continue 56 57 repo_did = commit['repo'] 58 repo_update_time = datetime.strptime(commit['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) 59 db_cnx.execute( 60 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts', 61 {'did': repo_did, 'ts': repo_update_time.timestamp()} 62 ) 63 64 redis_pipe \ 65 .incr(collection) \ 66 .incr('dev.edavis.muninsky.ops') 67 68 op_count += 1 69 if op_count % 500 == 0: 70 now = datetime.now(timezone.utc) 71 payload_seq = commit['seq'] 72 payload_lag = now - repo_update_time 73 74 sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n') 75 redis_pipe.set('dev.edavis.muninsky.seq', payload_seq) 76 redis_pipe.execute() 77 db_cnx.commit() 78 sys.stdout.flush() 79 80if __name__ == '__main__': 81 asyncio.run(main())