this repo has no description
1import logging 2 3import apsw 4import apsw.ext 5 6from . import BaseFeed 7 8class OutlineTagsFeed(BaseFeed): 9 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/outline' 10 SERVE_FEED_QUERY = """ 11 select uri, create_ts 12 from posts 13 order by create_ts desc 14 limit :limit offset :offset 15 """ 16 17 def __init__(self): 18 self.db_cnx = apsw.Connection('db/outlinetags.db') 19 self.db_cnx.pragma('journal_mode', 'WAL') 20 self.db_cnx.pragma('wal_autocheckpoint', '0') 21 22 with self.db_cnx: 23 self.db_cnx.execute(""" 24 create table if not exists posts (uri text, create_ts timestamp); 25 create unique index if not exists create_ts_idx on posts(create_ts); 26 """) 27 28 self.logger = logging.getLogger('feeds.outlinetags') 29 30 def process_commit(self, commit): 31 if commit['opType'] != 'c': 32 return 33 34 if commit['collection'] != 'app.bsky.feed.post': 35 return 36 37 record = commit.get('record') 38 if record is None: 39 return 40 41 if not record.get('tags', []): 42 return 43 44 repo = commit['did'] 45 rkey = commit['rkey'] 46 post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}' 47 ts = self.safe_timestamp(record.get('createdAt')).timestamp() 48 self.transaction_begin(self.db_cnx) 49 self.db_cnx.execute( 50 'insert into posts (uri, create_ts) values (:uri, :ts)', 51 dict(uri=post_uri, ts=ts) 52 ) 53 54 def commit_changes(self): 55 self.logger.debug('committing changes') 56 self.transaction_commit(self.db_cnx) 57 self.wal_checkpoint(self.db_cnx, 'RESTART') 58 59 def serve_feed(self, limit, offset, langs): 60 cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset)) 61 return [row[0] for row in cur] 62 63 def serve_feed_debug(self, limit, offset, langs): 64 bindings = dict(limit=limit, offset=offset) 65 return apsw.ext.format_query_table( 66 self.db_cnx, self.SERVE_FEED_QUERY, bindings, 67 string_sanitize=2, text_width=9999, use_unicode=True 68 )