this repo has no description
#!/usr/bin/env python3
"""Consume the Bluesky relay firehose and tally repo activity.

Commit operations read from ``com.atproto.sync.subscribeRepos`` are counted
per collection in Redis, and the time of each repo's latest counted commit is
upserted into a local SQLite ``users`` table.
"""

import asyncio
from datetime import datetime, timezone
from io import BytesIO
import os
import sqlite3
import sys

from atproto import CAR
import redis
import dag_cbor
import websockets

# Collections whose 'create' operations are counted by main().
app_bsky_allowlist = {
    'app.bsky.actor.profile',
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.labeler.service',
    'chat.bsky.actor.declaration',
}

# Accepted layouts for a commit's 'time' field, tried in order. The second
# covers timestamps emitted without fractional seconds.
_COMMIT_TIME_FORMATS = (
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ',
)


def _parse_commit_time(value):
    """Return the commit ``time`` string as a timezone-aware UTC datetime.

    Raises:
        ValueError: if ``value`` matches none of ``_COMMIT_TIME_FORMATS``.
    """
    for fmt in _COMMIT_TIME_FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc)
    raise ValueError(f'unrecognized commit time: {value!r}')


async def bsky_activity():
    """Yield a ``(message, op)`` pair for each operation in firehose commits.

    Opens a websocket to the relay, resuming from the cursor stored in Redis
    under ``dev.edavis.muninsky.seq`` when one exists. Frames that are not
    ``#commit`` messages, and commits flagged ``tooBig``, are skipped.

    Yields:
        message: the commit payload without ``blocks`` and ``ops``, with
            ``commit`` base32-encoded.
        op: a copy of one entry of the commit's ``ops``; when its ``cid`` is
            set, the cid is base32-encoded and ``record`` holds the result of
            looking that cid up in the commit's CAR blocks.
    """
    redis_cnx = redis.Redis()
    relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
    firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
    if firehose_seq:
        relay_url += f'?cursor={firehose_seq.decode()}'

    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    async with websockets.connect(relay_url) as firehose:
        while True:
            # Each frame is two concatenated DAG-CBOR objects: header, payload.
            frame = BytesIO(await firehose.recv())
            header = dag_cbor.decode(frame, allow_concat=True)
            # .get() so a frame lacking 'op' or 't' is skipped, not fatal.
            if header.get('op') != 1 or header.get('t') != '#commit':
                continue

            payload = dag_cbor.decode(frame)
            if payload.get('tooBig'):
                # TODO(ejd): figure out how to get blocks out-of-band
                continue

            # TODO(ejd): figure out how to validate blocks
            blocks = payload.pop('blocks')
            car_parsed = CAR.from_bytes(blocks)

            message = payload.copy()
            del message['ops']
            message['commit'] = message['commit'].encode('base32')

            for commit_op in payload['ops']:
                op = commit_op.copy()
                if op['cid'] is not None:
                    op['cid'] = op['cid'].encode('base32')
                    # NOTE(review): lookup uses the base32 string form of the
                    # cid; assumes CAR block keys compare equal to it — confirm.
                    op['record'] = car_parsed.blocks.get(op['cid'])

                yield message, op


async def main():
    """Count allowlisted 'create' operations and record per-repo activity.

    For every counted operation the repo DID and commit time are upserted
    into the ``users`` table and two Redis counters are queued on a pipeline:
    one named after the collection and ``dev.edavis.muninsky.ops``. Every 500
    counted operations the current firehose ``seq`` is queued as well, the
    pipeline is executed, and the SQLite transaction is committed.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()

    if os.path.exists('/opt/muninsky/users.db'):
        db_fname = '/opt/muninsky/users.db'
    else:
        db_fname = 'users.db'

    db_cnx = sqlite3.connect(db_fname)
    try:
        with db_cnx:
            db_cnx.executescript("""
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = off;
            CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
            CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
            CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
            """)

        sys.stdout.write('starting up\n')
        sys.stdout.flush()

        op_count = 0
        async for commit, op in bsky_activity():
            if op['action'] != 'create':
                continue

            # partition() never raises, unlike a two-value unpack of split('/');
            # a malformed path simply fails the allowlist check below.
            collection, _, _rkey = op['path'].partition('/')
            if collection not in app_bsky_allowlist:
                continue

            repo_did = commit['repo']
            repo_update_time = _parse_commit_time(commit['time'])
            db_cnx.execute(
                'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
                {'did': repo_did, 'ts': repo_update_time.timestamp()}
            )

            redis_pipe.incr(collection)
            redis_pipe.incr('dev.edavis.muninsky.ops')

            op_count += 1
            if op_count % 500 == 0:
                now = datetime.now(timezone.utc)
                payload_seq = commit['seq']
                payload_lag = now - repo_update_time

                sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
                # The cursor goes out with the counters in the same pipeline
                # execute; the matching SQLite batch is committed right after.
                redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
                redis_pipe.execute()
                db_cnx.commit()
                sys.stdout.flush()
    finally:
        # close() does not commit: rows written since the last batch commit
        # are discarded, matching the Redis commands still queued on the pipe.
        db_cnx.close()


if __name__ == '__main__':
    asyncio.run(main())