Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

try not to die for longer

Changed files
+4 -4
ufos
+2 -2
ufos/src/consumer.rs
···
pub const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection
pub const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe
pub const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now
-
pub const SEND_TIMEOUT_S: f64 = 15.; // if the channel is blocked longer than this, something is probably up
-
pub const BATCH_QUEUE_SIZE: usize = 1; // nearly-rendez-vous
pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>;
···
pub const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection
pub const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe
pub const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now
+
pub const SEND_TIMEOUT_S: f64 = 150.; // if the channel is blocked longer than this, something is probably up
+
pub const BATCH_QUEUE_SIZE: usize = 64; // used to be 1, but sometimes inserts are just really slow????????
pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>;
+2 -2
ufos/src/storage.rs
···
let mut concerned = false;
loop {
tokio::select! {
-
_ = tokio::time::sleep(Duration::from_secs_f64(1.)) => {
log::warn!("taking a long time to insert an event batch ({:?})...", started.elapsed());
concerned = true;
}
_ = cancelled.cancelled() => {
if concerned {
-
log::warn!("finally inserted slow event batch after {:?}", started.elapsed());
}
break
}
···
let mut concerned = false;
loop {
tokio::select! {
+
_ = tokio::time::sleep(Duration::from_secs_f64(3.)) => {
log::warn!("taking a long time to insert an event batch ({:?})...", started.elapsed());
concerned = true;
}
_ = cancelled.cancelled() => {
if concerned {
+
log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed());
}
break
}