Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix}; 2use crate::{ 3 error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix, 4 OrderCollectionsBy, PrefixChild, UFOsRecord, 5}; 6use async_trait::async_trait; 7use jetstream::exports::{Did, Nsid}; 8use metrics::{describe_histogram, histogram, Unit}; 9use std::collections::{HashMap, HashSet}; 10use std::path::Path; 11use std::time::{Duration, Instant}; 12use tokio::sync::mpsc::Receiver; 13use tokio_util::sync::CancellationToken; 14 15pub type StorageResult<T> = Result<T, StorageError>; 16 17pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> { 18 fn init( 19 path: impl AsRef<Path>, 20 endpoint: String, 21 force_endpoint: bool, 22 config: C, 23 ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)> 24 where 25 Self: Sized; 26} 27 28#[async_trait] 29pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync 30where 31 Self: 'static, 32{ 33 fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>; 34 35 async fn receive_batches<const LIMIT: usize>( 36 self, 37 mut batches: Receiver<EventBatch<LIMIT>>, 38 ) -> StorageResult<()> { 39 describe_histogram!( 40 "storage_slow_batches", 41 Unit::Microseconds, 42 "batches that took more than 3s to insert" 43 ); 44 describe_histogram!( 45 "storage_batch_insert_time", 46 Unit::Microseconds, 47 "total time to insert one commit batch" 48 ); 49 while let Some(event_batch) = batches.recv().await { 50 let token = CancellationToken::new(); 51 let cancelled = token.clone(); 52 tokio::spawn(async move { 53 let started = Instant::now(); 54 let mut concerned = false; 55 loop { 56 tokio::select! { 57 _ = tokio::time::sleep(Duration::from_secs(3)) => { 58 if !concerned { 59 log::warn!("taking a long time to insert an event batch..."); 60 } 61 concerned = true; 62 } 63 _ = cancelled.cancelled() => { 64 if concerned { 65 log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed()); 66 histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64); 67 } 68 break 69 } 70 } 71 } 72 }); 73 tokio::task::spawn_blocking({ 74 let mut me = self.clone(); 75 move || { 76 let _guard = token.drop_guard(); 77 let t0 = Instant::now(); 78 let r = me.insert_batch(event_batch); 79 histogram!("storage_batch_insert_time").record(t0.elapsed().as_micros() as f64); 80 r 81 } 82 }) 83 .await??; 84 } 85 86 Err(StorageError::BatchSenderExited) 87 } 88 89 fn insert_batch<const LIMIT: usize>( 90 &mut self, 91 event_batch: EventBatch<LIMIT>, 92 ) -> StorageResult<()>; 93 94 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>; 95 96 fn trim_collection( 97 &mut self, 98 collection: &Nsid, 99 limit: usize, 100 full_scan: bool, 101 ) -> StorageResult<(usize, usize, bool)>; 102 103 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>; 104} 105 106#[async_trait] 107pub trait StoreBackground: Send + Sync { 108 async fn run(mut self, backfill: bool) -> StorageResult<()>; 109} 110 111#[async_trait] 112pub trait StoreReader: Send + Sync { 113 fn name(&self) -> String; 114 115 fn update_metrics(&self) {} 116 117 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 118 119 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 120 121 async fn get_collections( 122 &self, 123 limit: usize, 124 order: OrderCollectionsBy, 125 since: Option<HourTruncatedCursor>, 126 until: Option<HourTruncatedCursor>, 127 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 128 129 async fn get_prefix( 130 &self, 131 prefix: NsidPrefix, 132 limit: usize, 133 order: OrderCollectionsBy, 134 since: Option<HourTruncatedCursor>, 135 until: Option<HourTruncatedCursor>, 136 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>; 137 138 async fn get_timeseries( 139 &self, 140 collections: Vec<Nsid>, 141 since: HourTruncatedCursor, 142 until: Option<HourTruncatedCursor>, 143 step: u64, 144 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>; 145 146 async fn get_collection_counts( 147 &self, 148 collection: &Nsid, 149 since: HourTruncatedCursor, 150 until: Option<HourTruncatedCursor>, 151 ) -> StorageResult<JustCount>; 152 153 async fn get_records_by_collections( 154 &self, 155 collections: HashSet<Nsid>, 156 limit: usize, 157 expand_each_collection: bool, 158 ) -> StorageResult<Vec<UFOsRecord>>; 159 160 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>; 161}