// Forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix};
2use crate::{
3 error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix,
4 OrderCollectionsBy, PrefixChild, UFOsRecord,
5};
6use async_trait::async_trait;
7use jetstream::exports::{Did, Nsid};
8use metrics::{describe_histogram, histogram, Unit};
9use std::collections::{HashMap, HashSet};
10use std::path::Path;
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc::Receiver;
13use tokio_util::sync::CancellationToken;
14
15pub type StorageResult<T> = Result<T, StorageError>;
16
17pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> {
18 fn init(
19 path: impl AsRef<Path>,
20 endpoint: String,
21 force_endpoint: bool,
22 config: C,
23 ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)>
24 where
25 Self: Sized;
26}
27
28#[async_trait]
29pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync
30where
31 Self: 'static,
32{
33 fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>;
34
35 async fn receive_batches<const LIMIT: usize>(
36 self,
37 mut batches: Receiver<EventBatch<LIMIT>>,
38 ) -> StorageResult<()> {
39 describe_histogram!(
40 "storage_slow_batches",
41 Unit::Microseconds,
42 "batches that took more than 3s to insert"
43 );
44 describe_histogram!(
45 "storage_batch_insert_time",
46 Unit::Microseconds,
47 "total time to insert one commit batch"
48 );
49 while let Some(event_batch) = batches.recv().await {
50 let token = CancellationToken::new();
51 let cancelled = token.clone();
52 tokio::spawn(async move {
53 let started = Instant::now();
54 let mut concerned = false;
55 loop {
56 tokio::select! {
57 _ = tokio::time::sleep(Duration::from_secs(3)) => {
58 if !concerned {
59 log::warn!("taking a long time to insert an event batch...");
60 }
61 concerned = true;
62 }
63 _ = cancelled.cancelled() => {
64 if concerned {
65 log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed());
66 histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64);
67 }
68 break
69 }
70 }
71 }
72 });
73 tokio::task::spawn_blocking({
74 let mut me = self.clone();
75 move || {
76 let _guard = token.drop_guard();
77 let t0 = Instant::now();
78 let r = me.insert_batch(event_batch);
79 histogram!("storage_batch_insert_time").record(t0.elapsed().as_micros() as f64);
80 r
81 }
82 })
83 .await??;
84 }
85
86 Err(StorageError::BatchSenderExited)
87 }
88
89 fn insert_batch<const LIMIT: usize>(
90 &mut self,
91 event_batch: EventBatch<LIMIT>,
92 ) -> StorageResult<()>;
93
94 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>;
95
96 fn trim_collection(
97 &mut self,
98 collection: &Nsid,
99 limit: usize,
100 full_scan: bool,
101 ) -> StorageResult<(usize, usize, bool)>;
102
103 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>;
104}
105
106#[async_trait]
107pub trait StoreBackground: Send + Sync {
108 async fn run(mut self, backfill: bool) -> StorageResult<()>;
109}
110
111#[async_trait]
112pub trait StoreReader: Send + Sync {
113 fn name(&self) -> String;
114
115 fn update_metrics(&self) {}
116
117 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>;
118
119 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>;
120
121 async fn get_collections(
122 &self,
123 limit: usize,
124 order: OrderCollectionsBy,
125 since: Option<HourTruncatedCursor>,
126 until: Option<HourTruncatedCursor>,
127 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>;
128
129 async fn get_prefix(
130 &self,
131 prefix: NsidPrefix,
132 limit: usize,
133 order: OrderCollectionsBy,
134 since: Option<HourTruncatedCursor>,
135 until: Option<HourTruncatedCursor>,
136 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>;
137
138 async fn get_timeseries(
139 &self,
140 collections: Vec<Nsid>,
141 since: HourTruncatedCursor,
142 until: Option<HourTruncatedCursor>,
143 step: u64,
144 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>;
145
146 async fn get_collection_counts(
147 &self,
148 collection: &Nsid,
149 since: HourTruncatedCursor,
150 until: Option<HourTruncatedCursor>,
151 ) -> StorageResult<JustCount>;
152
153 async fn get_records_by_collections(
154 &self,
155 collections: HashSet<Nsid>,
156 limit: usize,
157 expand_each_collection: bool,
158 ) -> StorageResult<Vec<UFOsRecord>>;
159
160 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>;
161}