This repository has no description.
1import logging
2
3import apsw
4import apsw.ext
5import threading
6import queue
7
8from . import BaseFeed
9
class DatabaseWorker(threading.Thread):
    """Background thread that applies queued SQL writes to one SQLite database.

    All writes go through the single connection opened here, so producers only
    interact with ``task_queue``. Each queue item is one of:

    * ``(sql, bindings)`` -- executed inside a transaction that is opened
      lazily (``BEGIN``) on the first write after a commit;
    * ``'COMMIT'`` -- commits the open transaction, if any, and runs a PASSIVE
      WAL checkpoint (automatic checkpointing is turned off below);
    * ``'STOP'`` -- commits any pending writes, then ends the loop and closes
      the connection.

    A task that raises is logged and skipped; the thread keeps running.

    Args:
        name: Suffix for the logger name (``feeds.db.<name>``).
        db_path: Path of the SQLite database file to open.
        task_queue: Queue the producers put tasks on; ``task_done()`` is called
            exactly once per item taken off it.
    """

    def __init__(self, name, db_path, task_queue):
        super().__init__()
        self.db_cnx = apsw.Connection(db_path)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Checkpoints are run explicitly after each COMMIT task instead.
        self.db_cnx.pragma('wal_autocheckpoint', '0')
        self.stop_signal = False
        self.task_queue = task_queue
        self.logger = logging.getLogger(f'feeds.db.{name}')
        # Rows modified since the last commit; used only for logging.
        self.changes = 0

    def run(self):
        """Consume tasks until a ``'STOP'`` task is processed."""
        try:
            while not self.stop_signal:
                task = self.task_queue.get(block=True)
                try:
                    self._handle(task)
                except Exception:
                    # Thread boundary: one bad statement (constraint error,
                    # malformed task, busy COMMIT, ...) must not kill the only
                    # writer, or the queue would grow forever and nothing
                    # would reach the database again.
                    self.logger.exception('failed to apply task %r', task)
                finally:
                    self.task_queue.task_done()
        finally:
            self.db_cnx.close()

    def _handle(self, task):
        """Apply one queue item (see the class docstring for the protocol)."""
        if task == 'STOP':
            # Set first so a failing final commit still lets the thread exit.
            self.stop_signal = True
            # Without this, closing the connection would roll back every write
            # queued since the last COMMIT task.
            self._commit()
        elif task == 'COMMIT':
            self._commit()
            self.logger.debug('qsize: %d', self.task_queue.qsize())
        else:
            sql, bindings = task
            if not self.db_cnx.in_transaction:
                self.db_cnx.execute('BEGIN')
            self.db_cnx.execute(sql, bindings)
            self.changes += self.db_cnx.changes()

    def _commit(self):
        """Commit the open transaction, if any, then checkpoint the WAL."""
        self.logger.debug('committing %d changes', self.changes)
        if self.db_cnx.in_transaction:
            self.db_cnx.execute('COMMIT')
        checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
        self.logger.debug('checkpoint: %r', checkpoint.fetchall())
        self.changes = 0

    def stop(self):
        """Ask the thread to finish; tasks queued before this are still applied."""
        self.task_queue.put('STOP')
46
class MostLikedFeed(BaseFeed):
    """Feed of recent posts ranked by like count, newest first among ties.

    Writes produced by ``process_commit`` and ``commit_changes`` are queued to
    a background ``DatabaseWorker`` that owns its own connection. ``db_cnx``
    here is used only for schema setup and for serving reads.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
    # Shared by the reader connection and the writer thread.
    DB_PATH = 'db/mostliked.db'
    # Prune posts older than 15 minutes with fewer than 2 likes, and every post
    # older than 24 hours. Their ``langs`` rows follow via ON DELETE CASCADE
    # (foreign_keys is enabled on the writer connection).
    DELETE_OLD_POSTS_QUERY = """
        delete from posts where (
            create_ts < unixepoch('now', '-15 minutes') and likes < 2
        ) or create_ts < unixepoch('now', '-24 hours');
    """

    def __init__(self):
        self.db_cnx = apsw.Connection(self.DB_PATH)
        self.db_cnx.pragma('foreign_keys', True)
        self.db_cnx.pragma('journal_mode', 'WAL')
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text primary key,
                    create_ts timestamp,
                    likes int
                );
                create table if not exists langs (
                    uri text,
                    lang text,
                    foreign key(uri) references posts(uri) on delete cascade
                );
                create index if not exists ts_idx on posts(create_ts);
                -- Child-key index: without it every cascaded delete from posts
                -- and every feed join has to scan the whole langs table.
                create index if not exists langs_uri_idx on langs(uri);
            """)

        self.logger = logging.getLogger('feeds.mostliked')

        self.db_writes = queue.Queue()
        # Kept on the instance so the writer thread can later be shut down
        # with ``self.db_worker.stop()``.
        self.db_worker = DatabaseWorker('mostliked', self.DB_PATH, self.db_writes)
        self.db_worker.start()

    def process_commit(self, commit):
        """Queue the database writes implied by one create commit.

        Only ``opType == 'c'`` commits are handled:

        * ``app.bsky.feed.post`` inserts the post with zero likes (ignored if
          already stored) plus one ``langs`` row per entry in ``record['langs']``;
        * ``app.bsky.feed.like`` increments ``likes`` on the subject post (the
          UPDATE matches nothing if that post is not stored).

        Commits without a record, and likes without ``subject.uri``, are skipped.
        """
        if commit['opType'] != 'c':
            return

        collection = commit['collection']
        record = commit.get('record')
        if record is None:
            return

        if collection == 'app.bsky.feed.post':
            post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"
            created_ts = self.safe_timestamp(record.get('createdAt')).timestamp()
            self.db_writes.put((
                'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, 0)',
                {'uri': post_uri, 'ts': created_ts},
            ))
            # ``langs`` may be absent or null; treat both as "no languages".
            for lang in record.get('langs') or []:
                self.db_writes.put((
                    'insert or ignore into langs (uri, lang) values (:uri, :lang)',
                    {'uri': post_uri, 'lang': lang},
                ))

        elif collection == 'app.bsky.feed.like':
            try:
                subject_uri = record['subject']['uri']
            except (KeyError, TypeError):
                # Missing or non-mapping subject: nothing to count.
                return
            self.db_writes.put((
                'update posts set likes = likes + 1 where uri = :uri',
                {'uri': subject_uri},
            ))

    def commit_changes(self):
        """Queue the pruning query followed by a commit of all pending writes."""
        self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
        self.db_writes.put('COMMIT')

    def generate_sql(self, limit, offset, langs):
        """Build the ranked feed query and its positional bindings.

        Args:
            limit: Maximum number of posts to return.
            offset: Number of posts to skip.
            langs: Language filter. If ``'*' in langs`` every post is eligible;
                otherwise only posts tagged with one of ``langs.values()`` are.
                NOTE(review): assumes a dict-like object (``in`` tests keys,
                values are the language codes) -- confirm against BaseFeed.
                An empty filter is treated as "all languages".

        Returns:
            ``(sql, bindings)`` for ``Connection.execute``. Each post appears at
            most once; its ``lang`` column lists its (matching) languages.
        """
        bindings = []
        lang_values = [] if '*' in langs else list(langs.values())
        if lang_values:
            placeholders = ', '.join(['?'] * len(lang_values))
            where = f'lang in ({placeholders})'
            bindings.extend(lang_values)
        else:
            where = '1=1'

        # ``group by posts.uri`` collapses the one-row-per-language fan-out of
        # the join so a post is never repeated (which would also skew paging).
        # ``rem`` is the number of seconds until the post reaches the
        # 15-minute pruning threshold in DELETE_OLD_POSTS_QUERY.
        sql = f"""
            select posts.uri, create_ts,
                   create_ts - unixepoch('now', '-15 minutes') as rem,
                   likes, group_concat(distinct lang) as lang
            from posts
            left join langs on posts.uri = langs.uri
            where {where}
            group by posts.uri
            order by likes desc, create_ts desc
            limit ? offset ?
        """
        bindings.extend([limit, offset])
        return sql, bindings

    def serve_feed(self, limit, offset, langs):
        """Return the post URIs for one page of the feed."""
        sql, bindings = self.generate_sql(limit, offset, langs)
        cur = self.db_cnx.execute(sql, bindings)
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as ``serve_feed`` rendered as a text table, with all columns."""
        sql, bindings = self.generate_sql(limit, offset, langs)
        return apsw.ext.format_query_table(
            self.db_cnx, sql, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
151 )