this repo has no description
1#!/usr/bin/env python3 2 3import asyncio 4from datetime import datetime, timezone 5import json 6import os 7import sqlite3 8import sys 9 10import redis 11import websockets 12 13app_bsky_allowlist = set([ 14 'app.bsky.actor.profile', 15 'app.bsky.feed.generator', 16 'app.bsky.feed.like', 17 'app.bsky.feed.post', 18 'app.bsky.feed.postgate', 19 'app.bsky.feed.repost', 20 'app.bsky.feed.threadgate', 21 'app.bsky.graph.block', 22 'app.bsky.graph.follow', 23 'app.bsky.graph.list', 24 'app.bsky.graph.listblock', 25 'app.bsky.graph.listitem', 26 'app.bsky.graph.starterpack', 27 'app.bsky.labeler.service', 28 'chat.bsky.actor.declaration', 29]) 30 31other_allowlist = set([ 32 'social.psky.feed.post', 33 'social.psky.chat.message', 34]) 35 36async def bsky_activity(): 37 relay_url = 'ws://localhost:6008/subscribe' 38 39 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 40 sys.stdout.flush() 41 42 async with websockets.connect(relay_url, ping_timeout=60) as firehose: 43 while True: 44 yield json.loads(await firehose.recv()) 45 46async def main(): 47 redis_cnx = redis.Redis() 48 redis_pipe = redis_cnx.pipeline() 49 50 if os.path.exists('/opt/muninsky/users.db'): 51 db_fname = '/opt/muninsky/users.db' 52 else: 53 db_fname = 'users.db' 54 55 db_cnx = sqlite3.connect(db_fname) 56 with db_cnx: 57 db_cnx.executescript(""" 58 PRAGMA journal_mode = WAL; 59 PRAGMA synchronous = off; 60 CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP); 61 CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did); 62 CREATE INDEX IF NOT EXISTS ts_idx on users(ts); 63 """) 64 65 sys.stdout.write('starting up\n') 66 sys.stdout.flush() 67 68 op_count = 0 69 async for event in bsky_activity(): 70 if event['type'] != 'com': 71 continue 72 73 payload = event.get('commit') 74 if payload is None: 75 continue 76 77 if payload['type'] != 'c': 78 continue 79 80 collection = payload['collection'] 81 if collection not in app_bsky_allowlist | other_allowlist: 82 continue 83 84 repo_did = event['did'] 85 repo_update_time = datetime.now(timezone.utc) 86 db_cnx.execute( 87 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts', 88 {'did': repo_did, 'ts': repo_update_time.timestamp()} 89 ) 90 91 if collection == 'app.bsky.feed.post': 92 embed = payload['record'].get('embed') 93 if embed is not None and embed.get('$type', ''): 94 embed_type = embed['$type'] 95 redis_pipe.incr(f'app.bsky.feed.post:embed:{embed_type}') 96 97 redis_pipe \ 98 .incr(collection) \ 99 .incr('dev.edavis.muninsky.ops') 100 101 op_count += 1 102 if op_count % 500 == 0: 103 current_time_ms = datetime.now(timezone.utc).timestamp() 104 event_time_ms = event['time_us'] / 1_000_000 105 current_lag = current_time_ms - event_time_ms 106 sys.stdout.write(f'lag: {current_lag:.2f}\n') 107 redis_pipe.execute() 108 db_cnx.commit() 109 sys.stdout.flush() 110 111if __name__ == '__main__': 112 asyncio.run(main())