this repo has no description
1#!/usr/bin/env python3 2 3import asyncio 4from datetime import datetime, timezone 5import json 6import os 7import sqlite3 8import sys 9 10import redis 11import websockets 12 13app_bsky_allowlist = ( 14 'app.bsky.actor.profile', 15 'app.bsky.feed.generator', 16 'app.bsky.feed.like', 17 'app.bsky.feed.post', 18 'app.bsky.feed.postgate', 19 'app.bsky.feed.repost', 20 'app.bsky.feed.threadgate', 21 'app.bsky.graph.block', 22 'app.bsky.graph.follow', 23 'app.bsky.graph.list', 24 'app.bsky.graph.listblock', 25 'app.bsky.graph.listitem', 26 'app.bsky.graph.starterpack', 27 'app.bsky.labeler.service', 28 'chat.bsky.actor.declaration', 29) 30 31other_allowlist = ( 32 'social.psky', 33 'blue.zio.atfile', 34 'com.shinolabs.pinksea', 35 'com.whtwnd', 36 'events.smokesignal', 37 'fyi.unravel', 38 'xyz.statusphere', 39) 40 41async def bsky_activity(): 42 relay_url = 'wss://jetstream1.us-west.bsky.network/subscribe' 43 44 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 45 sys.stdout.flush() 46 47 async with websockets.connect(relay_url, ping_timeout=60) as firehose: 48 while True: 49 yield json.loads(await firehose.recv()) 50 51async def main(): 52 redis_cnx = redis.Redis() 53 redis_pipe = redis_cnx.pipeline() 54 55 sys.stdout.write('starting up\n') 56 sys.stdout.flush() 57 58 op_count = 0 59 async for event in bsky_activity(): 60 if event['kind'] != 'commit': 61 continue 62 63 payload = event.get('commit') 64 if payload is None: 65 continue 66 67 if payload['operation'] != 'create': 68 continue 69 70 collection = payload['collection'] 71 if not collection.startswith(app_bsky_allowlist + other_allowlist): 72 continue 73 74 for prefix in other_allowlist: 75 if collection.startswith(prefix): 76 redis_pipe.incr('dev.edavis.atproto.collection.' + prefix.replace('.', '_')) 77 78 if collection == 'app.bsky.feed.post': 79 embed = payload['record'].get('embed') 80 if embed is not None and embed.get('$type', ''): 81 embed_type = embed['$type'] 82 redis_pipe.incr(f'app.bsky.feed.post:embed:{embed_type}') 83 84 redis_pipe \ 85 .incr(collection) \ 86 .incr('dev.edavis.muninsky.ops') 87 88 op_count += 1 89 if op_count % 500 == 0: 90 event_time_ms = event['time_us'] / 1_000_000 91 sys.stdout.write(f'timestamp: {event_time_ms}\n') 92 redis_pipe.execute() 93 sys.stdout.flush() 94 95if __name__ == '__main__': 96 asyncio.run(main())