this repo has no description

firehose.py: use frame for consistency

Changed files
+4 -4
+4 -4
firehose.py
···
async with websockets.connect(relay_url, ping_timeout=None) as firehose:
current_minute = None
while True:
-
message = BytesIO(await firehose.recv())
-
header = dag_cbor.decode(message, allow_concat=True)
+
frame = BytesIO(await firehose.recv())
+
header = dag_cbor.decode(frame, allow_concat=True)
if header['op'] != 1:
continue
-
redis_cnx.publish('bsky-tools:firehose:stream', message.getvalue())
+
redis_cnx.publish('bsky-tools:firehose:stream', frame.getvalue())
# checkpoint the seq
now = datetime.now(timezone.utc)
if now.time().minute != current_minute:
current_minute = now.time().minute
-
payload = dag_cbor.decode(message)
+
payload = dag_cbor.decode(frame)
payload_seq = payload['seq']
payload_time = datetime.strptime(payload['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
payload_lag = now - payload_time