import logging
import queue
import threading

import apsw
import apsw.ext
from expiringdict import ExpiringDict

from . import BaseFeed

# a post is only written to the database once it has at least this many likes
MIN_LIKES = 5


class DatabaseWorker(threading.Thread):
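    """Single writer thread that serializes all writes to one SQLite database.

    Tasks arrive on task_queue and are either (sql, bindings) tuples, the
    string 'COMMIT' (commit the open transaction and checkpoint the WAL),
    or the string 'STOP' (shut the worker down and close the connection).

    A minimal usage sketch; the names and the database path below are
    illustrative only, not part of this module:

        writes = queue.Queue()
        worker = DatabaseWorker('example', 'db/example.db', writes)
        worker.start()
        writes.put(('update posts set likes = likes + 1 where uri = :uri',
                    {'uri': 'at://did:plc:example/app.bsky.feed.post/1'}))
        writes.put('COMMIT')
        worker.stop()
        worker.join()
    """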
    def __init__(self, name, db_path, task_queue):
        super().__init__()
        # connection used exclusively by this worker; all writes go through it
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        self.changes = 0

    def run(self):
        while True:
            task = self.task_queue.get(block=True)
            if task == 'STOP':
                self.logger.debug('received STOP, breaking now')
                break
            elif task == 'COMMIT':
                self.logger.debug(f'committing {self.changes} changes')
                if self.db_cnx.in_transaction:
                    self.db_cnx.execute('COMMIT')
                    # checkpoint manually since wal_autocheckpoint is disabled
                    checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
                    self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
                self.changes = 0
                self.logger.debug(f'qsize: {self.task_queue.qsize()}')
            else:
                # a regular (sql, bindings) write task; batch it into a
                # transaction that stays open until the next COMMIT
                sql, bindings = task
                if not self.db_cnx.in_transaction:
                    self.db_cnx.execute('BEGIN')
                self.db_cnx.execute(sql, bindings)
                self.changes += self.db_cnx.changes()
            self.task_queue.task_done()

        self.logger.debug('closing database connection')
        self.db_cnx.close()

    def stop(self):
        self.task_queue.put('STOP')


class MostLikedFeed(BaseFeed):
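    """Feed of the most liked posts from the last 24 hours.

    New posts are first held in an in-memory ExpiringDict of drafts and are
    only written to SQLite once they collect MIN_LIKES likes; after that,
    further likes increment the stored counter. All writes are pushed through
    a DatabaseWorker queue; commit_changes() and the serve_feed* methods are
    presumably invoked by the BaseFeed machinery, which is not shown in this
    file.
    """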
    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
    DELETE_OLD_POSTS_QUERY = """
        delete from posts where create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        # read-side connection; all writes go through the DatabaseWorker below
        self.db_cnx = apsw.Connection('db/mostliked.db')
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text primary key,
                    create_ts timestamp,
                    likes int
                );
                create table if not exists langs (
                    uri text,
                    lang text,
                    foreign key(uri) references posts(uri) on delete cascade
                );
                create index if not exists ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.mostliked')
        self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5*60)

        self.db_writes = queue.Queue()
        self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
        self.db_worker.start()

    def stop_db_worker(self):
        self.logger.debug('sending STOP')
        self.db_writes.put('STOP')

    def process_commit(self, commit):
        # only record creation events are of interest
        if commit['opType'] != 'c':
            return

        if commit['collection'] == 'app.bsky.feed.post':
            record = commit.get('record')
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"

            # to keep the database small, instead of inserting every post
            # right away we note it here and only write it to the DB once it
            # has collected some likes
            self.drafts[post_uri] = {
                'ts': self.safe_timestamp(record.get('createdAt')).timestamp(),
                'langs': record.get('langs', []),
                'likes': 0,
            }

        elif commit['collection'] == 'app.bsky.feed.like':
            record = commit.get('record')
            try:
                subject_uri = record['subject']['uri']
            except KeyError:
                return

            if subject_uri in self.drafts:
                # count the like against the draft; once it reaches MIN_LIKES
                # the post graduates into the database
                record_info = self.drafts.pop(subject_uri).copy()
                record_info['likes'] += 1
                if record_info['likes'] < MIN_LIKES:
                    self.drafts[subject_uri] = record_info
                    return

                self.logger.debug(f'graduating {subject_uri}')

                task = (
                    'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, :likes)',
                    {'uri': subject_uri, 'ts': record_info['ts'], 'likes': record_info['likes']}
                )
                self.db_writes.put(task)

                for lang in record_info['langs']:
                    task = (
                        'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                        {'uri': subject_uri, 'lang': lang}
                    )
                    self.db_writes.put(task)

            # posts that already graduated earlier get their like counter
            # bumped directly in the database
            subject_exists = self.db_cnx.execute('select 1 from posts where uri = ?', [subject_uri])
            if subject_exists.fetchone() is None:
                return

            task = (
                'update posts set likes = likes + 1 where uri = :uri',
                {'uri': subject_uri}
            )
            self.db_writes.put(task)

    def commit_changes(self):
        # prune posts older than 24 hours, then ask the worker to commit
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')
        self.logger.debug(f'there are {len(self.drafts)} drafts')

    def generate_sql(self, limit, offset, langs):
        bindings = []
        sql = """
            select posts.uri, create_ts, create_ts - unixepoch('now', '-24 hours') as ttl, likes, lang
            from posts
            left join langs on posts.uri = langs.uri
            where
        """
        if '*' not in langs:
            # filter to the requested languages only
            lang_values = list(langs.values())
            bindings.extend(lang_values)
            sql += " OR ".join(['lang = ?'] * len(lang_values))
        else:
            # wildcard: no language filter
            sql += " 1=1 "
        sql += """
            order by likes desc, create_ts desc
            limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings

    def serve_feed(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        # only the post URIs are returned to the caller
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
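
# The sketch below is illustrative only: it shows how this feed might be
# wired up by the surrounding application. The event source, the scheduler,
# and the exact shape of `langs` are assumptions about the BaseFeed machinery,
# not part of this file. MostLikedFeed is constructed once, process_commit()
# is fed firehose commit events, commit_changes() is called periodically to
# flush the write queue, and serve_feed() answers feed requests.
#
#     feed = MostLikedFeed()
#     for commit in firehose_events():        # hypothetical event source
#         feed.process_commit(commit)
#     feed.commit_changes()                   # e.g. on a timer
#     uris = feed.serve_feed(limit=30, offset=0, langs={'en': 'en'})
#     feed.stop_db_worker()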
183 )