this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5import json
6import os
7import sqlite3
8import sys
9
10import redis
11import websockets
12
# Record collections from the official Bluesky namespaces (app.bsky.* plus
# chat.bsky.actor.declaration) whose creations are counted by main().
# Any collection not listed here or in other_allowlist is ignored.
app_bsky_allowlist = {
    # actor
    'app.bsky.actor.profile',
    # feed
    'app.bsky.feed.generator',
    'app.bsky.feed.like',
    'app.bsky.feed.post',
    'app.bsky.feed.postgate',
    'app.bsky.feed.repost',
    'app.bsky.feed.threadgate',
    # graph
    'app.bsky.graph.block',
    'app.bsky.graph.follow',
    'app.bsky.graph.list',
    'app.bsky.graph.listblock',
    'app.bsky.graph.listitem',
    'app.bsky.graph.starterpack',
    # labeler
    'app.bsky.labeler.service',
    # chat
    'chat.bsky.actor.declaration',
}
30
# Record collections outside the app.bsky / chat.bsky namespaces whose
# creations are also counted by main().
other_allowlist = {
    'social.psky.feed.post',
    'social.psky.chat.message',
    'blue.zio.atfile.upload',
}
36
async def bsky_activity(relay_url: str = 'ws://localhost:6008/subscribe'):
    """Yield JSON-decoded messages received from a websocket relay.

    Args:
        relay_url: Websocket endpoint to connect to. Defaults to the relay
            on localhost port 6008, which is what the script used before
            this became a parameter.

    Yields:
        Each received message, parsed with ``json.loads``.

    The loop has no exit condition of its own: the generator only ends when
    ``recv()`` or ``json.loads`` raises (for example when the connection is
    closed), and that exception propagates to the consumer.
    """
    sys.stdout.write(f'opening websocket connection to {relay_url}\n')
    sys.stdout.flush()

    # ping_timeout=60 is passed straight through to websockets.connect().
    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            yield json.loads(await firehose.recv())
46
async def main():
    """Count allowlisted record creations from the firehose into Redis.

    Consumes events from ``bsky_activity()``. For every event of kind
    ``commit`` whose commit operation is ``create`` and whose collection is
    in ``app_bsky_allowlist`` or ``other_allowlist``, it queues increments
    for:

    * a counter named after the collection,
    * the global ``dev.edavis.muninsky.ops`` counter, and
    * ``app.bsky.feed.post:embed:<$type>`` when an ``app.bsky.feed.post``
      record carries an embed with a non-empty ``$type``.

    Increments are buffered in a Redis pipeline and sent every 500 counted
    operations; at the same moment the lag between the wall clock and the
    event's ``time_us`` stamp is written to stdout. Whatever is still
    buffered when the stream ends or raises is flushed before returning.
    """
    redis_cnx = redis.Redis()
    redis_pipe = redis_cnx.pipeline()

    # Build the combined allowlist once; the union never changes while
    # running, so there is no need to re-create it for every event.
    allowed_collections = app_bsky_allowlist | other_allowlist

    sys.stdout.write('starting up\n')
    sys.stdout.flush()

    op_count = 0
    try:
        async for event in bsky_activity():
            # .get() so an event without a 'kind' key is skipped rather
            # than crashing the whole consumer.
            if event.get('kind') != 'commit':
                continue

            payload = event.get('commit')
            if payload is None:
                continue

            if payload['operation'] != 'create':
                continue

            collection = payload['collection']
            if collection not in allowed_collections:
                continue

            if collection == 'app.bsky.feed.post':
                embed = payload['record'].get('embed')
                if embed is not None and embed.get('$type', ''):
                    embed_type = embed['$type']
                    redis_pipe.incr(f'app.bsky.feed.post:embed:{embed_type}')

            redis_pipe \
                .incr(collection) \
                .incr('dev.edavis.muninsky.ops')

            op_count += 1
            if op_count % 500 == 0:
                # Both values are in seconds: timestamp() returns seconds
                # and time_us is divided by one million.
                now_s = datetime.now(timezone.utc).timestamp()
                event_time_s = event['time_us'] / 1_000_000
                current_lag = now_s - event_time_s
                sys.stdout.write(f'lag: {current_lag:.2f}\n')
                # NOTE(review): redis.Redis() is the synchronous client, so
                # this call is not awaited and blocks the event loop while
                # it runs -- confirm that is acceptable at this batch size.
                redis_pipe.execute()
                sys.stdout.flush()
    finally:
        # Up to 499 counted operations can still be buffered when the
        # stream ends or raises; send them instead of dropping them. When
        # op_count is a multiple of 500 the batch was already sent (or its
        # execute() is the very call that failed), so it is not retried.
        if op_count % 500:
            redis_pipe.execute()
91
# Script entry point: start the event loop and run main() only when this
# file is executed directly, not when it is imported as a module.
if __name__ == '__main__':
    asyncio.run(main())