Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

backup?

Changed files
+28
constellation
src
bin
storage
+6
constellation/src/bin/main.rs
···
#[arg(short, long)]
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
backend: StorageBackend,
+
/// Initiate a database backup into this dir, if supported by the storage
+
#[arg(long)]
+
backup: Option<PathBuf>,
/// Saved jsonl from jetstream to use instead of a live subscription
#[arg(short, long)]
fixture: Option<PathBuf>,
···
let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
println!("starting rocksdb...");
let rocks = RocksStorage::new(storage_dir)?;
+
if let Some(backup_dir) = args.backup {
+
rocks.start_backup(backup_dir)?;
+
}
println!("rocks ready.");
run(rocks, fixture, args.data, stream)
}
+22
constellation/src/storage/rocks_store.rs
···
})
}
+
pub fn start_backup(&self, path: impl AsRef<Path>) -> Result<()> {
+
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
+
eprintln!("getting ready to start backup...");
+
let mut engine =
+
BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
+
std::thread::spawn({
+
let db = self.db.clone();
+
move || {
+
eprintln!("backup starting.");
+
let t0 = Instant::now();
+
if let Err(e) = engine.create_new_backup(&db) {
+
eprintln!("oh no, backup failed: {e:?}");
+
} else {
+
eprintln!("yay, backup worked?");
+
}
+
eprintln!("backup finished after {:.2}s", t0.elapsed().as_secs_f32());
+
}
+
});
+
eprintln!("backups should be happening in bg thread.");
+
Ok(())
+
}
+
fn describe_metrics() {
describe_histogram!(
"storage_rocksdb_read_seconds",