# (this repo has no description)
import logging

import apsw
import apsw.ext
import grapheme

from . import BaseFeed


class BattleFeed(BaseFeed):
    """Feed that keeps a single post per (grapheme length, language) pair.

    Posts are stored in the SQLite database ``db/battle.db``. A unique index
    on ``(grapheme_length, lang)`` combined with an upsert means that a post
    processed later replaces the stored post that has the same text length
    and language. Feeds are served ordered by ascending grapheme length.

    ``safe_timestamp``, ``transaction_begin``, ``transaction_commit`` and
    ``wal_checkpoint`` are not defined in this file; presumably they are
    inherited from ``BaseFeed`` -- confirm there for their exact semantics.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'

    # Shared projection for the debug view: every column plus the post's age.
    _DEBUG_SELECT = "select *, unixepoch('now') - create_ts as age_seconds from posts"

    def __init__(self):
        """Open the database, configure WAL mode and create the schema."""
        self.db_cnx = apsw.Connection('db/battle.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # 0 turns off SQLite's automatic checkpointing; commit_changes()
        # requests a checkpoint explicitly instead.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text,
                grapheme_length integer,
                create_ts timestamp,
                lang text
            );
            create unique index if not exists ll_idx on posts(grapheme_length, lang);
            """)

        self.logger = logging.getLogger('feeds.battle')

    @staticmethod
    def _union_by_lang(select_sql, lang_count):
        """Return ``select_sql`` repeated ``lang_count`` times, joined by ``union``.

        ``select_sql`` is expected to end in a single ``lang = ?`` placeholder,
        so the result takes one binding per language. ``lang_count`` must be
        at least 1; callers handle the empty case themselves because an empty
        union is not valid SQL.
        """
        return ' union '.join([select_sql] * lang_count)

    def process_commit(self, commit):
        """Store a newly created post, once per language it declares.

        Only commits whose ``opType`` is ``'c'`` and whose ``collection`` is
        ``'app.bsky.feed.post'`` are considered; anything else, or a commit
        without a ``record``, is ignored. The commit must also carry ``did``
        and ``rkey``, which are used to build the post's ``at://`` URI.

        The inserts happen after ``transaction_begin`` and are not committed
        here; ``commit_changes`` commits them.
        """
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        # ``or ''`` also covers a record whose text is present but null,
        # which would otherwise make grapheme.length() raise.
        length = grapheme.length(record.get('text') or '')
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        self.transaction_begin(self.db_cnx)

        # Posts without declared languages are filed under the empty string.
        langs = record.get('langs') or ['']
        for lang in langs:
            # On a (grapheme_length, lang) collision the stored row is
            # overwritten with this post's uri and timestamp.
            self.db_cnx.execute("""
            insert into posts(uri, grapheme_length, create_ts, lang)
            values(:uri, :length, :ts, :lang)
            on conflict do update set uri = :uri, create_ts = :ts
            """, dict(uri=post_uri, length=length, ts=ts, lang=lang))

    def commit_changes(self):
        """Commit the pending transaction and request a ``RESTART`` WAL checkpoint."""
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs ordered by ascending grapheme length.

        Args:
            limit: maximum number of rows to return.
            offset: number of rows to skip.
            langs: container supporting ``'*' in langs`` and ``langs.values()``.
                If it contains ``'*'`` no language filter is applied;
                otherwise only posts whose ``lang`` equals one of
                ``langs.values()`` are returned.

        Returns:
            A list of post URIs. Empty when ``langs`` has neither ``'*'`` nor
            any values.
        """
        if '*' in langs:
            cur = self.db_cnx.execute("""
            select uri
            from posts
            order by grapheme_length asc
            limit :limit offset :offset
            """, dict(limit=limit, offset=offset))
            return [uri for (uri,) in cur]

        lang_values = list(langs.values())
        if not lang_values:
            # With no languages the union below would be empty and the
            # statement would start with "order by", which is a syntax error.
            # NOTE(review): returning nothing (rather than everything) is the
            # conservative reading of "no languages requested" -- confirm.
            return []

        lang_clause = self._union_by_lang(
            'select uri, grapheme_length from posts where lang = ?',
            len(lang_values),
        )
        cur = self.db_cnx.execute(
            lang_clause + ' order by grapheme_length asc limit ? offset ?',
            [*lang_values, limit, offset]
        )
        # grapheme_length is selected only so the union can be ordered by it.
        return [uri for (uri, _grapheme_length) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the feed rows as a formatted text table for debugging.

        Takes the same arguments and applies the same filtering and ordering
        as ``serve_feed``, but selects every column plus ``age_seconds``
        (``unixepoch('now') - create_ts``) and renders the result with
        ``apsw.ext.format_query_table``.
        """
        if '*' in langs:
            query = self._DEBUG_SELECT + """
            order by grapheme_length asc
            limit :limit offset :offset
            """
            # Named placeholders are bound by name, as in serve_feed().
            bindings = dict(limit=limit, offset=offset)
        else:
            lang_values = list(langs.values())
            if lang_values:
                lang_clause = self._union_by_lang(
                    self._DEBUG_SELECT + ' where lang = ?',
                    len(lang_values),
                )
                query = lang_clause + ' order by grapheme_length asc limit ? offset ?'
                bindings = [*lang_values, limit, offset]
            else:
                # No languages requested: run a valid query that matches no
                # rows instead of sending a malformed empty union.
                query = self._DEBUG_SELECT + ' where 0'
                bindings = None

        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )