this repo has no description
1import logging
2import os
3
4import apsw
5import apsw.ext
6
7from . import BaseFeed
8
9MAX_TEXT_LENGTH = 140
10
11class RapidFireFeed(BaseFeed):
12 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
13
14 def __init__(self):
15 db_fname = ''
16 if os.path.isdir('/dev/shm/'):
17 os.makedirs('/dev/shm/feedgens/', exist_ok=True)
18 db_fname = '/dev/shm/feedgens/rapidfire.db'
19 else:
20 db_fname = 'db/rapidfire.db'
21
22 self.db_cnx = apsw.Connection(db_fname)
23 self.db_cnx.pragma('journal_mode', 'WAL')
24 self.db_cnx.pragma('synchronous', 'OFF')
25 self.db_cnx.pragma('wal_autocheckpoint', '0')
26
27 with self.db_cnx:
28 self.db_cnx.execute("""
29 create table if not exists posts (uri text, create_ts timestamp, lang text);
30 create index if not exists create_ts_idx on posts(create_ts);
31 """)
32
33 self.logger = logging.getLogger('feeds.rapidfire')
34
35 def process_commit(self, commit):
36 op = commit['op']
37 if op['action'] != 'create':
38 return
39
40 collection, _ = op['path'].split('/')
41 if collection != 'app.bsky.feed.post':
42 return
43
44 record = op['record']
45
46 if all([
47 len(record['text']) <= MAX_TEXT_LENGTH,
48 record.get('reply') is None,
49 record.get('embed') is None,
50 record.get('facets') is None
51 ]):
52 repo = commit['repo']
53 path = op['path']
54 post_uri = f'at://{repo}/{path}'
55 ts = self.safe_timestamp(record['createdAt']).timestamp()
56
57 langs = record.get('langs') or ['']
58 for lang in langs:
59 self.db_cnx.execute(
60 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
61 dict(uri=post_uri, ts=ts, lang=lang)
62 )
63
64 def run_tasks_minute(self):
65 self.logger.debug('running minute tasks')
66
67 with self.db_cnx:
68 self.db_cnx.execute(
69 "delete from posts where create_ts < unixepoch('now', '-15 minutes')"
70 )
71
72 self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')
73
74 def serve_feed(self, limit, offset, langs):
75 if '*' in langs:
76 cur = self.db_cnx.execute(
77 "select uri from posts order by create_ts desc limit :limit offset :offset",
78 dict(limit=limit, offset=offset)
79 )
80 return [uri for (uri,) in cur]
81 else:
82 lang_values = list(langs.values())
83 lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
84 lang_clause = ' union '.join(lang_selects)
85 cur = self.db_cnx.execute(
86 lang_clause + ' order by create_ts desc limit ? offset ?',
87 [*lang_values, limit, offset]
88 )
89 return [uri for (uri, create_ts) in cur]
90
91 def serve_feed_debug(self, limit, offset, langs):
92 query = (
93 "select *, unixepoch('now') from posts order by create_ts desc limit :limit offset :offset;"
94 )
95 bindings = dict(limit=limit, offset=offset)
96 return apsw.ext.format_query_table(
97 self.db_cnx, query, bindings,
98 string_sanitize=2, text_width=9999, use_unicode=True
99 )