this repo has no description
import logging

import apsw
import apsw.ext
import grapheme

from . import BaseFeed


class BattleFeed(BaseFeed):
    """Feed that keeps one post per (grapheme length, language) pair.

    Posts are stored in the SQLite database ``db/battle.db``.  The ``posts``
    table has a unique index on ``(grapheme_length, lang)``, and inserts are
    upserts, so a newer post with the same length and language replaces the
    ``uri`` and ``create_ts`` of the row already there.  The feed is served
    ordered by ascending grapheme length.

    ``safe_timestamp``, ``transaction_begin``, ``transaction_commit`` and
    ``wal_checkpoint`` are not defined in this class; presumably they are
    inherited from ``BaseFeed`` -- confirm against that class.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'

    def __init__(self):
        """Open the database in WAL mode and create the schema if missing."""
        self.db_cnx = apsw.Connection('db/battle.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Automatic WAL checkpoints are switched off here; commit_changes()
        # requests a checkpoint explicitly after each commit.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text,
                grapheme_length integer,
                create_ts timestamp,
                lang text
            );
            create unique index if not exists ll_idx on posts(grapheme_length, lang);
            """)

        self.logger = logging.getLogger('feeds.battle')

    def process_commit(self, commit):
        """Record a newly created post, once per declared language.

        ``commit`` is read as a mapping with a ``'repo'`` entry and an
        ``'op'`` mapping holding ``'action'``, ``'path'`` and an optional
        ``'record'``.  Anything that is not the creation of an
        ``app.bsky.feed.post`` record, or that carries no record, is ignored.
        """
        op = commit['op']
        if op['action'] != 'create':
            return

        path = op['path']
        # A path is expected to look like "<collection>/<rkey>".  Ignore
        # anything else instead of raising on the tuple unpacking.
        parts = path.split('/')
        if len(parts) != 2 or parts[0] != 'app.bsky.feed.post':
            return

        record = op.get('record')
        if record is None:
            return

        repo = commit['repo']
        post_uri = f'at://{repo}/{path}'
        length = grapheme.length(record.get('text', ''))
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        # Posts without languages are filed under the empty-string language.
        langs = record.get('langs') or ['']
        for lang in langs:
            # The unique index on (grapheme_length, lang) turns a second post
            # with the same length and language into an update of that row.
            self.db_cnx.execute("""
            insert into posts(uri, grapheme_length, create_ts, lang)
            values(:uri, :length, :ts, :lang)
            on conflict do update set uri = :uri, create_ts = :ts
            """, dict(uri=post_uri, length=length, ts=ts, lang=lang))

    def commit_changes(self):
        """Commit the open transaction and request a RESTART WAL checkpoint."""
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs ordered by ascending grapheme length.

        ``langs`` is used as a mapping: if it has a ``'*'`` key every language
        is served, otherwise only rows whose ``lang`` equals one of its
        values.  An empty ``langs`` yields an empty list.
        """
        if '*' in langs:
            # process_commit() stores one row per language, so the same post
            # can occupy several rows; ``distinct`` keeps it from being served
            # more than once, matching the ``union`` used for the
            # per-language query below.
            cur = self.db_cnx.execute("""
            select distinct uri, grapheme_length
            from posts
            order by grapheme_length asc
            limit :limit offset :offset
            """, dict(limit=limit, offset=offset))
        else:
            lang_values = list(langs.values())
            if not lang_values:
                # No languages requested: an empty union would leave the
                # statement starting with "order by", which is invalid SQL.
                return []

            lang_selects = ['select uri, grapheme_length from posts where lang = ?'] * len(lang_values)
            lang_clause = ' union '.join(lang_selects)
            cur = self.db_cnx.execute(
                lang_clause + ' order by grapheme_length asc limit ? offset ?',
                [*lang_values, limit, offset]
            )

        return [uri for (uri, _grapheme_length) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the feed rows as a text table, with each row's age in seconds.

        Takes the same arguments as ``serve_feed``.  The table is produced by
        ``apsw.ext.format_query_table``.  An empty ``langs`` yields an empty
        string.
        """
        if '*' in langs:
            query = """
            select *, unixepoch('now') - create_ts as age_seconds
            from posts
            order by grapheme_length asc
            limit :limit offset :offset
            """
            # Named placeholders are bound by name, as in serve_feed().
            bindings = dict(limit=limit, offset=offset)
        else:
            lang_values = list(langs.values())
            if not lang_values:
                # Same guard as serve_feed(): nothing to select from.
                return ''

            lang_selects = ["select *, unixepoch('now') - create_ts as age_seconds from posts where lang = ?"] * len(lang_values)
            lang_clause = ' union '.join(lang_selects)
            query = lang_clause + ' order by grapheme_length asc limit ? offset ?'
            bindings = [*lang_values, limit, offset]

        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )