this repo has no description

firehose_utils.py: abstract further to subscribe_commits()

Changed files
+33 -9
+33 -9
firehose_utils.py
···
+
import dag_cbor
+
import redis
from atproto import CAR
+
from io import BytesIO
-
def commit_ops(payload):
-
# TODO(ejd): figure out how to validate blocks
-
car_parsed = CAR.from_bytes(payload['blocks'])
-
for op in payload['ops']:
-
repo_op = op.copy()
-
if op['cid'] is not None:
-
repo_op['cid'] = op['cid'].encode('base32')
-
repo_op['record'] = car_parsed.blocks[repo_op['cid']]
-
yield repo_op
+
def subscribe_commits():
+
redis_cnx = redis.Redis()
+
redis_sub = redis_cnx.pubsub(ignore_subscribe_messages=True)
+
redis_sub.subscribe('bsky-tools:firehose:stream')
+
+
for event in redis_sub.listen():
+
frame = BytesIO(event['data'])
+
header = dag_cbor.decode(frame, allow_concat=True)
+
if header['op'] != 1 or header['t'] != '#commit':
+
continue
+
+
payload = dag_cbor.decode(frame)
+
if payload['tooBig']:
+
# TODO(ejd): figure out how to get blocks out-of-band
+
continue
+
+
# TODO(ejd): figure out how to validate blocks
+
blocks = payload.pop('blocks')
+
car_parsed = CAR.from_bytes(blocks)
+
+
message = payload.copy()
+
del message['ops']
+
message['commit'] = message['commit'].encode('base32')
+
+
for commit_op in payload['ops']:
+
op = commit_op.copy()
+
if op['cid'] is not None:
+
op['cid'] = op['cid'].encode('base32')
+
op['record'] = car_parsed.blocks[op['cid']]
+
yield message, op