Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

Changed files
+62 -25
constellation
src
consumer
slingshot
+13 -6
constellation/src/consumer/jetstream.rs
···
println!("jetstream closed the websocket cleanly.");
break;
}
-
r => eprintln!("jetstream: close result after error: {r:?}"),
+
Err(_) => {
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
+
println!("jetstream failed to close the websocket cleanly.");
+
break;
+
}
+
Ok(r) => {
+
eprintln!("jetstream: close result after error: {r:?}");
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
+
.increment(1);
+
// if we didn't immediately get ConnectionClosed, we should keep polling read
+
// until we get it.
+
continue;
+
}
}
-
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
-
.increment(1);
-
// if we didn't immediately get ConnectionClosed, we should keep polling read
-
// until we get it.
-
continue;
}
};
+4 -2
slingshot/src/firehose_cache.rs
···
pub async fn firehose_cache(
cache_dir: impl AsRef<Path>,
+
memory_mb: usize,
+
disk_gb: usize,
) -> Result<HybridCache<String, CachedRecord>, String> {
let cache = HybridCacheBuilder::new()
.with_name("firehose")
-
.memory(64 * 2_usize.pow(20))
+
.memory(memory_mb * 2_usize.pow(20))
.with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v))
.storage(Engine::large())
.with_device_options(
DirectFsDeviceOptions::new(cache_dir)
-
.with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
+
.with_capacity(disk_gb * 2_usize.pow(30))
.with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
)
.build()
+26 -5
slingshot/src/main.rs
···
/// where to keep disk caches
#[arg(long)]
cache_dir: PathBuf,
+
/// memory cache size in MB
+
#[arg(long, default_value_t = 64)]
+
cache_memory_mb: usize,
+
/// disk cache size in GB
+
#[arg(long, default_value_t = 1)]
+
cache_disk_gb: usize,
+
/// host for HTTP server (when not using --domain)
+
#[arg(long, default_value = "127.0.0.1")]
+
host: String,
+
/// port for HTTP server (when not using --domain)
+
#[arg(long, default_value_t = 3000)]
+
port: u16,
+
/// port for metrics/prometheus server
+
#[arg(long, default_value_t = 8765)]
+
metrics_port: u16,
/// the domain pointing to this server
///
/// if present:
···
let args = Args::parse();
-
if let Err(e) = install_metrics_server() {
+
if let Err(e) = install_metrics_server(args.metrics_port) {
log::error!("failed to install metrics server: {e:?}");
} else {
-
log::info!("metrics listening at http://0.0.0.0:8765");
+
log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
}
std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
log::info!("cache dir ready at at {cache_dir:?}.");
log::info!("setting up firehose cache...");
-
let cache = firehose_cache(cache_dir.join("./firehose")).await?;
+
let cache = firehose_cache(
+
cache_dir.join("./firehose"),
+
args.cache_memory_mb,
+
args.cache_disk_gb,
+
)
+
.await?;
log::info!("firehose cache ready.");
let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
···
args.domain,
args.acme_contact,
args.certs,
+
args.host,
+
args.port,
server_shutdown,
)
.await?;
···
Ok(())
}
-
fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
+
fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
log::info!("installing metrics server...");
let host = [0, 0, 0, 0];
-
let port = 8765;
PrometheusBuilder::new()
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
.set_bucket_duration(std::time::Duration::from_secs(300))?
+19 -12
slingshot/src/server.rs
···
Ok(did) => did,
Err(_) => {
let Ok(alleged_handle) = Handle::new(identifier) else {
-
return invalid("identifier was not a valid DID or handle");
+
return invalid("Identifier was not a valid DID or handle");
};
match self.identity.handle_to_did(alleged_handle.clone()).await {
···
Err(e) => {
log::debug!("failed to resolve handle: {e}");
// TODO: ServerError not BadRequest
-
return invalid("errored while trying to resolve handle to DID");
+
return invalid("Errored while trying to resolve handle to DID");
}
}
}
};
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
-
return invalid("failed to get DID doc");
+
return invalid("Failed to get DID doc");
};
let Some(partial_doc) = partial_doc else {
-
return invalid("failed to find DID doc");
+
return invalid("Failed to find DID doc");
};
// ok so here's where we're at:
···
.handle_to_did(partial_doc.unverified_handle.clone())
.await
else {
-
return invalid("failed to get did doc's handle");
+
return invalid("Failed to get DID doc's handle");
};
let Some(handle_did) = handle_did else {
-
return invalid("failed to resolve did doc's handle");
+
return invalid("Failed to resolve DID doc's handle");
};
if handle_did == did {
partial_doc.unverified_handle.to_string()
···
let Ok(handle) = Handle::new(repo) else {
return GetRecordResponse::BadRequest(xrpc_error(
"InvalidRequest",
-
"repo was not a valid DID or handle",
+
"Repo was not a valid DID or handle",
));
};
match self.identity.handle_to_did(handle).await {
···
log::debug!("handle resolution failed: {e}");
return GetRecordResponse::ServerError(xrpc_error(
"ResolutionFailed",
-
"errored while trying to resolve handle to DID",
+
"Errored while trying to resolve handle to DID",
));
}
}
···
let Ok(collection) = Nsid::new(collection) else {
return GetRecordResponse::BadRequest(xrpc_error(
"InvalidRequest",
-
"invalid NSID for collection",
+
"Invalid NSID for collection",
));
};
let Ok(rkey) = RecordKey::new(rkey) else {
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
};
let cid: Option<Cid> = if let Some(cid) = cid {
let Ok(cid) = Cid::from_str(&cid) else {
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
};
Some(cid)
} else {
···
domain: Option<String>,
acme_contact: Option<String>,
certs: Option<PathBuf>,
+
host: String,
+
port: u16,
shutdown: CancellationToken,
) -> Result<(), ServerError> {
let repo = Arc::new(repo);
···
)
.await
} else {
-
run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
+
run(
+
TcpListener::bind(format!("{host}:{port}")),
+
app,
+
shutdown,
+
)
+
.await
}
}