this repo has no description
1import logging 2import os 3 4import apsw 5import apsw.ext 6 7from . import BaseFeed 8 9MAX_TEXT_LENGTH = 140 10 11class RapidFireFeed(BaseFeed): 12 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire' 13 14 def __init__(self): 15 db_fname = '' 16 if os.path.isdir('/dev/shm/'): 17 os.makedirs('/dev/shm/feedgens/', exist_ok=True) 18 db_fname = '/dev/shm/feedgens/rapidfire.db' 19 else: 20 db_fname = 'db/rapidfire.db' 21 22 self.db_cnx = apsw.Connection(db_fname) 23 self.db_cnx.pragma('journal_mode', 'WAL') 24 self.db_cnx.pragma('synchronous', 'OFF') 25 self.db_cnx.pragma('wal_autocheckpoint', '0') 26 27 with self.db_cnx: 28 self.db_cnx.execute(""" 29 create table if not exists posts (uri text, create_ts timestamp, lang text); 30 create index if not exists create_ts_idx on posts(create_ts); 31 """) 32 33 self.logger = logging.getLogger('feeds.rapidfire') 34 35 def process_commit(self, commit): 36 op = commit['op'] 37 if op['action'] != 'create': 38 return 39 40 collection, _ = op['path'].split('/') 41 if collection != 'app.bsky.feed.post': 42 return 43 44 record = op['record'] 45 46 if all([ 47 len(record['text']) <= MAX_TEXT_LENGTH, 48 record.get('reply') is None, 49 record.get('embed') is None, 50 record.get('facets') is None 51 ]): 52 repo = commit['repo'] 53 path = op['path'] 54 post_uri = f'at://{repo}/{path}' 55 ts = self.safe_timestamp(record['createdAt']).timestamp() 56 57 langs = record.get('langs') or [''] 58 for lang in langs: 59 self.db_cnx.execute( 60 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)', 61 dict(uri=post_uri, ts=ts, lang=lang) 62 ) 63 64 def run_tasks_minute(self): 65 self.logger.debug('running minute tasks') 66 67 with self.db_cnx: 68 self.db_cnx.execute( 69 "delete from posts where create_ts < unixepoch('now', '-15 minutes')" 70 ) 71 72 self.db_cnx.pragma('wal_checkpoint(TRUNCATE)') 73 74 def serve_feed(self, limit, offset, langs): 75 if '*' in langs: 76 cur = self.db_cnx.execute( 77 "select uri from posts order by create_ts desc limit :limit offset :offset", 78 dict(limit=limit, offset=offset) 79 ) 80 return [uri for (uri,) in cur] 81 else: 82 lang_values = list(langs.values()) 83 lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values) 84 lang_clause = ' union '.join(lang_selects) 85 cur = self.db_cnx.execute( 86 lang_clause + ' order by create_ts desc limit ? offset ?', 87 [*lang_values, limit, offset] 88 ) 89 return [uri for (uri, create_ts) in cur] 90 91 def serve_feed_debug(self, limit, offset, langs): 92 query = ( 93 "select *, unixepoch('now') from posts order by create_ts desc limit :limit offset :offset;" 94 ) 95 bindings = dict(limit=limit, offset=offset) 96 return apsw.ext.format_query_table( 97 self.db_cnx, query, bindings, 98 string_sanitize=2, text_width=9999, use_unicode=True 99 )