···
from datetime import datetime, timezone
9
-
from firehose_utils import commit_ops
10
-
from io import BytesIO
8
+
from firehose_utils import subscribe_commits
app_bsky_allowlist = set([
'app.bsky.actor.profile',
···
redis_cnx = redis.Redis()
redis_pipe = redis_cnx.pipeline()
30
-
redis_sub = redis_cnx.pubsub(ignore_subscribe_messages=True)
if os.path.exists('/opt/muninsky/users.db'):
db_fname = '/opt/muninsky/users.db'
···
CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
44
+
sys.stdout.write('starting up\n')
48
-
redis_sub.subscribe('bsky-tools:firehose:stream')
49
-
for event in redis_sub.listen():
50
-
frame = BytesIO(event['data'])
51
-
header = dag_cbor.decode(frame, allow_concat=True)
52
-
if header['op'] != 1 or header['t'] != '#commit':
48
+
for commit, op in subscribe_commits():
49
+
if op['action'] != 'create':
55
-
payload = dag_cbor.decode(frame)
56
-
if payload['tooBig']:
57
-
# TODO(ejd): how handle these?
52
+
collection, _ = op['path'].split('/')
53
+
if collection not in app_bsky_allowlist:
60
-
for op in commit_ops(payload):
61
-
if op['action'] != 'create':
56
+
repo_did = commit['repo']
57
+
now = datetime.now(timezone.utc)
59
+
'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
60
+
{'did': repo_did, 'ts': now.timestamp()}
64
-
collection, _ = op['path'].split('/')
65
-
if collection not in app_bsky_allowlist:
65
+
.incr('dev.edavis.muninsky.ops')
68
-
repo_did = payload['repo']
69
-
ts = datetime.now(timezone.utc).timestamp()
71
-
'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
72
-
{'did': repo_did, 'ts': ts}
68
+
if op_count % 1000 == 0:
69
+
payload_seq = commit['seq']
70
+
payload_time = datetime.strptime(commit['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
71
+
payload_lag = now - payload_time
77
-
.incr('dev.edavis.muninsky.ops')
80
-
if op_count % 500 == 0:
81
-
payload_seq = payload['seq']
82
-
sys.stdout.write(f'checkpoint: seq: {payload_seq}\n')
83
-
redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
84
-
redis_pipe.execute()
73
+
sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
74
+
redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
75
+
redis_pipe.execute()
if __name__ == '__main__':