this repo has no description
1#!/usr/bin/env python3
2
3import asyncio
4from datetime import datetime, timezone
5import json
6import os
7import sqlite3
8import sys
9
10import redis
11import websockets
12
13app_bsky_allowlist = (
14 'app.bsky.actor.profile',
15 'app.bsky.feed.generator',
16 'app.bsky.feed.like',
17 'app.bsky.feed.post',
18 'app.bsky.feed.postgate',
19 'app.bsky.feed.repost',
20 'app.bsky.feed.threadgate',
21 'app.bsky.graph.block',
22 'app.bsky.graph.follow',
23 'app.bsky.graph.list',
24 'app.bsky.graph.listblock',
25 'app.bsky.graph.listitem',
26 'app.bsky.graph.starterpack',
27 'app.bsky.labeler.service',
28 'chat.bsky.actor.declaration',
29)
30
31other_allowlist = (
32 'social.psky',
33 'blue.zio.atfile',
34 'com.shinolabs.pinksea',
35 'com.whtwnd',
36 'events.smokesignal',
37 'fyi.unravel',
38 'xyz.statusphere',
39)
40
41async def bsky_activity():
42 relay_url = 'wss://jetstream1.us-west.bsky.network/subscribe'
43
44 sys.stdout.write(f'opening websocket connection to {relay_url}\n')
45 sys.stdout.flush()
46
47 async with websockets.connect(relay_url, ping_timeout=60) as firehose:
48 while True:
49 yield json.loads(await firehose.recv())
50
51async def main():
52 redis_cnx = redis.Redis()
53 redis_pipe = redis_cnx.pipeline()
54
55 sys.stdout.write('starting up\n')
56 sys.stdout.flush()
57
58 op_count = 0
59 async for event in bsky_activity():
60 if event['kind'] != 'commit':
61 continue
62
63 payload = event.get('commit')
64 if payload is None:
65 continue
66
67 if payload['operation'] != 'create':
68 continue
69
70 collection = payload['collection']
71 if not collection.startswith(app_bsky_allowlist + other_allowlist):
72 continue
73
74 for prefix in other_allowlist:
75 if collection.startswith(prefix):
76 redis_pipe.incr('dev.edavis.atproto.collection.' + prefix.replace('.', '_'))
77
78 if collection == 'app.bsky.feed.post':
79 embed = payload['record'].get('embed')
80 if embed is not None and embed.get('$type', ''):
81 embed_type = embed['$type']
82 redis_pipe.incr(f'app.bsky.feed.post:embed:{embed_type}')
83
84 redis_pipe \
85 .incr(collection) \
86 .incr('dev.edavis.muninsky.ops')
87
88 op_count += 1
89 if op_count % 500 == 0:
90 event_time_ms = event['time_us'] / 1_000_000
91 sys.stdout.write(f'timestamp: {event_time_ms}\n')
92 redis_pipe.execute()
93 sys.stdout.flush()
94
95if __name__ == '__main__':
96 asyncio.run(main())