forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix};
2use crate::{
3 error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix,
4 OrderCollectionsBy, PrefixChild, UFOsRecord,
5};
6use async_trait::async_trait;
7use jetstream::exports::{Did, Nsid};
8use metrics::{describe_histogram, histogram, Unit};
9use std::collections::{HashMap, HashSet};
10use std::path::Path;
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc::Receiver;
13use tokio_util::sync::CancellationToken;
14
15pub type StorageResult<T> = Result<T, StorageError>;
16
17pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> {
18 fn init(
19 path: impl AsRef<Path>,
20 endpoint: String,
21 force_endpoint: bool,
22 config: C,
23 ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)>
24 where
25 Self: Sized;
26}
27
28#[async_trait]
29pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync
30where
31 Self: 'static,
32{
33 fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>;
34
35 async fn receive_batches<const LIMIT: usize>(
36 self,
37 mut batches: Receiver<EventBatch<LIMIT>>,
38 ) -> StorageResult<()> {
39 describe_histogram!(
40 "storage_slow_batches",
41 Unit::Microseconds,
42 "batches that took more than 3s to insert"
43 );
44 while let Some(event_batch) = batches.recv().await {
45 let token = CancellationToken::new();
46 let cancelled = token.clone();
47 tokio::spawn(async move {
48 let started = Instant::now();
49 let mut concerned = false;
50 loop {
51 tokio::select! {
52 _ = tokio::time::sleep(Duration::from_secs(3)) => {
53 if !concerned {
54 log::warn!("taking a long time to insert an event batch...");
55 }
56 concerned = true;
57 }
58 _ = cancelled.cancelled() => {
59 if concerned {
60 log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed());
61 histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64);
62 }
63 break
64 }
65 }
66 }
67 });
68 tokio::task::spawn_blocking({
69 let mut me = self.clone();
70 move || {
71 let _guard = token.drop_guard();
72 me.insert_batch(event_batch)
73 }
74 })
75 .await??;
76 }
77
78 Err(StorageError::BatchSenderExited)
79 }
80
81 fn insert_batch<const LIMIT: usize>(
82 &mut self,
83 event_batch: EventBatch<LIMIT>,
84 ) -> StorageResult<()>;
85
86 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>;
87
88 fn trim_collection(
89 &mut self,
90 collection: &Nsid,
91 limit: usize,
92 full_scan: bool,
93 ) -> StorageResult<(usize, usize, bool)>;
94
95 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>;
96}
97
98#[async_trait]
99pub trait StoreBackground: Send + Sync {
100 async fn run(mut self, backfill: bool) -> StorageResult<()>;
101}
102
103#[async_trait]
104pub trait StoreReader: Send + Sync {
105 fn name(&self) -> String;
106
107 fn update_metrics(&self) {}
108
109 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>;
110
111 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>;
112
113 async fn get_collections(
114 &self,
115 limit: usize,
116 order: OrderCollectionsBy,
117 since: Option<HourTruncatedCursor>,
118 until: Option<HourTruncatedCursor>,
119 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>;
120
121 async fn get_prefix(
122 &self,
123 prefix: NsidPrefix,
124 limit: usize,
125 order: OrderCollectionsBy,
126 since: Option<HourTruncatedCursor>,
127 until: Option<HourTruncatedCursor>,
128 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>;
129
130 async fn get_timeseries(
131 &self,
132 collections: Vec<Nsid>,
133 since: HourTruncatedCursor,
134 until: Option<HourTruncatedCursor>,
135 step: u64,
136 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>;
137
138 async fn get_collection_counts(
139 &self,
140 collection: &Nsid,
141 since: HourTruncatedCursor,
142 until: Option<HourTruncatedCursor>,
143 ) -> StorageResult<JustCount>;
144
145 async fn get_records_by_collections(
146 &self,
147 collections: HashSet<Nsid>,
148 limit: usize,
149 expand_each_collection: bool,
150 ) -> StorageResult<Vec<UFOsRecord>>;
151
152 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>;
153}