Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix}; 2use crate::{ 3 error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix, 4 OrderCollectionsBy, PrefixChild, UFOsRecord, 5}; 6use async_trait::async_trait; 7use jetstream::exports::{Did, Nsid}; 8use metrics::{describe_histogram, histogram, Unit}; 9use std::collections::{HashMap, HashSet}; 10use std::path::Path; 11use std::time::{Duration, Instant}; 12use tokio::sync::mpsc::Receiver; 13use tokio_util::sync::CancellationToken; 14 15pub type StorageResult<T> = Result<T, StorageError>; 16 17pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> { 18 fn init( 19 path: impl AsRef<Path>, 20 endpoint: String, 21 force_endpoint: bool, 22 config: C, 23 ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)> 24 where 25 Self: Sized; 26} 27 28#[async_trait] 29pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync 30where 31 Self: 'static, 32{ 33 fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>; 34 35 async fn receive_batches<const LIMIT: usize>( 36 self, 37 mut batches: Receiver<EventBatch<LIMIT>>, 38 ) -> StorageResult<()> { 39 describe_histogram!( 40 "storage_slow_batches", 41 Unit::Microseconds, 42 "batches that took more than 3s to insert" 43 ); 44 while let Some(event_batch) = batches.recv().await { 45 let token = CancellationToken::new(); 46 let cancelled = token.clone(); 47 tokio::spawn(async move { 48 let started = Instant::now(); 49 let mut concerned = false; 50 loop { 51 tokio::select! { 52 _ = tokio::time::sleep(Duration::from_secs(3)) => { 53 if !concerned { 54 log::warn!("taking a long time to insert an event batch..."); 55 } 56 concerned = true; 57 } 58 _ = cancelled.cancelled() => { 59 if concerned { 60 log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed()); 61 histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64); 62 } 63 break 64 } 65 } 66 } 67 }); 68 tokio::task::spawn_blocking({ 69 let mut me = self.clone(); 70 move || { 71 let _guard = token.drop_guard(); 72 me.insert_batch(event_batch) 73 } 74 }) 75 .await??; 76 } 77 78 Err(StorageError::BatchSenderExited) 79 } 80 81 fn insert_batch<const LIMIT: usize>( 82 &mut self, 83 event_batch: EventBatch<LIMIT>, 84 ) -> StorageResult<()>; 85 86 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>; 87 88 fn trim_collection( 89 &mut self, 90 collection: &Nsid, 91 limit: usize, 92 full_scan: bool, 93 ) -> StorageResult<(usize, usize, bool)>; 94 95 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>; 96} 97 98#[async_trait] 99pub trait StoreBackground: Send + Sync { 100 async fn run(mut self, backfill: bool) -> StorageResult<()>; 101} 102 103#[async_trait] 104pub trait StoreReader: Send + Sync { 105 fn name(&self) -> String; 106 107 fn update_metrics(&self) {} 108 109 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 110 111 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 112 113 async fn get_collections( 114 &self, 115 limit: usize, 116 order: OrderCollectionsBy, 117 since: Option<HourTruncatedCursor>, 118 until: Option<HourTruncatedCursor>, 119 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 120 121 async fn get_prefix( 122 &self, 123 prefix: NsidPrefix, 124 limit: usize, 125 order: OrderCollectionsBy, 126 since: Option<HourTruncatedCursor>, 127 until: Option<HourTruncatedCursor>, 128 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>; 129 130 async fn get_timeseries( 131 &self, 132 collections: Vec<Nsid>, 133 since: HourTruncatedCursor, 134 until: Option<HourTruncatedCursor>, 135 step: u64, 136 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>; 137 138 async fn get_collection_counts( 139 &self, 140 collection: &Nsid, 141 since: HourTruncatedCursor, 142 until: Option<HourTruncatedCursor>, 143 ) -> StorageResult<JustCount>; 144 145 async fn get_records_by_collections( 146 &self, 147 collections: HashSet<Nsid>, 148 limit: usize, 149 expand_each_collection: bool, 150 ) -> StorageResult<Vec<UFOsRecord>>; 151 152 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>; 153}