this repo has no description
1import os
2import sys
3import apsw
4
5from . import BaseFeed
6
7MAX_TEXT_LENGTH = 140
8
9class RapidFireFeed(BaseFeed):
10 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
11
12 def __init__(self):
13 db_fname = ''
14 if os.path.isdir('/dev/shm/'):
15 os.makedirs('/dev/shm/feedgens/', exist_ok=True)
16 db_fname = '/dev/shm/feedgens/rapidfire.db'
17 else:
18 db_fname = 'db/rapidfire.db'
19
20 self.db_cnx = apsw.Connection(db_fname)
21 self.db_cnx.pragma('journal_mode', 'WAL')
22 self.db_cnx.pragma('synchronous', 'OFF')
23 self.db_cnx.pragma('wal_autocheckpoint', '0')
24
25 with self.db_cnx:
26 self.db_cnx.execute("""
27 create table if not exists posts (uri text, create_ts timestamp, lang text);
28 create index if not exists create_ts_idx on posts(create_ts);
29 """)
30
31 def process_commit(self, commit):
32 op = commit['op']
33 if op['action'] != 'create':
34 return
35
36 collection, _ = op['path'].split('/')
37 if collection != 'app.bsky.feed.post':
38 return
39
40 record = op['record']
41
42 if all([
43 len(record['text']) <= MAX_TEXT_LENGTH,
44 record.get('reply') is None,
45 record.get('embed') is None,
46 record.get('facets') is None
47 ]):
48 repo = commit['repo']
49 path = op['path']
50 post_uri = f'at://{repo}/{path}'
51 ts = record['createdAt']
52
53 with self.db_cnx:
54 langs = record.get('langs') or ['']
55 for lang in langs:
56 self.db_cnx.execute(
57 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
58 dict(uri=post_uri, ts=ts, lang=lang)
59 )
60
61 def run_tasks_minute(self):
62 sys.stdout.write('rapidfire: running minute tasks\n')
63 sys.stdout.flush()
64
65 with self.db_cnx:
66 self.db_cnx.execute(
67 "delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')"
68 )
69
70 self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')
71
72 def serve_feed(self, limit, offset, langs):
73 if '*' in langs:
74 cur = self.db_cnx.execute(
75 "select uri from posts order by create_ts desc limit :limit offset :offset",
76 dict(limit=limit, offset=offset)
77 )
78 return [uri for (uri,) in cur]
79 else:
80 lang_values = list(langs.values())
81 lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
82 lang_clause = ' union '.join(lang_selects)
83 cur = self.db_cnx.execute(
84 lang_clause + ' order by create_ts desc limit ? offset ?',
85 [*lang_values, limit, offset]
86 )
87 return [uri for (uri, create_ts) in cur]