this repo has no description
1import os 2import sys 3import apsw 4 5from . import BaseFeed 6 7MAX_TEXT_LENGTH = 140 8 9class RapidFireFeed(BaseFeed): 10 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire' 11 12 def __init__(self): 13 db_fname = '' 14 if os.path.isdir('/dev/shm/'): 15 os.makedirs('/dev/shm/feedgens/', exist_ok=True) 16 db_fname = '/dev/shm/feedgens/rapidfire.db' 17 else: 18 db_fname = 'db/rapidfire.db' 19 20 self.db_cnx = apsw.Connection(db_fname) 21 self.db_cnx.pragma('journal_mode', 'WAL') 22 self.db_cnx.pragma('synchronous', 'OFF') 23 self.db_cnx.pragma('wal_autocheckpoint', '0') 24 25 with self.db_cnx: 26 self.db_cnx.execute(""" 27 create table if not exists posts (uri text, create_ts timestamp, lang text); 28 create index if not exists create_ts_idx on posts(create_ts); 29 """) 30 31 def process_commit(self, commit): 32 op = commit['op'] 33 if op['action'] != 'create': 34 return 35 36 collection, _ = op['path'].split('/') 37 if collection != 'app.bsky.feed.post': 38 return 39 40 record = op['record'] 41 42 if all([ 43 len(record['text']) <= MAX_TEXT_LENGTH, 44 record.get('reply') is None, 45 record.get('embed') is None, 46 record.get('facets') is None 47 ]): 48 repo = commit['repo'] 49 path = op['path'] 50 post_uri = f'at://{repo}/{path}' 51 ts = record['createdAt'] 52 53 with self.db_cnx: 54 langs = record.get('langs') or [''] 55 for lang in langs: 56 self.db_cnx.execute( 57 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)', 58 dict(uri=post_uri, ts=ts, lang=lang) 59 ) 60 61 def run_tasks_minute(self): 62 sys.stdout.write('rapidfire: running minute tasks\n') 63 sys.stdout.flush() 64 65 with self.db_cnx: 66 self.db_cnx.execute( 67 "delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')" 68 ) 69 70 self.db_cnx.pragma('wal_checkpoint(TRUNCATE)') 71 72 def serve_feed(self, limit, offset, langs): 73 if '*' in langs: 74 cur = self.db_cnx.execute( 75 "select uri from posts order by create_ts desc limit :limit offset :offset", 76 dict(limit=limit, offset=offset) 77 ) 78 return [uri for (uri,) in cur] 79 else: 80 lang_values = list(langs.values()) 81 lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values) 82 lang_clause = ' union '.join(lang_selects) 83 cur = self.db_cnx.execute( 84 lang_clause + ' order by create_ts desc limit ? offset ?', 85 [*lang_values, limit, offset] 86 ) 87 return [uri for (uri, create_ts) in cur]