this repo has no description
1import logging
2
3import apsw
4import apsw.ext
5import grapheme
6
7from . import BaseFeed
8
# Longest post text, counted in graphemes (user-perceived characters), that
# is still eligible for the feed; process_commit() compares
# grapheme.length(text) against this value.
MAX_TEXT_LENGTH = 140
10
class RapidFireFeed(BaseFeed):
    """Feed of very recent, short, plain-text, top-level posts.

    Accepted posts are stored in the SQLite database ``db/rapidfire.db`` in
    a ``posts`` table with one row per (post, language) pair.  Rows older
    than 15 minutes are deleted every time ``commit_changes`` runs.

    The helpers ``safe_timestamp``, ``transaction_begin``,
    ``transaction_commit`` and ``wal_checkpoint`` are not defined in this
    class; they are presumably inherited from ``BaseFeed``.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'

    def __init__(self):
        """Open the database, configure WAL mode and create the schema."""
        self.db_cnx = apsw.Connection('db/rapidfire.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic checkpointing is switched off; commit_changes() requests
        # a checkpoint explicitly after each commit instead.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (uri text, create_ts timestamp, lang text);
                create index if not exists create_ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.rapidfire')

    def process_commit(self, commit):
        """Store a newly created post if it qualifies for the feed.

        A commit qualifies when it is a create (``opType == 'c'``) in the
        ``app.bsky.feed.post`` collection, carries a record, the record has
        no ``reply``, ``embed`` or ``facets`` field, and its text is at most
        ``MAX_TEXT_LENGTH`` graphemes long.  One row is inserted per entry
        of the record's ``langs``; a record without languages is stored
        once with the empty string as its language.

        Args:
            commit: Mapping with at least ``opType`` and ``collection``;
                ``did``, ``rkey`` and ``record`` are read for posts.
        """
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        # Cheap field checks first, so the grapheme count (a full scan of
        # the text) only runs for posts that can still qualify.
        if record.get('reply') is not None:
            return
        if record.get('embed') is not None:
            return
        if record.get('facets') is not None:
            return
        if grapheme.length(record.get('text', '')) > MAX_TEXT_LENGTH:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        # A missing or empty language list is stored as a single '' row so
        # the post still shows up in the unfiltered feed.
        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute(
                'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
                dict(uri=post_uri, ts=ts, lang=lang)
            )

    def delete_old_posts(self):
        """Delete every stored post created more than 15 minutes ago."""
        self.db_cnx.execute(
            "delete from posts where create_ts < unixepoch('now', '-15 minutes')"
        )
        self.logger.debug('deleted %s old posts', self.db_cnx.changes())

    def commit_changes(self):
        """Expire old posts, commit the open transaction and checkpoint the WAL."""
        self.delete_old_posts()
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return one page of post URIs, newest first.

        Args:
            limit: Maximum number of URIs to return.
            offset: Number of posts to skip from the newest one.
            langs: Requested languages; must support ``'*' in langs`` and
                ``langs.values()``.  If it contains ``'*'`` or yields no
                values, posts in every language are served.

        Returns:
            A list of post URIs ordered by creation time, newest first.
            Each post appears at most once, even if it is stored under
            several languages.
        """
        lang_values = [] if '*' in langs else list(langs.values())

        if not lang_values:
            # `posts` has one row per (post, language); DISTINCT keeps a
            # multilingual post from being listed once per language and
            # from using up several slots of the page.
            cur = self.db_cnx.execute(
                "select distinct uri, create_ts from posts"
                " order by create_ts desc limit :limit offset :offset",
                dict(limit=limit, offset=offset)
            )
        else:
            # Only '?' placeholders are interpolated into the SQL text; the
            # language values themselves are always passed as bindings.
            placeholders = ', '.join('?' * len(lang_values))
            cur = self.db_cnx.execute(
                'select distinct uri, create_ts from posts'
                f' where lang in ({placeholders})'
                ' order by create_ts desc limit ? offset ?',
                [*lang_values, limit, offset]
            )

        return [uri for (uri, _create_ts) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return a formatted text table of stored rows for debugging.

        Rows are listed newest first, with all columns plus an
        ``age_seconds`` column.  ``langs`` is accepted but not used: the
        table always covers every language.
        """
        query = """
            select *, unixepoch('now') - create_ts as age_seconds
            from posts
            order by create_ts desc
            limit :limit offset :offset
        """
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )