this repo has no description

feeds: add timestamp parsing, use in rapidfire.py

Changed files
+41 -2
feeds
+39
feeds/__init__.py
···
class BaseFeed:
def process_commit(self, commit):
raise NotImplementedError
···
def run_tasks_minute(self):
pass
class FeedManager:
def __init__(self):
···
+
from datetime import datetime, timezone, timedelta
+
class BaseFeed:
def process_commit(self, commit):
raise NotImplementedError
···
def run_tasks_minute(self):
pass
+
+
def parse_timestamp(self, timestamp):
+
# https://atproto.com/specs/lexicon#datetime
+
formats = {
+
# preferred
+
'1985-04-12T23:20:50.123Z': '%Y-%m-%dT%H:%M:%S.%f%z',
+
# '1985-04-12T23:20:50.123456Z': '%Y-%m-%dT%H:%M:%S.%f%z',
+
# '1985-04-12T23:20:50.120Z': '%Y-%m-%dT%H:%M:%S.%f%z',
+
# '1985-04-12T23:20:50.120000Z': '%Y-%m-%dT%H:%M:%S.%f%z',
+
+
# supported
+
# '1985-04-12T23:20:50.12345678912345Z': '',
+
'1985-04-12T23:20:50Z': '%Y-%m-%dT%H:%M:%S%z',
+
# '1985-04-12T23:20:50.0Z': '%Y-%m-%dT%H:%M:%S.%f%z',
+
# '1985-04-12T23:20:50.123+00:00': '%Y-%m-%dT%H:%M:%S.%f%z',
+
# '1985-04-12T23:20:50.123-07:00': '%Y-%m-%dT%H:%M:%S.%f%z',
+
}
+
+
for format in formats.values():
+
try:
+
ts = datetime.strptime(timestamp, format)
+
except ValueError
+
continue
+
else:
+
return ts
+
+
return datetime.now(timezone.utc)
+
+
def safe_timestamp(self, timestamp):
+
parsed = self.parse_timestamp(timestamp)
+
utc_now = datetime.now(timezone.utc)
+
if parsed.timestamp() <= 0:
+
return utc_now
+
elif parsed - timedelta(minutes=2) < utc_now:
+
return parsed
+
elif parsed > utc_now:
+
return utc_now
class FeedManager:
def __init__(self):
+2 -2
feeds/rapidfire.py
···
repo = commit['repo']
path = op['path']
post_uri = f'at://{repo}/{path}'
-
ts = record['createdAt']
with self.db_cnx:
langs = record.get('langs') or ['']
···
with self.db_cnx:
self.db_cnx.execute(
-
"delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')"
)
self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')
···
repo = commit['repo']
path = op['path']
post_uri = f'at://{repo}/{path}'
+
ts = self.safe_timestamp(record['createdAt']).timestamp()
with self.db_cnx:
langs = record.get('langs') or ['']
···
with self.db_cnx:
self.db_cnx.execute(
+
"delete from posts where create_ts < unixepoch('now', '-15 minutes')"
)
self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')