this repo has no description
1import logging
2
3import apsw
4import apsw.ext
5import grapheme
6
7from . import BaseFeed
8
9class BattleFeed(BaseFeed):
10 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'
11
12 def __init__(self):
13 self.db_cnx = apsw.Connection('db/battle.db')
14 self.db_cnx.pragma('journal_mode', 'WAL')
15 self.db_cnx.pragma('wal_autocheckpoint', '0')
16
17 with self.db_cnx:
18 self.db_cnx.execute("""
19 create table if not exists posts (
20 uri text,
21 grapheme_length integer,
22 create_ts timestamp,
23 lang text
24 );
25 create unique index if not exists ll_idx on posts(grapheme_length, lang);
26 """)
27
28 self.logger = logging.getLogger('feeds.battle')
29
30 def process_commit(self, commit):
31 op = commit['op']
32 if op['action'] != 'create':
33 return
34
35 collection, _ = op['path'].split('/')
36 if collection != 'app.bsky.feed.post':
37 return
38
39 record = op.get('record')
40 if record is None:
41 return
42
43 repo = commit['repo']
44 path = op['path']
45 post_uri = f'at://{repo}/{path}'
46 length = grapheme.length(record.get('text', ''))
47 ts = self.safe_timestamp(record.get('createdAt')).timestamp()
48
49 self.transaction_begin(self.db_cnx)
50
51 langs = record.get('langs') or ['']
52 for lang in langs:
53 self.db_cnx.execute("""
54 insert into posts(uri, grapheme_length, create_ts, lang)
55 values(:uri, :length, :ts, :lang)
56 on conflict do update set uri = :uri, create_ts = :ts
57 """, dict(uri=post_uri, length=length, ts=ts, lang=lang))
58
59 def commit_changes(self):
60 self.logger.debug('committing changes')
61 self.transaction_commit(self.db_cnx)
62 self.wal_checkpoint(self.db_cnx, 'RESTART')
63
64 def serve_feed(self, limit, offset, langs):
65 if '*' in langs:
66 cur = self.db_cnx.execute("""
67 select uri
68 from posts
69 order by grapheme_length asc
70 limit :limit offset :offset
71 """, dict(limit=limit, offset=offset))
72 return [uri for (uri,) in cur]
73 else:
74 lang_values = list(langs.values())
75 lang_selects = ['select uri, grapheme_length from posts where lang = ?'] * len(lang_values)
76 lang_clause = ' union '.join(lang_selects)
77 cur = self.db_cnx.execute(
78 lang_clause + ' order by grapheme_length asc limit ? offset ?',
79 [*lang_values, limit, offset]
80 )
81 return [uri for (uri, grapheme_length) in cur]
82
83 def serve_feed_debug(self, limit, offset, langs):
84 if '*' in langs:
85 query = """
86 select *, unixepoch('now') - create_ts as age_seconds
87 from posts
88 order by grapheme_length asc
89 limit :limit offset :offset
90 """
91 bindings = [limit, offset]
92 else:
93 lang_values = list(langs.values())
94 lang_selects = ["select *, unixepoch('now') - create_ts as age_seconds from posts where lang = ?"] * len(lang_values)
95 lang_clause = ' union '.join(lang_selects)
96 query = lang_clause + ' order by grapheme_length asc limit ? offset ?'
97 bindings = [*lang_values, limit, offset]
98
99 return apsw.ext.format_query_table(
100 self.db_cnx, query, bindings,
101 string_sanitize=2, text_width=9999, use_unicode=True
102 )