this repo has no description
1import os
2import sys
3import sqlite3
4
5from . import BaseFeed
6
7MAX_TEXT_LENGTH = 140
8
9class RapidFireFeed(BaseFeed):
10 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
11
12 def __init__(self):
13 if os.path.isdir('/dev/shm/'):
14 os.makedirs('/dev/shm/feedgens/', exist_ok=True)
15 self.db_cnx = sqlite3.connect('/dev/shm/feedgens/rapidfire.db')
16 else:
17 self.db_cnx = sqlite3.connect('db/rapidfire.db')
18
19 with self.db_cnx:
20 self.db_cnx.executescript(
21 "pragma journal_mode = WAL;"
22 "pragma synchronous = OFF;"
23 "pragma wal_autocheckpoint = 0;"
24 "create table if not exists posts (uri text, create_ts timestamp, lang text);"
25 "create index if not exists create_ts_idx on posts(create_ts);"
26 )
27
28 self.checkpoint = 0
29
30 def process_commit(self, commit):
31 op = commit['op']
32 if op['action'] != 'create':
33 return
34
35 collection, _ = op['path'].split('/')
36 if collection != 'app.bsky.feed.post':
37 return
38
39 record = op['record']
40
41 if all([
42 len(record['text']) <= MAX_TEXT_LENGTH,
43 record.get('reply') is None,
44 record.get('embed') is None,
45 record.get('facets') is None
46 ]):
47 repo = commit['repo']
48 path = op['path']
49 post_uri = f'at://{repo}/{path}'
50 ts = record['createdAt']
51 self.checkpoint += 1
52
53 with self.db_cnx:
54 langs = record.get('langs') or ['']
55 for lang in langs:
56 self.db_cnx.execute(
57 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
58 dict(uri=post_uri, ts=ts, lang=lang)
59 )
60
61 self.db_cnx.execute(
62 "delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-15 minutes')"
63 )
64
65 if self.checkpoint % 100 == 0:
66 sys.stdout.write('rapidfire: checkpoint\n')
67 sys.stdout.flush()
68 self.db_cnx.execute("pragma wal_checkpoint(TRUNCATE)")
69
70 def serve_feed(self, limit, offset, langs):
71 if '*' in langs:
72 cur = self.db_cnx.execute(
73 "select uri from posts order by create_ts desc limit :limit offset :offset",
74 dict(limit=limit, offset=offset)
75 )
76 return [uri for (uri,) in cur]
77 else:
78 lang_values = list(langs.values())
79 lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
80 lang_clause = ' union '.join(lang_selects)
81 cur = self.db_cnx.execute(
82 lang_clause + ' order by create_ts desc limit ? offset ?',
83 [*lang_values, limit, offset]
84 )
85 return [uri for (uri, create_ts) in cur]