Tracks lexicons and how many times they appeared on the Jetstream.
1use std::{
2 ops::{Bound, Deref, RangeBounds},
3 path::Path,
4 time::Duration,
5};
6
7use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
8use pingora_limits::rate::Rate;
9use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
10use smol_str::SmolStr;
11use tokio::sync::broadcast;
12
13use crate::{
14 error::{AppError, AppResult},
15 jetstream::JetstreamEvent,
16};
17
/// Running totals for a single NSID, stored rkyv-serialized (keyed by NSID) in the
/// `_counts` partition.
///
/// NOTE(review): presumably the field order here defines the rkyv archive layout of
/// data already on disk — do not reorder or retype fields without a migration.
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    /// Number of non-delete (commit) events recorded for this NSID.
    pub count: u128,
    /// Number of delete events recorded for this NSID.
    pub deleted_count: u128,
    /// Timestamp of the most recently recorded event (taken from the event's `time_us`).
    pub last_seen: u64,
}
25
/// One recorded event in an NSID's hits partition; the partition key is the event's
/// big-endian timestamp.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    /// `true` if the event was a delete, `false` if it was a commit.
    pub deleted: bool,
}
31
/// A Jetstream event reduced to the fields the database tracks.
pub struct EventRecord {
    /// NSID of the collection the event belongs to.
    pub nsid: SmolStr,
    /// Event time copied from Jetstream's `time_us` (presumably microseconds since the
    /// Unix epoch — confirm against the Jetstream docs).
    pub timestamp: u64,
    /// Whether the event was a delete.
    pub deleted: bool,
}
37
38impl EventRecord {
39 pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
40 match event {
41 JetstreamEvent::Commit {
42 time_us, commit, ..
43 } => Some(Self {
44 nsid: commit.collection.into(),
45 timestamp: time_us,
46 deleted: false,
47 }),
48 JetstreamEvent::Delete {
49 time_us, commit, ..
50 } => Some(Self {
51 nsid: commit.collection.into(),
52 timestamp: time_us,
53 deleted: true,
54 }),
55 _ => None,
56 }
57 }
58}
59
/// Event store backed by a fjall keyspace.
///
/// - `counts` is nsid -> [`NsidCounts`]
/// - `hits` is one tree per nsid: big-endian timestamp -> [`NsidHit`]
pub struct Db {
    /// The fjall keyspace that owns every partition below.
    inner: Keyspace,
    /// Per-NSID hit partitions, opened lazily and cached by NSID.
    hits: papaya::HashMap<SmolStr, Partition>,
    /// The `_counts` partition: nsid -> rkyv-serialized `NsidCounts`.
    counts: Partition,
    /// Publishes `(nsid, updated counts)` after each recorded event while listeners exist.
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    /// Meter behind [`Db::eps`] (events per second).
    eps: Rate,
}
69
70impl Db {
71 pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
72 tracing::info!("opening db...");
73 let ks = Config::new(path)
74 .cache_size(8 * 1024 * 1024) // from talna
75 .open()?;
76 Ok(Self {
77 hits: Default::default(),
78 counts: ks.open_partition(
79 "_counts",
80 PartitionCreateOptions::default().compression(fjall::CompressionType::None),
81 )?,
82 inner: ks,
83 event_broadcaster: broadcast::channel(1000).0,
84 eps: Rate::new(Duration::from_secs(1)),
85 })
86 }
87
88 pub fn eps(&self) -> usize {
89 self.eps.rate(&()) as usize
90 }
91
92 pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
93 self.event_broadcaster.subscribe()
94 }
95
96 #[inline(always)]
97 fn run_in_nsid_tree<T>(
98 &self,
99 nsid: &str,
100 f: impl FnOnce(&Partition) -> AppResult<T>,
101 ) -> AppResult<T> {
102 f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || {
103 let opts = PartitionCreateOptions::default()
104 .compression(fjall::CompressionType::Miniz(9))
105 .compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
106 limit: 5 * 1024 * 1024 * 1024, // 5 gb
107 ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
108 }));
109 self.inner.open_partition(nsid, opts).unwrap()
110 }))
111 }
112
113 pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
114 let EventRecord {
115 nsid,
116 timestamp,
117 deleted,
118 } = e;
119
120 self.insert_event(&nsid, timestamp, deleted)?;
121 // increment count
122 let mut counts = self.get_count(&nsid)?;
123 counts.last_seen = timestamp;
124 if deleted {
125 counts.deleted_count += 1;
126 } else {
127 counts.count += 1;
128 }
129 self.insert_count(&nsid, counts.clone())?;
130 if self.event_broadcaster.receiver_count() > 0 {
131 let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
132 }
133 self.eps.observe(&(), 1);
134 Ok(())
135 }
136
137 #[inline(always)]
138 fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
139 self.run_in_nsid_tree(nsid, |tree| {
140 tree.insert(
141 timestamp.to_be_bytes(),
142 unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() }
143 .as_slice(),
144 )
145 .map_err(AppError::from)
146 })
147 }
148
149 #[inline(always)]
150 fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
151 self.counts
152 .insert(
153 nsid,
154 unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
155 )
156 .map_err(AppError::from)
157 }
158
159 pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
160 let Some(raw) = self.counts.get(nsid)? else {
161 return Ok(NsidCounts::default());
162 };
163 Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
164 }
165
166 pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
167 self.counts.iter().map(|res| {
168 res.map_err(AppError::from).map(|(key, val)| {
169 (
170 SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
171 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
172 )
173 })
174 })
175 }
176
177 pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> {
178 self.inner
179 .list_partitions()
180 .into_iter()
181 .filter(|k| k.deref() != "_counts")
182 }
183
184 pub fn get_hits(
185 &self,
186 nsid: &str,
187 range: impl RangeBounds<u64>,
188 ) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> {
189 let start = range.start_bound().cloned().map(u64::to_be_bytes);
190 let end = range.end_bound().cloned().map(u64::to_be_bytes);
191
192 let _guard = self.hits.guard();
193 let Some(tree) = self.hits.get(nsid, &_guard) else {
194 return Ok(Box::new(std::iter::empty()));
195 };
196
197 Ok(Box::new(tree.range(TimestampRange { start, end }).map(
198 |res| {
199 res.map_err(AppError::from).map(|(key, val)| {
200 (
201 u64::from_be_bytes(key.as_ref().try_into().unwrap()),
202 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
203 )
204 })
205 },
206 )))
207 }
208}
209
/// A `u64` timestamp encoded big-endian, which is the key format of the hit partitions.
type TimestampRepr = [u8; 8];

/// A timestamp range already encoded as partition keys, ready to hand to a partition's
/// `range` scan.
struct TimestampRange {
    /// Lower bound, big-endian encoded.
    start: Bound<TimestampRepr>,
    /// Upper bound, big-endian encoded.
    end: Bound<TimestampRepr>,
}

impl RangeBounds<TimestampRepr> for TimestampRange {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampRepr> {
        // Lend the stored bound out by reference; the key bytes are not copied.
        Bound::as_ref(&self.start)
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampRepr> {
        Bound::as_ref(&self.end)
    }
}