Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

+9 -9
Cargo.lock
···
[[package]]
name = "clap"
-
version = "4.5.47"
+
version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
dependencies = [
"clap_builder",
"clap_derive",
···
[[package]]
name = "clap_builder"
-
version = "4.5.47"
+
version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
dependencies = [
"anstream",
"anstyle",
···
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
dependencies = [
"data-encoding",
-
"syn 1.0.109",
+
"syn 2.0.106",
[[package]]
···
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
-
"windows-targets 0.48.5",
+
"windows-targets 0.52.6",
[[package]]
···
[[package]]
name = "reqwest"
-
version = "0.12.22"
+
version = "0.12.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
+
checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb"
dependencies = [
"async-compression",
"base64 0.22.1",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
-
"windows-sys 0.48.0",
+
"windows-sys 0.59.0",
[[package]]
+1 -1
constellation/src/bin/main.rs
···
let rocks = rocks.clone();
let stay_alive = stay_alive.clone();
s.spawn(move || {
-
let rep = rocks.run_repair(time::Duration::from_millis(1), stay_alive);
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
eprintln!("repair finished: {rep:?}");
rep
});
+13 -6
constellation/src/consumer/jetstream.rs
···
println!("jetstream closed the websocket cleanly.");
break;
}
-
r => eprintln!("jetstream: close result after error: {r:?}"),
+
Err(_) => {
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
+
println!("jetstream failed to close the websocket cleanly.");
+
break;
+
}
+
Ok(r) => {
+
eprintln!("jetstream: close result after error: {r:?}");
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
+
.increment(1);
+
// if we didn't immediately get ConnectionClosed, we should keep polling read
+
// until we get it.
+
continue;
+
}
}
-
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
-
.increment(1);
-
// if we didn't immediately get ConnectionClosed, we should keep polling read
-
// until we get it.
-
continue;
}
};
+8 -6
constellation/src/server/filters.rs
···
Ok({
if let Some(link) = parse_any_link(s) {
match link {
-
Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
-
format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
-
}),
-
Link::Did(did) => Some(format!(
-
"https://atproto-browser-plus-links.vercel.app/at/{did}"
-
)),
+
Link::AtUri(at_uri) => at_uri
+
.strip_prefix("at://")
+
.map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
+
Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
Link::Uri(uri) => Some(uri),
}
} else {
···
/// Askama template filter: renders `n` as a string via `to_formatted_string`
/// using the English locale (`Locale::en`). Always returns `Ok`.
pub fn human_number(n: &u64) -> askama::Result<String> {
Ok(n.to_formatted_string(&Locale::en))
}
+
+
/// Askama template filter: converts a `usize` (e.g. a collection's `.len()`)
/// to `u64` so it can be piped into filters that take a `u64`, such as
/// `human_number`. Always returns `Ok`.
pub fn to_u64(n: usize) -> askama::Result<u64> {
+
Ok(n as u64)
+
}
+3 -1
constellation/src/server/mod.rs
···
};
let path = format!(".{path}");
+
let path_to_other = format!(".{}", query.path_to_other);
+
let paged = store
.get_many_to_many_counts(
&query.subject,
collection,
&path,
-
&query.path_to_other,
+
&path_to_other,
limit,
cursor_key,
&filter_dids,
+7 -1
constellation/src/storage/rocks_store.rs
···
let mut maybe_done = false;
+
let mut write_fast = rocksdb::WriteOptions::default();
+
write_fast.set_sync(false);
+
write_fast.disable_wal(true);
+
while !stay_alive.is_cancelled() && !maybe_done {
// let mut batch = WriteBatch::default();
···
let target_id: TargetId = _vr(iter.value().unwrap())?;
self.db
-
.put_cf(&cf, target_id.id().to_be_bytes(), _rv(&target))?;
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
any_written = true;
iter.next();
}
···
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
+
eprintln!("nothin doin for this target, {target_key:?}");
return Ok(Default::default());
};
···
.take(1)
.next()
else {
+
eprintln!("no forward match");
continue;
};
+1 -1
constellation/templates/dids.html.j2
···
{% for did in linking_dids %}
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
-
-> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
+
-> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
{% endfor %}
{% if let Some(c) = cursor %}
+1 -1
constellation/templates/get-backlinks.html.j2
···
{% extends "base.html.j2" %}
{% import "try-it-macros.html.j2" as try_it %}
-
{% block title %}Links{% endblock %}
+
{% block title %}Backlinks{% endblock %}
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
{% block content %}
+67
constellation/templates/get-many-to-many-counts.html.j2
···
+
{% extends "base.html.j2" %}
+
{% import "try-it-macros.html.j2" as try_it %}
+
+
{% block title %}Many to Many counts{% endblock %}
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
+
+
{% block content %}
+
+
{% call try_it::get_many_to_many_counts(
+
query.subject,
+
query.source,
+
query.path_to_other,
+
query.did,
+
query.other_subject,
+
query.limit,
+
) %}
+
+
<h2>
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
+
{% endif %}
+
</h2>
+
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}→{{ query.path_to_other }}</code></p>
+
+
<ul>
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
+
</ul>
+
+
<h3>Counts by other subject:</h3>
+
+
{% for counts in counts_by_other_subject %}
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
+
<strong>Joining records</strong>: {{ counts.total }}
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
+
<a href="{{ browseable_uri }}">browse record</a>
+
{%- endif %}</pre>
+
{% endfor %}
+
+
{% if let Some(c) = cursor %}
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
+
<input type="hidden" name="source" value="{{ query.source }}" />
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
+
{% for did in query.did %}
+
<input type="hidden" name="did" value="{{ did }}" />
+
{% endfor %}
+
{% for otherSubject in query.other_subject %}
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
+
{% endfor %}
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
+
<button type="submit">next page&hellip;</button>
+
</form>
+
{% else %}
+
<button disabled><em>end of results</em></button>
+
{% endif %}
+
+
<details>
+
<summary>Raw JSON response</summary>
+
<pre class="code">{{ self|tojson }}</pre>
+
</details>
+
+
{% endblock %}
+26
constellation/templates/hello.html.j2
···
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
+
+
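<p>Counts of many-to-many join records that link to a target, grouped by the secondary subject found at <code>pathToOther</code> in each join record.</p>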
+
+
<h4>Query parameters:</h4>
+
+
<ul>
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
+
</ul>
+
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
+
{% call try_it::get_many_to_many_counts(
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
+
"sh.tangled.label.op:add[].key",
+
"subject",
+
[""],
+
[""],
+
25,
+
) %}
+
+
<h3 class="route"><code>GET /links</code></h3>
<p>A list of records linking to a target.</p>
+43 -1
constellation/templates/try-it-macros.html.j2
···
{% macro get_backlinks(subject, source, dids, limit) %}
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
-
<pre class="code"><strong>GET</strong> /links
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
{%- for did in dids %}{% if !did.is_empty() %}
···
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
p.insertBefore(i, didPlaceholder);
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
+
});
+
</script>
+
{% endmacro %}
+
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
+
{%- for did in dids %}{% if !did.is_empty() %}
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
+
{#- Pre-fill one input per non-empty otherSubject filter. The input name must be "otherSubject" (not "did") so the form submits it as the otherSubject query parameter. -#}
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
+
&otherSubject= <input type="text" name="otherSubject" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
+
</form>
+
<script>
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
+
m2mAddDidButton.addEventListener('click', e => {
+
e.preventDefault();
+
const i = document.createElement('input');
+
i.placeholder = 'did:plc:...';
+
i.name = "did"
+
const p = m2mAddDidButton.parentNode;
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
+
p.insertBefore(i, m2mDidPlaceholder);
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
+
});
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
+
m2mAddSubjectButton.addEventListener('click', e => {
+
e.preventDefault();
+
const i = document.createElement('input');
+
i.placeholder = 'at-uri, did, uri...';
+
i.name = "otherSubject"
+
const p = m2mAddSubjectButton.parentNode;
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
+
p.insertBefore(i, m2mSubjectPlaceholder);
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
});
</script>
{% endmacro %}
+4 -2
slingshot/src/firehose_cache.rs
···
pub async fn firehose_cache(
cache_dir: impl AsRef<Path>,
+
memory_mb: usize,
+
disk_gb: usize,
) -> Result<HybridCache<String, CachedRecord>, String> {
let cache = HybridCacheBuilder::new()
.with_name("firehose")
-
.memory(64 * 2_usize.pow(20))
+
.memory(memory_mb * 2_usize.pow(20))
.with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v))
.storage(Engine::large())
.with_device_options(
DirectFsDeviceOptions::new(cache_dir)
-
.with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
+
.with_capacity(disk_gb * 2_usize.pow(30))
.with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
)
.build()
+26 -5
slingshot/src/main.rs
···
/// where to keep disk caches
#[arg(long)]
cache_dir: PathBuf,
+
/// memory cache size in MB
+
#[arg(long, default_value_t = 64)]
+
cache_memory_mb: usize,
+
/// disk cache size in GB
+
#[arg(long, default_value_t = 1)]
+
cache_disk_gb: usize,
+
/// host for HTTP server (when not using --domain)
+
#[arg(long, default_value = "127.0.0.1")]
+
host: String,
+
/// port for HTTP server (when not using --domain)
+
#[arg(long, default_value_t = 3000)]
+
port: u16,
+
/// port for metrics/prometheus server
+
#[arg(long, default_value_t = 8765)]
+
metrics_port: u16,
/// the domain pointing to this server
///
/// if present:
···
let args = Args::parse();
-
if let Err(e) = install_metrics_server() {
+
if let Err(e) = install_metrics_server(args.metrics_port) {
log::error!("failed to install metrics server: {e:?}");
} else {
-
log::info!("metrics listening at http://0.0.0.0:8765");
+
log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
}
std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
log::info!("cache dir ready at at {cache_dir:?}.");
log::info!("setting up firehose cache...");
-
let cache = firehose_cache(cache_dir.join("./firehose")).await?;
+
let cache = firehose_cache(
+
cache_dir.join("./firehose"),
+
args.cache_memory_mb,
+
args.cache_disk_gb,
+
)
+
.await?;
log::info!("firehose cache ready.");
let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
···
args.domain,
args.acme_contact,
args.certs,
+
args.host,
+
args.port,
server_shutdown,
)
.await?;
···
Ok(())
}
-
fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
+
fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
log::info!("installing metrics server...");
let host = [0, 0, 0, 0];
-
let port = 8765;
PrometheusBuilder::new()
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
.set_bucket_duration(std::time::Duration::from_secs(300))?
+19 -12
slingshot/src/server.rs
···
Ok(did) => did,
Err(_) => {
let Ok(alleged_handle) = Handle::new(identifier) else {
-
return invalid("identifier was not a valid DID or handle");
+
return invalid("Identifier was not a valid DID or handle");
};
match self.identity.handle_to_did(alleged_handle.clone()).await {
···
Err(e) => {
log::debug!("failed to resolve handle: {e}");
// TODO: ServerError not BadRequest
-
return invalid("errored while trying to resolve handle to DID");
+
return invalid("Errored while trying to resolve handle to DID");
}
}
}
};
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
-
return invalid("failed to get DID doc");
+
return invalid("Failed to get DID doc");
};
let Some(partial_doc) = partial_doc else {
-
return invalid("failed to find DID doc");
+
return invalid("Failed to find DID doc");
};
// ok so here's where we're at:
···
.handle_to_did(partial_doc.unverified_handle.clone())
.await
else {
-
return invalid("failed to get did doc's handle");
+
return invalid("Failed to get DID doc's handle");
};
let Some(handle_did) = handle_did else {
-
return invalid("failed to resolve did doc's handle");
+
return invalid("Failed to resolve DID doc's handle");
};
if handle_did == did {
partial_doc.unverified_handle.to_string()
···
let Ok(handle) = Handle::new(repo) else {
return GetRecordResponse::BadRequest(xrpc_error(
"InvalidRequest",
-
"repo was not a valid DID or handle",
+
"Repo was not a valid DID or handle",
));
};
match self.identity.handle_to_did(handle).await {
···
log::debug!("handle resolution failed: {e}");
return GetRecordResponse::ServerError(xrpc_error(
"ResolutionFailed",
-
"errored while trying to resolve handle to DID",
+
"Errored while trying to resolve handle to DID",
));
}
}
···
let Ok(collection) = Nsid::new(collection) else {
return GetRecordResponse::BadRequest(xrpc_error(
"InvalidRequest",
-
"invalid NSID for collection",
+
"Invalid NSID for collection",
));
};
let Ok(rkey) = RecordKey::new(rkey) else {
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
};
let cid: Option<Cid> = if let Some(cid) = cid {
let Ok(cid) = Cid::from_str(&cid) else {
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
};
Some(cid)
} else {
···
domain: Option<String>,
acme_contact: Option<String>,
certs: Option<PathBuf>,
+
host: String,
+
port: u16,
shutdown: CancellationToken,
) -> Result<(), ServerError> {
let repo = Arc::new(repo);
···
)
.await
} else {
-
run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
+
run(
+
TcpListener::bind(format!("{host}:{port}")),
+
app,
+
shutdown,
+
)
+
.await
}
}