this repo has no description
import logging

import apsw
import apsw.ext
import threading
import queue

from . import BaseFeed


class DatabaseWorker(threading.Thread):
    """Background thread that applies queued SQL writes to one SQLite database.

    Items taken from ``task_queue`` are one of:

    * ``'STOP'`` -- leave the loop and close the connection;
    * ``'COMMIT'`` -- commit the open transaction (if any) and run a passive
      WAL checkpoint;
    * a ``(sql, bindings)`` pair -- executed inside a transaction that is
      opened on demand.
    """

    def __init__(self, name, db_path, task_queue):
        """Open the connection and configure it for manually checkpointed WAL.

        Args:
            name: Suffix for the logger name (``feeds.db.<name>``).
            db_path: Path of the SQLite database file.
            task_queue: Queue the worker consumes tasks from.
        """
        super().__init__()
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic checkpoints are disabled; one is run on every 'COMMIT'.
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        self.changes = 0

    def run(self):
        """Consume tasks until ``'STOP'`` is received, then close the connection.

        A failing task is logged and skipped so that one bad statement cannot
        kill the writer and leave the queue without a consumer.
        """
        try:
            while not self.stop_signal:
                task = self.task_queue.get(block=True)
                try:
                    self._run_task(task)
                except Exception:
                    # Thread boundary: record the failure and keep draining.
                    self.logger.exception('database task failed: %r', task)
                finally:
                    # Always balance get() so Queue.join() cannot hang.
                    self.task_queue.task_done()
        finally:
            self.db_cnx.close()

    def _run_task(self, task):
        """Execute a single queue item (see the class docstring for shapes)."""
        if task == 'STOP':
            self.stop_signal = True
        elif task == 'COMMIT':
            self.logger.debug('committing %s changes', self.changes)
            if self.db_cnx.in_transaction:
                self.db_cnx.execute('COMMIT')
            checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
            self.logger.debug('checkpoint: %r', checkpoint.fetchall())
            self.changes = 0
            self.logger.debug('qsize: %s', self.task_queue.qsize())
        else:
            sql, bindings = task
            # Writes are batched: the transaction stays open until 'COMMIT'.
            if not self.db_cnx.in_transaction:
                self.db_cnx.execute('BEGIN')
            self.db_cnx.execute(sql, bindings)
            self.changes += self.db_cnx.changes()

    def stop(self):
        """Ask the worker to exit once it reaches this marker in the queue."""
        self.task_queue.put('STOP')


class MostLikedFeed(BaseFeed):
    """Feed of recent posts ordered by like count, stored in ``db/mostliked.db``.

    Reads are served from this object's own connection; every write is
    handed to a ``DatabaseWorker`` through ``self.db_writes``.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'

    # Drops posts older than 15 minutes that have fewer than 2 likes, and
    # every post older than 24 hours.
    DELETE_OLD_POSTS_QUERY = """
    delete from posts where (
        create_ts < unixepoch('now', '-15 minutes') and likes < 2
    ) or create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        """Create the schema if needed and start the background writer."""
        self.db_cnx = apsw.Connection('db/mostliked.db')
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text primary key,
                create_ts timestamp,
                likes int
            );
            create table if not exists langs (
                uri text,
                lang text,
                foreign key(uri) references posts(uri) on delete cascade
            );
            create index if not exists ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.mostliked')

        self.db_writes = queue.Queue()
        # Keep a handle on the worker so it can be stopped later.
        self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
        self.db_worker.start()

    def process_commit(self, commit):
        """Queue the database writes implied by one commit event.

        Only create operations (``opType == 'c'``) are handled. A new post
        is inserted with zero likes plus one ``langs`` row per language; a
        new like increments the counter of the liked post. Commits without
        a record, and likes without a subject URI, are ignored.

        Args:
            commit: Mapping with ``opType`` and ``collection`` keys; for
                posts also ``did`` and ``rkey``; optionally ``record``.
        """
        if commit['opType'] != 'c':
            return

        collection = commit['collection']
        record = commit.get('record')
        if record is None:
            # Nothing to index without the record payload.
            return

        if collection == 'app.bsky.feed.post':
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"
            # safe_timestamp is not defined in this file; presumably it is
            # inherited from BaseFeed and returns a datetime -- TODO confirm.
            created = self.safe_timestamp(record.get('createdAt'))
            self.db_writes.put((
                'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, 0)',
                {'uri': post_uri, 'ts': created.timestamp()},
            ))

            # `or []` also covers an explicit null value for "langs".
            for lang in record.get('langs') or []:
                self.db_writes.put((
                    'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                    {'uri': post_uri, 'lang': lang},
                ))

        elif collection == 'app.bsky.feed.like':
            try:
                subject_uri = record['subject']['uri']
            except (KeyError, TypeError):
                # Missing or malformed subject: nothing to count.
                return

            self.db_writes.put((
                'update posts set likes = likes + 1 where uri = :uri',
                {'uri': subject_uri},
            ))

    def commit_changes(self):
        """Queue a purge of stale posts followed by a commit."""
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')

    def generate_sql(self, limit, offset, langs):
        """Build the feed query and its positional bindings.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            langs: Container of requested languages supporting ``in`` and
                ``.values()``. If it contains ``'*'`` or yields no values,
                no language filter is applied.

        Returns:
            A ``(sql, bindings)`` tuple suitable for ``Connection.execute``.
        """
        # NOTE(review): the left join yields one row per (post, lang) pair, so
        # a post can appear more than once when several of its languages match
        # or when no filter is applied.
        sql = """
        select posts.uri, create_ts, create_ts - unixepoch('now', '-15 minutes') as rem, likes, lang
        from posts
        left join langs on posts.uri = langs.uri
        where
        """
        lang_values = [] if '*' in langs else list(langs.values())
        bindings = list(lang_values)
        if lang_values:
            lang_filter = " OR ".join(['lang = ?'] * len(lang_values))
            sql += f" ({lang_filter}) "
        else:
            # An empty filter would leave a dangling `where`; select all rows.
            sql += " 1=1 "
        sql += """
        order by likes desc, create_ts desc
        limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings

    def serve_feed(self, limit, offset, langs):
        """Return the post URIs for one page of the feed."""
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as ``serve_feed`` rendered as a text table."""
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )