this repo has no description
#!/usr/bin/env python3
"""Count record-creation events from a local relay websocket.

For every creation event whose collection is in ``app_bsky_allowlist`` the
script:

* upserts the repo DID and the current UTC time into the ``users`` table of a
  SQLite database, and
* increments two Redis counters: one named after the collection and the
  aggregate ``dev.edavis.muninsky.ops``.

Redis commands and SQLite writes are batched and flushed every
``_FLUSH_EVERY`` accepted events.
"""

import asyncio
from datetime import datetime, timezone
from io import BytesIO
import json
import os
import sqlite3
import sys

from atproto import CAR
import redis
import dag_cbor
import websockets

# Collections whose creation events are counted; everything else is skipped.
app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.postgate',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.graph.starterpack',
    'app.bsky.labeler.service',
    'chat.bsky.actor.declaration',
}

# Number of accepted events between Redis/SQLite flushes.
_FLUSH_EVERY = 500


async def bsky_activity():
    """Yield each websocket message from the relay, decoded from JSON.

    Runs until the connection fails or is closed, at which point the
    exception raised by ``recv()`` propagates to the consumer.
    """
    relay_url = 'ws://localhost:6008/subscribe'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            # json.loads accepts str as well as bytes, so both text and
            # binary frames decode; wrapping in BytesIO rejected str frames.
            yield json.loads(await firehose.recv())


async def main():
    """Consume relay events and maintain the Redis counters and users table.

    The database is ``/opt/muninsky/users.db`` when that file exists,
    otherwise ``users.db`` in the current working directory.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()

    if os.path.exists('/opt/muninsky/users.db'):
        db_fname = '/opt/muninsky/users.db'
    else:
        db_fname = 'users.db'

    db_cnx = sqlite3.connect(db_fname)
    try:
        with db_cnx:
            db_cnx.executescript("""
                PRAGMA journal_mode = WAL;
                PRAGMA synchronous = off;
                CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
                CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
                CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
            """)

        sys.stdout.write('starting up\n')
        sys.stdout.flush()

        op_count = 0
        async for payload in bsky_activity():
            # Only 'c' operations are counted.
            if payload['opType'] != 'c':
                continue

            collection = payload['collection']
            if collection not in app_bsky_allowlist:
                continue

            repo_did = payload['did']
            # The stored timestamp is when this process saw the event.
            repo_update_time = datetime.now(timezone.utc)
            db_cnx.execute(
                'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
                {'did': repo_did, 'ts': repo_update_time.timestamp()}
            )

            # Queued on the pipeline; sent to Redis at the next flush.
            redis_pipe \
                .incr(collection) \
                .incr('dev.edavis.muninsky.ops')

            op_count += 1
            if op_count % _FLUSH_EVERY == 0:
                now = datetime.now(timezone.utc)
                payload_seq = payload['seq']
                # NOTE(review): repo_update_time was taken moments ago in this
                # same iteration, so this "lag" is only local processing time,
                # not distance behind the relay -- confirm what was intended.
                payload_lag = now - repo_update_time

                sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
                redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
                redis_pipe.execute()
                db_cnx.commit()
                sys.stdout.flush()
    finally:
        # Flush the partial batch so events since the last flush are not
        # lost when the stream ends or raises. SQLite is committed first so
        # the local write does not depend on Redis being reachable.
        try:
            db_cnx.commit()
            redis_pipe.execute()
        finally:
            db_cnx.close()


if __name__ == '__main__':
    asyncio.run(main())