this repo has no description
1import logging
2
3import apsw
4import apsw.ext
5
6from . import BaseFeed
7
8class PopularFeed(BaseFeed):
9 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popular'
10
11 def __init__(self):
12 self.db_cnx = apsw.Connection('db/popular.db')
13 self.db_cnx.pragma('journal_mode', 'WAL')
14 self.db_cnx.pragma('wal_autocheckpoint', '0')
15
16 with self.db_cnx:
17 self.db_cnx.execute("""
18 create table if not exists posts (uri text, create_ts timestamp, update_ts timestamp, temperature int);
19 create unique index if not exists uri_idx on posts(uri);
20 """)
21
22 self.logger = logging.getLogger('feeds.popular')
23
24 def process_commit(self, commit):
25 if commit['opType'] != 'c':
26 return
27
28 if commit['collection'] != 'app.bsky.feed.like':
29 return
30
31 record = commit.get('record')
32 if record is None:
33 return
34
35 ts = self.safe_timestamp(record.get('createdAt')).timestamp()
36 like_subject_uri = record['subject']['uri']
37
38 self.transaction_begin(self.db_cnx)
39
40 self.db_cnx.execute("""
41 insert into posts (uri, create_ts, update_ts, temperature)
42 values (:uri, :ts, :ts, 1)
43 on conflict (uri) do update set temperature = temperature + 1, update_ts = :ts
44 """, dict(uri=like_subject_uri, ts=ts))
45
46 def delete_old_posts(self):
47 self.db_cnx.execute("""
48 delete from posts
49 where
50 temperature * exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) ) < 1.0
51 and create_ts < unixepoch('now', '-15 minutes')
52 """)
53
54 def commit_changes(self):
55 self.logger.debug('committing changes')
56 self.delete_old_posts()
57 self.transaction_commit(self.db_cnx)
58 self.wal_checkpoint(self.db_cnx, 'RESTART')
59
60 def serve_feed(self, limit, offset, langs):
61 cur = self.db_cnx.execute("""
62 select uri from posts
63 order by temperature * exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) )
64 desc limit :limit offset :offset
65 """, dict(limit=limit, offset=offset))
66 return [uri for (uri,) in cur]
67
68 def serve_feed_debug(self, limit, offset, langs):
69 query = """
70 select
71 uri, temperature,
72 unixepoch('now') - create_ts as age_seconds,
73 exp(
74 -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 )
75 ) as decay,
76 temperature * exp(
77 -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 )
78 ) as score
79 from posts
80 order by score desc
81 limit :limit offset :offset
82 """
83 bindings = dict(limit=limit, offset=offset)
84 return apsw.ext.format_query_table(
85 self.db_cnx, query, bindings,
86 string_sanitize=2, text_width=9999, use_unicode=True
87 )