this repo has no description
"""Rapid-fire feed: recent, short, text-only top-level posts."""

import logging

import apsw
import apsw.ext
import grapheme

from . import BaseFeed

# Longest post, counted in grapheme clusters, that is admitted to the feed.
MAX_TEXT_LENGTH = 140


class RapidFireFeed(BaseFeed):
    """Feed of short posts that have no reply parent, embed, or facets.

    Matching posts are stored in a local SQLite database, one row per
    (post, language) pair, and rows older than 15 minutes are purged each
    time ``commit_changes`` runs.

    NOTE(review): ``safe_timestamp``, ``transaction_begin``,
    ``transaction_commit`` and ``wal_checkpoint`` are not defined in this
    file; presumably they are inherited from ``BaseFeed`` -- confirm.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'

    def __init__(self):
        """Open the database, configure WAL mode, and create the schema."""
        self.db_cnx = apsw.Connection('db/rapidfire.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic checkpoints are switched off; commit_changes() requests
        # a checkpoint explicitly after each commit instead.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (uri text, create_ts timestamp, lang text);
            create index if not exists create_ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.rapidfire')

    def process_commit(self, commit):
        """Store a newly created post if it qualifies for the feed.

        A commit qualifies when it is a create (``opType == 'c'``) in the
        ``app.bsky.feed.post`` collection, carries a record, has no
        ``reply``, ``embed`` or ``facets``, and its text is at most
        ``MAX_TEXT_LENGTH`` graphemes long.  One row is inserted per entry
        in the record's ``langs``; a record without languages is stored
        once with an empty language.

        Args:
            commit: Mapping describing the commit; reads the ``opType``,
                ``collection``, ``record``, ``did`` and ``rkey`` keys.
        """
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        # Cheap structural checks first, so the comparatively expensive
        # grapheme count only runs for posts that can still qualify.
        if (
            record.get('reply') is not None
            or record.get('embed') is not None
            or record.get('facets') is not None
        ):
            return

        # `or ''` also covers an explicit ``text: None``.
        if grapheme.length(record.get('text') or '') > MAX_TEXT_LENGTH:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute(
                'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
                dict(uri=post_uri, ts=ts, lang=lang)
            )

    def delete_old_posts(self):
        """Delete rows whose ``create_ts`` is more than 15 minutes old."""
        self.db_cnx.execute(
            "delete from posts where create_ts < unixepoch('now', '-15 minutes')"
        )
        self.logger.debug('deleted %s old posts', self.db_cnx.changes())

    def commit_changes(self):
        """Purge expired posts, commit, and checkpoint the WAL."""
        self.delete_old_posts()
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return one page of post URIs, newest first.

        Args:
            limit: Maximum number of URIs to return.
            offset: Number of leading results to skip.
            langs: Requested languages; must support ``'*' in langs`` and
                ``langs.values()``.  A ``'*'`` entry selects posts in every
                language.

        Returns:
            A list of post URIs ordered by ``create_ts`` descending, each
            post appearing once.  Empty when no language is requested.
        """
        # A post is stored once per language, so both queries select
        # distinct (uri, create_ts) pairs to avoid returning a
        # multi-language post several times.
        if '*' in langs:
            cur = self.db_cnx.execute(
                'select distinct uri, create_ts from posts'
                ' order by create_ts desc limit :limit offset :offset',
                dict(limit=limit, offset=offset)
            )
        else:
            lang_values = list(langs.values())
            if not lang_values:
                # No languages to match; an empty "in ()" list would not
                # be valid SQL, and nothing could match anyway.
                return []
            placeholders = ', '.join('?' * len(lang_values))
            cur = self.db_cnx.execute(
                'select distinct uri, create_ts from posts'
                f' where lang in ({placeholders})'
                ' order by create_ts desc limit ? offset ?',
                [*lang_values, limit, offset]
            )
        return [uri for (uri, _create_ts) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return a formatted text table of stored rows for debugging.

        Every column of ``posts`` is shown together with an
        ``age_seconds`` column, newest first.  ``langs`` is accepted for
        signature parity with ``serve_feed`` but is not used.
        """
        query = """
        select *, unixepoch('now') - create_ts as age_seconds
        from posts
        order by create_ts desc
        limit :limit offset :offset
        """
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )