import logging
import queue
import threading

import apsw
import apsw.ext
from expiringdict import ExpiringDict

from . import BaseFeed

# store post in database once it has this many likes
MIN_LIKES = 5


# Single writer thread: consumes (sql, bindings) tasks plus 'COMMIT'/'STOP'
# control messages from a queue, so all writes go through one connection.
class DatabaseWorker(threading.Thread):
    def __init__(self, name, db_path, task_queue):
        super().__init__()
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        self.changes = 0

    def run(self):
        while not self.stop_signal:
            task = self.task_queue.get(block=True)
            if task == 'STOP':
                self.stop_signal = True
            elif task == 'COMMIT':
                self.logger.debug(f'committing {self.changes} changes')
                if self.db_cnx.in_transaction:
                    self.db_cnx.execute('COMMIT')
                checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
                self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
                self.changes = 0
                self.logger.debug(f'qsize: {self.task_queue.qsize()}')
            else:
                sql, bindings = task
                if not self.db_cnx.in_transaction:
                    self.db_cnx.execute('BEGIN')
                self.db_cnx.execute(sql, bindings)
                self.changes += self.db_cnx.changes()
            self.task_queue.task_done()
        self.db_cnx.close()

    def stop(self):
        self.task_queue.put('STOP')


class MostLikedFeed(BaseFeed):
    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
    DELETE_OLD_POSTS_QUERY = """
        delete from posts where create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        self.db_cnx = apsw.Connection('db/mostliked.db')
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text primary key,
                    create_ts timestamp,
                    likes int
                );
                create table if not exists langs (
                    uri text,
                    lang text,
                    foreign key(uri) references posts(uri) on delete cascade
                );
                create index if not exists ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.mostliked')
        self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5 * 60)

        self.db_writes = queue.Queue()
        self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
        self.db_worker.start()

    def process_commit(self, commit):
        if commit['opType'] != 'c':
            return

        if commit['collection'] == 'app.bsky.feed.post':
            record = commit.get('record')
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"

            # to keep the DB in check, instead of adding every post right away
            # we make note of it but only add to DB once it gets some likes
            self.drafts[post_uri] = {
                'ts': self.safe_timestamp(record.get('createdAt')).timestamp(),
                'langs': record.get('langs', []),
                'likes': 0,
            }

        elif commit['collection'] == 'app.bsky.feed.like':
            record = commit.get('record')
            try:
                subject_uri = record['subject']['uri']
            except KeyError:
                return

            if subject_uri in self.drafts:
                record_info = self.drafts.pop(subject_uri).copy()
                record_info['likes'] += 1
                if record_info['likes'] < MIN_LIKES:
                    # not popular enough yet, put it back with the updated count
                    self.drafts[subject_uri] = record_info
                    return

                self.logger.debug(f'graduating {subject_uri}')

                task = (
                    'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, :likes)',
                    {'uri': subject_uri, 'ts': record_info['ts'], 'likes': record_info['likes']}
                )
                self.db_writes.put(task)

                for lang in record_info['langs']:
                    task = (
                        'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                        {'uri': subject_uri, 'lang': lang}
                    )
                    self.db_writes.put(task)

                # the like that triggered graduation is already counted above
                return

            # not a draft: only bump the counter if the post already graduated
            subject_exists = self.db_cnx.execute('select 1 from posts where uri = ?', [subject_uri])
            if subject_exists.fetchone() is None:
                return

            task = (
                'update posts set likes = likes + 1 where uri = :uri',
                {'uri': subject_uri}
            )
            self.db_writes.put(task)

    def commit_changes(self):
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')
        self.logger.debug(f'there are {len(self.drafts)} drafts')

    def generate_sql(self, limit, offset, langs):
        bindings = []
        sql = """
            select posts.uri, create_ts, create_ts - unixepoch('now', '-24 hours') as ttl, likes, lang
            from posts
            left join langs on posts.uri = langs.uri
            where
        """
        if '*' not in langs:
            lang_values = list(langs.values())
            bindings.extend(lang_values)
            sql += " OR ".join(['lang = ?'] * len(lang_values))
        else:
            sql += " 1=1 "
        sql += """
            order by likes desc, create_ts desc
            limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings

    def serve_feed(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
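
# ---------------------------------------------------------------------------
# Usage sketch: a minimal illustration of how a firehose consumer might drive
# this feed. The commit dict only mirrors the fields process_commit() reads;
# the DID, rkey, timestamp, and the shape of `langs` (a mapping whose values
# are language codes, or containing '*' for all languages, as inferred from
# generate_sql()) are assumptions, not part of this module's contract.
#
#   feed = MostLikedFeed()
#   feed.process_commit({
#       'opType': 'c',
#       'collection': 'app.bsky.feed.post',
#       'did': 'did:plc:example',
#       'rkey': '3kexamplerkey',
#       'record': {'createdAt': '2024-01-01T00:00:00Z', 'langs': ['en']},
#   })
#   feed.commit_changes()  # flushes queued writes through the worker thread
#   feed.serve_feed(limit=30, offset=0, langs={'pref': 'en'})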