Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

background task for backups
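
When --backup-interval is passed, the rocks backend spawns a thread that takes a RocksDB backup every N hours into the --backup dir and, if --max-old-backups is set, purges the oldest copies; the thread watches the shutdown CancellationToken and is joined when the store closes. The scheduling idea, reduced to a standalone sketch (the real code is RocksStorage::backup_task in the diff below; backup_loop and its exact shape here are illustrative only, not the actual implementation):

    use ratelimit::Ratelimiter;
    use std::{thread, time::Duration};
    use tokio_util::sync::CancellationToken;

    // Hypothetical reduction of the background task: one rate-limiter token per interval
    // decides when a backup is due; short sleeps keep shutdown latency low.
    fn backup_loop(interval_hrs: u64, stay_alive: CancellationToken) -> anyhow::Result<()> {
        let limit = Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
        while !stay_alive.is_cancelled() {
            match limit.try_wait() {
                // a token is available: take a backup (the real task calls BackupEngine::create_new_backup,
                // then purge_old_backups when a copy limit is configured)
                Ok(()) => { /* do the backup here */ }
                // not due yet: sleep in short slices so cancellation is noticed within about a second
                Err(wait) => thread::sleep(wait.min(Duration::from_secs(1))),
            }
        }
        Ok(())
    }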

Changed files: +134 -33

constellation/src/bin/main.rs  (+25 -15)
···
-use anyhow::Result;
use clap::{Parser, ValueEnum};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::num::NonZero;
···
    /// Initiate a database backup into this dir, if supported by the storage
    #[arg(long)]
    backup: Option<PathBuf>,
    /// Saved jsonl from jetstream to use instead of a live subscription
    #[arg(short, long)]
    fixture: Option<PathBuf>,
···
    let stream = jetstream_url(&args.jetstream);
    println!("using jetstream server {stream:?}...",);
    match args.backend {
-        StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream),
        #[cfg(feature = "rocks")]
        StorageBackend::Rocks => {
            let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
            println!("starting rocksdb...");
-            let rocks = RocksStorage::new(storage_dir)?;
            if let Some(backup_dir) = args.backup {
-                rocks.start_backup(backup_dir)?;
            }
            println!("rocks ready.");
-            run(rocks, fixture, args.data, stream)
        }
    }
}
···
    fixture: Option<PathBuf>,
    data_dir: Option<PathBuf>,
    stream: String,
) -> Result<()> {
-    let stay_alive = CancellationToken::new();
-
    ctrlc::set_handler({
        let mut desperation: u8 = 0;
        let stay_alive = stay_alive.clone();
-        move || {
-            match desperation {
-                0 => {
-                    println!("ok, shutting down...");
-                    stay_alive.cancel();
-                }
-                1.. => panic!("fine, panicking!"),
            }
-            desperation += 1;
        }
    })?;
···
+use anyhow::{bail, Result};
use clap::{Parser, ValueEnum};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::num::NonZero;
···
    /// Initiate a database backup into this dir, if supported by the storage
    #[arg(long)]
    backup: Option<PathBuf>,
+    /// Start a background task to take backups every N hours
+    #[arg(long)]
+    backup_interval: Option<u64>,
+    /// If backup_interval is configured, purge the oldest backup once this many backups are saved
+    #[arg(long)]
+    max_old_backups: Option<usize>,
    /// Saved jsonl from jetstream to use instead of a live subscription
    #[arg(short, long)]
    fixture: Option<PathBuf>,
···
    let stream = jetstream_url(&args.jetstream);
    println!("using jetstream server {stream:?}...",);
+
+    let stay_alive = CancellationToken::new();
    match args.backend {
+        StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream, stay_alive),
        #[cfg(feature = "rocks")]
        StorageBackend::Rocks => {
            let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
            println!("starting rocksdb...");
+            let mut rocks = RocksStorage::new(storage_dir)?;
            if let Some(backup_dir) = args.backup {
+                let auto_backup = match (args.backup_interval, args.max_old_backups) {
+                    (Some(interval_hrs), copies) => Some((interval_hrs, copies)),
+                    (None, None) => None,
+                    (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"),
+                };
+                rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
            }
            println!("rocks ready.");
+            run(rocks, fixture, args.data, stream, stay_alive)
        }
    }
}
···
    fixture: Option<PathBuf>,
    data_dir: Option<PathBuf>,
    stream: String,
+    stay_alive: CancellationToken,
) -> Result<()> {
    ctrlc::set_handler({
        let mut desperation: u8 = 0;
        let stay_alive = stay_alive.clone();
+        move || match desperation {
+            0 => {
+                println!("ok, shutting down...");
+                stay_alive.cancel();
+                desperation += 1;
            }
+            1.. => panic!("fine, panicking!"),
        }
    })?;

constellation/src/storage/rocks_store.rs  (+109 -18)
···
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
···
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
-use std::path::Path;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
-use std::time::Instant;
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
···
    did_id_table: IdTable<Did, DidIdValue, true>,
    target_id_table: IdTable<TargetKey, TargetId, false>,
    is_writer: bool,
}
trait IdTableValue: ValueFromRocks + Clone {
···
            did_id_table,
            target_id_table,
            is_writer: true,
        })
    }
-    pub fn start_backup(&self, path: impl AsRef<Path>) -> Result<()> {
-        use rocksdb::backup::{BackupEngine, BackupEngineOptions};
-        eprintln!("getting ready to start backup...");
-        let mut engine =
-            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
-        std::thread::spawn({
-            let db = self.db.clone();
-            move || {
-                eprintln!("backup starting.");
-                let t0 = Instant::now();
-                if let Err(e) = engine.create_new_backup(&db) {
-                    eprintln!("oh no, backup failed: {e:?}");
                } else {
-                    eprintln!("yay, backup worked?");
                }
-                eprintln!("backup finished after {:.2}s", t0.elapsed().as_secs_f32());
            }
-        });
-        eprintln!("backups should be happening in bg thread.");
        Ok(())
    }
···
            opt
        }) {
            eprintln!("rocks: flushing memtables failed: {e:?}");
        }
        self.db.cancel_all_background_work(true);
    }
···
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
+use ratelimit::Ratelimiter;
+use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
···
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
+use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
+use std::thread;
+use std::time::{Duration, Instant};
+use tokio_util::sync::CancellationToken;
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
···
    did_id_table: IdTable<Did, DidIdValue, true>,
    target_id_table: IdTable<TargetKey, TargetId, false>,
    is_writer: bool,
+    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}
trait IdTableValue: ValueFromRocks + Clone {
···
            did_id_table,
            target_id_table,
            is_writer: true,
+            backup_task: None.into(),
        })
    }
+    pub fn start_backup(
+        &mut self,
+        path: PathBuf,
+        auto: Option<(u64, Option<usize>)>,
+        stay_alive: CancellationToken,
+    ) -> Result<()> {
+        let task = if let Some((interval_hrs, copies)) = auto {
+            eprintln!("backups: starting background task...");
+            self.backup_task(path, interval_hrs, copies, stay_alive)
+        } else {
+            eprintln!("backups: starting a one-off backup...");
+            thread::spawn({
+                let db = self.db.clone();
+                move || Self::do_backup(db, path)
+            })
+        };
+        self.backup_task = Arc::new(Some(task));
+        Ok(())
+    }
+
+    fn backup_task(
+        &self,
+        path: PathBuf,
+        interval_hrs: u64,
+        copies: Option<usize>,
+        stay_alive: CancellationToken,
+    ) -> std::thread::JoinHandle<Result<()>> {
+        let db = self.db.clone();
+        thread::spawn(move || {
+            let limit =
+                Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
+            let minimum_sleep = Duration::from_secs(1);
+
+            'quit: loop {
+                if let Err(sleep) = limit.try_wait() {
+                    eprintln!("backups: background: next backup scheduled in {sleep:?}");
+                    let waiting = Instant::now();
+                    loop {
+                        let remaining = sleep - waiting.elapsed();
+                        if stay_alive.is_cancelled() {
+                            break 'quit;
+                        } else if remaining <= Duration::ZERO {
+                            break;
+                        } else if remaining < minimum_sleep {
+                            thread::sleep(remaining);
+                            break;
+                        } else {
+                            thread::sleep(minimum_sleep);
+                        }
+                    }
+                }
+                eprintln!("backups: background: starting backup...");
+                if let Err(e) = Self::do_backup(db.clone(), &path) {
+                    eprintln!("backups: background: backup failed: {e:?}");
+                    // todo: metrics
                } else {
+                    eprintln!("backups: background: backup succeeded yay");
+                }
+                if let Some(copies) = copies {
+                    eprintln!("backups: background: trimming to {copies} saved backups...");
+                    if let Err(e) = Self::trim_backups(copies, &path) {
+                        eprintln!("backups: background: failed to trim backups: {e:?}");
+                    } else {
+                        eprintln!("backups: background: trimming worked!")
+                    }
                }
            }
+
+            Ok(())
+        })
+    }
+
+    fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
+        let mut engine =
+            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
+        eprintln!("backups: starting backup...");
+        let t0 = Instant::now();
+        if let Err(e) = engine.create_new_backup(&db) {
+            eprintln!("backups: oh no, backup failed: {e:?}");
+        } else {
+            eprintln!("backups: yay, backup worked?");
+        }
+        eprintln!(
+            "backups: backup finished after {:.2}s",
+            t0.elapsed().as_secs_f32()
+        );
+        Ok(())
+    }
+
+    fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
+        let mut engine =
+            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
+        engine.purge_old_backups(num_backups_to_keep)?;
        Ok(())
    }
···
            opt
        }) {
            eprintln!("rocks: flushing memtables failed: {e:?}");
+        }
+        match Arc::get_mut(&mut self.backup_task) {
+            Some(maybe_task) => {
+                if let Some(task) = maybe_task.take() {
+                    eprintln!("waiting for backup task to complete...");
+                    if let Err(e) = task.join() {
+                        eprintln!("failed to join backup task: {e:?}");
+                    }
+                }
+            }
+            None => eprintln!("rocks: failed to get backup task, likely a bug."),
        }
        self.db.cancel_all_background_work(true);
    }