import logging
import queue
import threading

import apsw
import apsw.ext
from expiringdict import ExpiringDict

from . import BaseFeed

# store post in database once it has this many likes
MIN_LIKES = 5


class DatabaseWorker(threading.Thread):
    """Single writer thread: applies queued SQL tasks on its own connection."""

    def __init__(self, name, db_path, task_queue):
        super().__init__()
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        self.changes = 0

    def run(self):
        while True:
            task = self.task_queue.get(block=True)
            if task == 'STOP':
                self.logger.debug('received STOP, breaking now')
                break
            elif task == 'COMMIT':
                self.logger.debug(f'committing {self.changes} changes')
                if self.db_cnx.in_transaction:
                    self.db_cnx.execute('COMMIT')
                checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
                self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
                self.changes = 0
                self.logger.debug(f'qsize: {self.task_queue.qsize()}')
            else:
                # regular task: a (sql, bindings) tuple, applied inside an open transaction
                sql, bindings = task
                if not self.db_cnx.in_transaction:
                    self.db_cnx.execute('BEGIN')
                self.db_cnx.execute(sql, bindings)
                self.changes += self.db_cnx.changes()
            self.task_queue.task_done()

        self.logger.debug('closing database connection')
        self.db_cnx.close()

    def stop(self):
        self.task_queue.put('STOP')


class MostLikedFeed(BaseFeed):
    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
    DELETE_OLD_POSTS_QUERY = """
        delete from posts where create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        self.db_cnx = apsw.Connection('db/mostliked.db')
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text primary key,
                    create_ts timestamp,
                    likes int
                );
                create table if not exists langs (
                    uri text,
                    lang text,
                    foreign key(uri) references posts(uri) on delete cascade
                );
                create index if not exists ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.mostliked')
        self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5 * 60)

        self.db_writes = queue.Queue()
        self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
        self.db_worker.start()

    def stop_db_worker(self):
        self.logger.debug('sending STOP')
        self.db_writes.put('STOP')

    def process_commit(self, commit):
        if commit['opType'] != 'c':
            return

        if commit['collection'] == 'app.bsky.feed.post':
            record = commit.get('record')
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"

            # to keep the DB in check, instead of adding every post right away
            # we make a note of it but only add it to the DB once it gets some likes
            self.drafts[post_uri] = {
                'ts': self.safe_timestamp(record.get('createdAt')).timestamp(),
                'langs': record.get('langs', []),
                'likes': 0,
            }

        elif commit['collection'] == 'app.bsky.feed.like':
            record = commit.get('record')
            try:
                subject_uri = record['subject']['uri']
            except KeyError:
                return

            if subject_uri in self.drafts:
                record_info = self.drafts.pop(subject_uri).copy()
                record_info['likes'] += 1
                if record_info['likes'] < MIN_LIKES:
                    self.drafts[subject_uri] = record_info
                    return

                self.logger.debug(f'graduating {subject_uri}')

                task = (
                    'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, :likes)',
                    {'uri': subject_uri, 'ts': record_info['ts'], 'likes': record_info['likes']}
                )
                self.db_writes.put(task)

                for lang in record_info['langs']:
                    task = (
                        'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                        {'uri': subject_uri, 'lang': lang}
                    )
                    self.db_writes.put(task)
            else:
                # the like's subject is not a draft: only count it if the post
                # has already been written to the database
                subject_exists = self.db_cnx.execute('select 1 from posts where uri = ?', [subject_uri])
                if subject_exists.fetchone() is None:
                    return

                task = (
                    'update posts set likes = likes + 1 where uri = :uri',
                    {'uri': subject_uri}
                )
                self.db_writes.put(task)

    def commit_changes(self):
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')
        self.logger.debug(f'there are {len(self.drafts)} drafts')

    def generate_sql(self, limit, offset, langs):
        bindings = []
        sql = """
            select posts.uri, create_ts, create_ts - unixepoch('now', '-24 hours') as ttl, likes, lang
            from posts
            left join langs on posts.uri = langs.uri
            where
        """
        if '*' not in langs:
            lang_values = list(langs.values())
            bindings.extend(lang_values)
            sql += " OR ".join(['lang = ?'] * len(lang_values))
        else:
            sql += " 1=1 "
        sql += """
            order by likes desc, create_ts desc
            limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings

    def serve_feed(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
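

# Usage sketch (illustrative only, not part of the module above): one way a
# caller might drive MostLikedFeed from a firehose consumer. Only MostLikedFeed
# and its methods process_commit(), commit_changes(), serve_feed() and
# stop_db_worker() come from this file; the commit source and the flush cadence
# below are assumptions about the surrounding application.
#
#   feed = MostLikedFeed()
#   try:
#       for n, commit in enumerate(firehose_commits()):  # hypothetical generator of Jetstream-style commit dicts
#           feed.process_commit(commit)
#           if n % 1000 == 0:
#               feed.commit_changes()   # queues the 24h cleanup DELETE plus a COMMIT for the worker
#       print(feed.serve_feed(limit=30, offset=0, langs={'*': 1.0}))
#   finally:
#       feed.stop_db_worker()           # the worker drains queued tasks, then closes its connection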