# This repo has no description.
1import logging
2
3import apsw
4import apsw.ext
5import grapheme
6
7from . import BaseFeed
8
class BattleFeed(BaseFeed):
    """Feed of posts ordered by grapheme length, shortest first.

    Posts are stored in a SQLite database (``db/battle.db``). A unique index
    on ``(grapheme_length, lang)`` means at most one post is kept per
    length/language pair; a later post with the same pair overwrites the
    stored URI and timestamp.

    ``safe_timestamp``, ``transaction_begin``, ``transaction_commit`` and
    ``wal_checkpoint`` are not defined in this class (presumably inherited
    from ``BaseFeed`` -- confirm against the base class).
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'

    # The conflict target is spelled out (it matches the ``ll_idx`` unique
    # index) because SQLite releases before 3.35 reject a bare
    # ``ON CONFLICT DO UPDATE``.
    _UPSERT_SQL = """
        insert into posts(uri, grapheme_length, create_ts, lang)
        values(:uri, :length, :ts, :lang)
        on conflict(grapheme_length, lang)
        do update set uri = :uri, create_ts = :ts
    """

    def __init__(self):
        """Open the database, configure WAL mode and create the schema."""
        self.db_cnx = apsw.Connection('db/battle.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic checkpoints are turned off; commit_changes() requests a
        # checkpoint explicitly after each commit.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
                create table if not exists posts (
                    uri text,
                    grapheme_length integer,
                    create_ts timestamp,
                    lang text
                );
                create unique index if not exists ll_idx on posts(grapheme_length, lang);
            """)

        self.logger = logging.getLogger('feeds.battle')

    def process_commit(self, commit):
        """Record a newly seen post under each of its languages.

        Commits are ignored unless ``opType`` is ``'c'``, the collection is
        ``app.bsky.feed.post`` and a ``record`` is present. One row is
        upserted per entry in the record's ``langs``; a record without
        languages is stored under the empty-string language.
        """
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        # ``or ''`` also covers a record whose text key is present but null,
        # which ``record.get('text', '')`` would pass through as None.
        length = grapheme.length(record.get('text') or '')
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute(
                self._UPSERT_SQL,
                dict(uri=post_uri, length=length, ts=ts, lang=lang),
            )

    def commit_changes(self):
        """Commit the pending transaction, then request a RESTART checkpoint."""
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    @staticmethod
    def _feed_query(columns, limit, offset, langs):
        """Build the paginated, shortest-first query shared by both serve methods.

        ``columns`` is a trusted, hard-coded SQL column list (never user
        input). ``langs`` must support ``'*' in langs`` and ``.values()``.

        Returns a ``(sql, bindings)`` pair using positional placeholders.
        When ``langs`` contains ``'*'`` or yields no values, no language
        filter is applied; previously an empty ``langs`` produced a query
        with no SELECT at all, which is a SQL syntax error.
        """
        lang_values = [] if '*' in langs else list(langs.values())

        if lang_values:
            # One SELECT per requested language; UNION drops duplicate rows.
            selects = [f'select {columns} from posts where lang = ?'] * len(lang_values)
            source = ' union '.join(selects)
        else:
            # DISTINCT mirrors the de-duplication UNION performs above, so a
            # post stored under several languages is not listed repeatedly.
            source = f'select distinct {columns} from posts'

        query = source + ' order by grapheme_length asc limit ? offset ?'
        return query, [*lang_values, limit, offset]

    def serve_feed(self, limit, offset, langs):
        """Return a page of post URIs ordered by ascending grapheme length.

        ``limit`` and ``offset`` are bound to the SQL ``limit``/``offset``
        clauses. ``langs`` restricts the result to the given languages
        unless it contains ``'*'`` or is empty.
        """
        query, bindings = self._feed_query(
            'uri, grapheme_length', limit, offset, langs
        )
        return [uri for (uri, _length) in self.db_cnx.execute(query, bindings)]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as a formatted text table with every column.

        Each row additionally carries ``age_seconds``, the difference
        between the current Unix time and the stored ``create_ts``.
        """
        query, bindings = self._feed_query(
            "*, unixepoch('now') - create_ts as age_seconds",
            limit, offset, langs,
        )
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )