this repo has no description

feeds/rapidfire.py: update for new framework

Changed files
+24 -22
feeds
+1 -1
feedgen.py
···
feed_manager = FeedManager()
feed_manager.register(RapidFireFeed)
-
feed_manager.register(PopularFeed)
+
# feed_manager.register(PopularFeed)
current_minute = None
async for commit in firehose_events(firehose_manager):
+22 -20
feeds/rapidfire.py
···
import os
import sys
-
import sqlite3
+
import apsw
from . import BaseFeed
···
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
def __init__(self):
+
db_fname = ''
if os.path.isdir('/dev/shm/'):
os.makedirs('/dev/shm/feedgens/', exist_ok=True)
-
self.db_cnx = sqlite3.connect('/dev/shm/feedgens/rapidfire.db')
+
db_fname = '/dev/shm/feedgens/rapidfire.db'
else:
-
self.db_cnx = sqlite3.connect('db/rapidfire.db')
+
db_fname = 'db/rapidfire.db'
+
+
self.db_cnx = apsw.Connection(db_fname)
+
self.db_cnx.pragma('journal_mode', 'WAL')
+
self.db_cnx.pragma('synchronous', 'OFF')
+
self.db_cnx.pragma('wal_autocheckpoint', '0')
with self.db_cnx:
-
self.db_cnx.executescript(
-
"pragma journal_mode = WAL;"
-
"pragma synchronous = OFF;"
-
"pragma wal_autocheckpoint = 0;"
-
"create table if not exists posts (uri text, create_ts timestamp, lang text);"
-
"create index if not exists create_ts_idx on posts(create_ts);"
-
)
-
-
self.checkpoint = 0
+
self.db_cnx.execute("""
+
create table if not exists posts (uri text, create_ts timestamp, lang text);
+
create index if not exists create_ts_idx on posts(create_ts);
+
""")
def process_commit(self, commit):
op = commit['op']
···
path = op['path']
post_uri = f'at://{repo}/{path}'
ts = record['createdAt']
-
self.checkpoint += 1
with self.db_cnx:
langs = record.get('langs') or ['']
···
dict(uri=post_uri, ts=ts, lang=lang)
)
-
self.db_cnx.execute(
-
"delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')"
-
)
+
def run_tasks_minute(self):
+
sys.stdout.write('rapidfire: running minute tasks\n')
+
sys.stdout.flush()
+
+
with self.db_cnx:
+
self.db_cnx.execute(
+
"delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')"
+
)
-
if self.checkpoint % 100 == 0:
-
sys.stdout.write('rapidfire: checkpoint\n')
-
sys.stdout.flush()
-
self.db_cnx.execute("pragma wal_checkpoint(TRUNCATE)")
+
self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')
def serve_feed(self, limit, offset, langs):
if '*' in langs:
+1 -1
feedweb.py
···
def get_feed_skeleton():
manager = FeedManager()
manager.register(RapidFireFeed)
-
manager.register(PopularFeed)
+
# manager.register(PopularFeed)
try:
limit = int(request.args.get('limit', 50))