this repo has no description
1import logging
2
3import apsw
4
class FirehoseManager:
    """Persist the firehose sequence number in a small SQLite database.

    The database holds one key/value table named ``firehose``; the
    sequence number is stored in the row whose key is ``'seq'``.
    """

    def __init__(self, fname='firehose.db'):
        """Open the database at *fname* and make sure the table exists.

        Args:
            fname: Path handed to ``apsw.Connection``.
        """
        self.db_cnx = apsw.Connection(fname)
        self.db_cnx.pragma('journal_mode', 'WAL')
        with self.db_cnx:
            self.db_cnx.execute(
                "create table if not exists firehose(key text unique, value text)"
            )

        self.logger = logging.getLogger('feeds.firehose')

    def get_sequence_number(self):
        """Return the stored sequence number, or ``None`` if none is stored.

        Returns:
            The ``'seq'`` row's value converted with ``int()``, or ``None``
            when the table has no ``'seq'`` row.

        Raises:
            ValueError: If the stored value cannot be parsed by ``int()``.
        """
        # Ask for the one column we need by name so the result does not
        # depend on the physical column order of the table.
        row = self.db_cnx.execute(
            "select value from firehose where key = 'seq'"
        ).fetchone()
        if row is None:
            return None
        (stored,) = row
        return int(stored)

    def set_sequence_number(self, value):
        """Store *value* as the current sequence number.

        Inserts the ``'seq'`` row or, if it already exists, overwrites its
        value, then issues a ``wal_checkpoint(TRUNCATE)`` pragma.

        Args:
            value: The sequence number to store (bound to a ``text`` column).
        """
        # Lazy %-style args: the message is only formatted when DEBUG
        # logging is actually enabled.
        self.logger.debug('setting sequence number = %s', value)

        with self.db_cnx:
            self.db_cnx.execute(
                "insert into firehose (key, value) values ('seq', :value) "
                "on conflict(key) do update set value = :value",
                {'value': value},
            )

        # Run outside the transaction above, once the write has been committed.
        self.db_cnx.pragma('wal_checkpoint(TRUNCATE)')