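"""Most-liked feed: keeps Bluesky posts from the last 24 hours in a small
SQLite (apsw) database and serves them ranked by like count.

New posts are first parked in an in-memory ExpiringDict and only written to
the database once they collect MIN_LIKES likes; all database writes are
funnelled through a single DatabaseWorker thread via a queue.
"""
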
import logging
import queue
import threading

import apsw
import apsw.ext
from expiringdict import ExpiringDict

from . import BaseFeed

# store post in database once it has this many likes
MIN_LIKES = 5


class DatabaseWorker(threading.Thread):
    """Background thread that serializes all writes to one apsw connection.

    Queue items are either (sql, bindings) tuples, the string 'COMMIT'
    (commit the open transaction and checkpoint the WAL), or the string
    'STOP' (shut down and close the connection).
    """

    def __init__(self, name, db_path, task_queue):
        super().__init__()
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        self.changes = 0

    def run(self):
        while not self.stop_signal:
            task = self.task_queue.get(block=True)
            if task == 'STOP':
                self.stop_signal = True
            elif task == 'COMMIT':
                self.logger.debug(f'committing {self.changes} changes')
                if self.db_cnx.in_transaction:
                    self.db_cnx.execute('COMMIT')
                checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
                self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
                self.changes = 0
                self.logger.debug(f'qsize: {self.task_queue.qsize()}')
            else:
                # a regular (sql, bindings) write; batched into an open transaction
                sql, bindings = task
                if not self.db_cnx.in_transaction:
                    self.db_cnx.execute('BEGIN')
                self.db_cnx.execute(sql, bindings)
                self.changes += self.db_cnx.changes()
            self.task_queue.task_done()
        self.db_cnx.close()

    def stop(self):
        self.task_queue.put('STOP')
class MostLikedFeed(BaseFeed):
    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
    DELETE_OLD_POSTS_QUERY = """
        delete from posts where create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        # this connection is only used for reads; all writes go through the worker thread
        self.db_cnx = apsw.Connection('db/mostliked.db')
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text primary key,
                    create_ts timestamp,
                    likes int
                );
                create table if not exists langs (
                    uri text,
                    lang text,
                    foreign key(uri) references posts(uri) on delete cascade
                );
                create index if not exists ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.mostliked')
        self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5*60)

        self.db_writes = queue.Queue()
        self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
        self.db_worker.start()

    def process_commit(self, commit):
        if commit['opType'] != 'c':
            return

        if commit['collection'] == 'app.bsky.feed.post':
            record = commit.get('record')
            if not record:
                return
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"

            # to keep the DB in check, instead of adding every post right away
            # we make note of it but only add it to the DB once it gets some likes
            self.drafts[post_uri] = {
                'ts': self.safe_timestamp(record.get('createdAt')).timestamp(),
                'langs': record.get('langs', []),
                'likes': 0,
            }

        elif commit['collection'] == 'app.bsky.feed.like':
            record = commit.get('record')
            try:
                subject_uri = record['subject']['uri']
            except (TypeError, KeyError):
                return

            if subject_uri in self.drafts:
                record_info = self.drafts.pop(subject_uri).copy()
                record_info['likes'] += 1
                if record_info['likes'] < MIN_LIKES:
                    self.drafts[subject_uri] = record_info
                    return

                self.logger.debug(f'graduating {subject_uri}')

                task = (
                    'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, :likes)',
                    {'uri': subject_uri, 'ts': record_info['ts'], 'likes': record_info['likes']}
                )
                self.db_writes.put(task)

                for lang in record_info['langs']:
                    task = (
                        'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                        {'uri': subject_uri, 'lang': lang}
                    )
                    self.db_writes.put(task)

            # count likes for posts that already graduated into the database
            subject_exists = self.db_cnx.execute('select 1 from posts where uri = ?', [subject_uri])
            if subject_exists.fetchone() is None:
                return

            task = (
                'update posts set likes = likes + 1 where uri = :uri',
                {'uri': subject_uri}
            )
            self.db_writes.put(task)

    def commit_changes(self):
        # prune posts older than 24 hours and ask the worker to commit the current batch
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')
        self.logger.debug(f'there are {len(self.drafts)} drafts')

    def generate_sql(self, limit, offset, langs):
        bindings = []
        sql = """
            select posts.uri, create_ts, create_ts - unixepoch('now', '-24 hours') as ttl, likes, lang
            from posts
            left join langs on posts.uri = langs.uri
            where
        """
        if '*' not in langs:
            lang_values = list(langs.values())
            bindings.extend(lang_values)
            sql += " OR ".join(['lang = ?'] * len(lang_values))
        else:
            sql += " 1=1 "
        sql += """
            order by likes desc, create_ts desc
            limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings
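
    # As an illustration, when '*' is present in langs the query reduces to
    # (whitespace trimmed):
    #   select posts.uri, create_ts, create_ts - unixepoch('now', '-24 hours') as ttl, likes, lang
    #   from posts left join langs on posts.uri = langs.uri
    #   where 1=1
    #   order by likes desc, create_ts desc limit ? offset ?
    # with bindings [limit, offset].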

    def serve_feed(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
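

if __name__ == '__main__':
    # Minimal smoke test (run with `python -m <package>.<module>` so the relative
    # import of BaseFeed resolves). The commit dicts only mirror the fields that
    # process_commit() reads; real firehose events carry more. The langs argument
    # shape ({'*': 1.0}) and BaseFeed.safe_timestamp() accepting ISO-8601 strings
    # are assumptions, not guarantees.
    import os
    import time
    from datetime import datetime, timezone

    logging.basicConfig(level=logging.DEBUG)
    os.makedirs('db', exist_ok=True)

    feed = MostLikedFeed()
    did, rkey = 'did:plc:example', '3kexamplerkey'
    post_uri = f'at://{did}/app.bsky.feed.post/{rkey}'

    # a new post first lands in the in-memory drafts dict...
    feed.process_commit({
        'opType': 'c',
        'collection': 'app.bsky.feed.post',
        'did': did,
        'rkey': rkey,
        'record': {
            'createdAt': datetime.now(timezone.utc).isoformat(),
            'langs': ['en'],
        },
    })
    # ...and graduates into the database after MIN_LIKES likes
    for _ in range(MIN_LIKES):
        feed.process_commit({
            'opType': 'c',
            'collection': 'app.bsky.feed.like',
            'record': {'subject': {'uri': post_uri}},
        })

    feed.commit_changes()   # queues the prune + COMMIT for the worker
    time.sleep(1)           # crude: give the worker thread a moment to flush
    print(feed.serve_feed(10, 0, {'*': 1.0}))

    feed.db_worker.stop()
    feed.db_worker.join()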