this repo has no description
1import logging
2
3import apsw
4import apsw.ext
5import grapheme
6
7from . import BaseFeed
8
# Upper bound on a post's text length, counted in graphemes (via
# grapheme.length), for RapidFireFeed.process_commit to store the post.
MAX_TEXT_LENGTH = 140
10
class RapidFireFeed(BaseFeed):
    """Feed of recently created short, plain, top-level posts.

    Qualifying posts are written to a local SQLite database, one row per
    language declared on the post. Rows older than 15 minutes are pruned
    every time pending changes are committed.

    Relies on helpers that are not defined in this class (``safe_timestamp``,
    ``transaction_begin``, ``transaction_commit``, ``wal_checkpoint``);
    presumably they are inherited from ``BaseFeed``.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'

    def __init__(self):
        """Open ``db/rapidfire.db`` in WAL mode and ensure the schema exists."""
        self.db_cnx = apsw.Connection('db/rapidfire.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic checkpointing is switched off; commit_changes() requests
        # a checkpoint explicitly after every commit.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (uri text, create_ts timestamp, lang text);
            create index if not exists create_ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.rapidfire')

    def process_commit(self, commit):
        """Store a newly created post if it qualifies for the feed.

        A post qualifies when it is not a reply, carries no embed and no
        facets, and its text is at most ``MAX_TEXT_LENGTH`` graphemes long.
        One row is inserted per entry of the record's ``langs``; when the
        record declares no languages a single row with an empty language is
        inserted instead.

        Args:
            commit: Mapping with an ``op`` entry (holding ``action``, ``path``
                and optionally ``record``) and a ``repo`` entry.
        """
        op = commit['op']
        if op['action'] != 'create':
            return

        # partition() never raises on an unexpected number of '/' separators,
        # so a malformed path is skipped instead of aborting with ValueError.
        collection, _, _ = op['path'].partition('/')
        if collection != 'app.bsky.feed.post':
            return

        record = op.get('record')
        if record is None:
            return

        # Cheap structural checks first, so the grapheme count only runs for
        # posts that could still qualify.
        if (
            record.get('reply') is not None
            or record.get('embed') is not None
            or record.get('facets') is not None
        ):
            return
        if grapheme.length(record.get('text', '')) > MAX_TEXT_LENGTH:
            return

        repo = commit['repo']
        path = op['path']
        post_uri = f'at://{repo}/{path}'
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute(
                'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
                dict(uri=post_uri, ts=ts, lang=lang)
            )

    def delete_old_posts(self):
        """Delete every stored post whose ``create_ts`` is over 15 minutes old."""
        self.db_cnx.execute(
            "delete from posts where create_ts < unixepoch('now', '-15 minutes')"
        )

    def commit_changes(self):
        """Prune expired posts, commit the open transaction and checkpoint the WAL."""
        self.logger.debug('committing changes')
        self.delete_old_posts()
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return one page of post URIs, newest first.

        Args:
            limit: Maximum number of URIs to return.
            offset: Number of leading results to skip.
            langs: Language selection supporting ``in`` and ``.values()``.
                If it contains ``'*'`` no language filter is applied;
                otherwise only posts stored under one of its values are
                returned.

        Returns:
            A list of post URIs ordered by ``create_ts`` descending. Each
            post appears at most once even if it was stored under several
            languages. An empty, non-wildcard ``langs`` yields ``[]``.
        """
        if '*' in langs:
            # process_commit() stores one row per language, so without
            # DISTINCT a multi-language post would show up several times and
            # eat several slots of the requested page.
            cur = self.db_cnx.execute(
                "select distinct uri, create_ts from posts "
                "order by create_ts desc limit :limit offset :offset",
                dict(limit=limit, offset=offset)
            )
            return [uri for (uri, _) in cur]

        lang_values = list(langs.values())
        if not lang_values:
            # No languages to match: joining zero selects would produce
            # invalid SQL, and a union over nothing is empty anyway.
            return []

        # One select per requested language; UNION (not UNION ALL) removes
        # the duplicates of posts stored under more than one of them.
        lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
        lang_clause = ' union '.join(lang_selects)
        cur = self.db_cnx.execute(
            lang_clause + ' order by create_ts desc limit ? offset ?',
            [*lang_values, limit, offset]
        )
        return [uri for (uri, _) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return a formatted text table of stored rows, newest first.

        Every column of ``posts`` is shown together with ``age_seconds``
        (current Unix time minus ``create_ts``). ``langs`` is accepted for
        signature parity with ``serve_feed`` but is not used: rows are not
        filtered or de-duplicated by language here.
        """
        query = """
        select *, unixepoch('now') - create_ts as age_seconds
        from posts
        order by create_ts desc
        limit :limit offset :offset
        """
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )