Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

slingshot fmt

+ci

Changed files
+36 -43
.github
workflows
slingshot
+1 -1
.github/workflows/checks.yml
···
- name: get nightly toolchain for jetstream fmt
run: rustup toolchain install nightly --allow-downgrade -c rustfmt
- name: fmt
-
run: cargo fmt --package links --package constellation --package ufos --package spacedust --package who-am-i -- --check
- name: fmt jetstream (nightly)
run: cargo +nightly fmt --package jetstream -- --check
- name: clippy
···
- name: get nightly toolchain for jetstream fmt
run: rustup toolchain install nightly --allow-downgrade -c rustfmt
- name: fmt
+
run: cargo fmt --package links --package constellation --package ufos --package spacedust --package who-am-i --package slingshot -- --check
- name: fmt jetstream (nightly)
run: cargo +nightly fmt --package jetstream -- --check
- name: clippy
+1 -1
Makefile
···
cargo test --all-features
fmt:
-
cargo fmt --package links --package constellation --package ufos --package spacedust --package who-am-i
cargo +nightly fmt --package jetstream
clippy:
···
cargo test --all-features
fmt:
+
cargo fmt --package links --package constellation --package ufos --package spacedust --package who-am-i --package slingshot
cargo +nightly fmt --package jetstream
clippy:
+1 -1
slingshot/src/consumer.rs
···
use crate::CachedRecord;
-
use foyer::HybridCache;
use crate::error::ConsumerError;
use jetstream::{
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
events::{CommitOp, Cursor, EventKind},
···
use crate::CachedRecord;
use crate::error::ConsumerError;
+
use foyer::HybridCache;
use jetstream::{
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
events::{CommitOp, Cursor, EventKind},
+5 -4
slingshot/src/firehose_cache.rs
···
use std::path::Path;
-
use crate::CachedRecord;
-
use foyer::{HybridCache, DirectFsDeviceOptions, Engine, HybridCacheBuilder};
-
-
pub async fn firehose_cache(dir: impl AsRef<Path>) -> Result<HybridCache<String, CachedRecord>, String> {
let cache = HybridCacheBuilder::new()
.with_name("firehose")
.memory(64 * 2_usize.pow(20))
···
+
use crate::CachedRecord;
+
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use std::path::Path;
+
pub async fn firehose_cache(
+
dir: impl AsRef<Path>,
+
) -> Result<HybridCache<String, CachedRecord>, String> {
let cache = HybridCacheBuilder::new()
.with_name("firehose")
.memory(64 * 2_usize.pow(20))
-3
slingshot/src/main.rs
···
use clap::Parser;
use tokio_util::sync::CancellationToken;
-
/// Slingshot record edge cache
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
···
Ok(())
});
-
tokio::select! {
_ = shutdown.cancelled() => log::warn!("shutdown requested"),
Some(r) = tasks.join_next() => {
···
);
Ok(())
}
-
···
use clap::Parser;
use tokio_util::sync::CancellationToken;
/// Slingshot record edge cache
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
···
Ok(())
});
tokio::select! {
_ = shutdown.cancelled() => log::warn!("shutdown requested"),
Some(r) = tasks.join_next() => {
···
);
Ok(())
}
+4 -3
slingshot/src/record.rs
···
-
use serde_json::value::RawValue;
-
use serde::{Serialize, Deserialize};
use jetstream::exports::Cid;
#[derive(Debug, Serialize, Deserialize)]
pub struct RawRecord {
···
fn from(RawRecord { cid, record }: &RawRecord) -> Self {
(
cid.clone(),
-
RawValue::from_string(record.to_string()).expect("stored string from RawValue to be valid"),
)
}
}
···
use jetstream::exports::Cid;
+
use serde::{Deserialize, Serialize};
+
use serde_json::value::RawValue;
#[derive(Debug, Serialize, Deserialize)]
pub struct RawRecord {
···
fn from(RawRecord { cid, record }: &RawRecord) -> Self {
(
cid.clone(),
+
RawValue::from_string(record.to_string())
+
.expect("stored string from RawValue to be valid"),
)
}
}
+24 -30
slingshot/src/server.rs
···
use foyer::HybridCache;
-
use crate::{error::ServerError, CachedRecord};
use tokio_util::sync::CancellationToken;
-
use poem::{listener::TcpListener, Route, Server};
use poem_openapi::{
-
payload::Json,
-
param::Query,
-
OpenApi, OpenApiService,
-
ApiResponse,
-
Object,
-
types::Example,
};
fn example_did() -> String {
···
}
}
-
fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "InvalidRequest".to_string(),
···
impl Example for FoundRecordResponseObject {
fn example() -> Self {
Self {
-
uri: format!("at://{}/{}/{}", example_did(), example_collection(), example_rkey()),
cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
value: serde_json::json!({
"$type": "app.bsky.feed.like",
···
cid: Query<Option<String>>,
) -> GetRecordResponse {
// TODO: yeah yeah
-
let at_uri = format!(
-
"at://{}/{}/{}",
-
&*repo, &*collection, &*rkey
-
);
-
let entry = self.cache
-
.fetch(at_uri.clone(), || async move {
-
todo!()
-
})
.await
.unwrap();
···
if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "RecordNotFound".to_string(),
-
message: "A record was found but its CID did not match that requested".to_string(),
}));
}
// TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
-
let value = serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
GetRecordResponse::Ok(Json(FoundRecordResponseObject {
uri: at_uri,
cid: Some(found_cid),
value,
}))
-
},
-
CachedRecord::Deleted => {
-
GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
-
error: "RecordNotFound".to_string(),
-
message: "This record was deleted".to_string(),
-
}))
}
}
}
}
···
cache: HybridCache<String, CachedRecord>,
_shutdown: CancellationToken,
) -> Result<(), ServerError> {
-
let api_service =
-
OpenApiService::new(Xrpc { cache }, "Slingshot", env!("CARGO_PKG_VERSION"))
-
.server("http://localhost:3000")
-
.url_prefix("/xrpc");
let app = Route::new()
.nest("/", api_service.scalar())
···
+
use crate::{CachedRecord, error::ServerError};
use foyer::HybridCache;
use tokio_util::sync::CancellationToken;
+
use poem::{Route, Server, listener::TcpListener};
use poem_openapi::{
+
ApiResponse, Object, OpenApi, OpenApiService, param::Query, payload::Json, types::Example,
};
fn example_did() -> String {
···
}
}
fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "InvalidRequest".to_string(),
···
impl Example for FoundRecordResponseObject {
fn example() -> Self {
Self {
+
uri: format!(
+
"at://{}/{}/{}",
+
example_did(),
+
example_collection(),
+
example_rkey()
+
),
cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
value: serde_json::json!({
"$type": "app.bsky.feed.like",
···
cid: Query<Option<String>>,
) -> GetRecordResponse {
// TODO: yeah yeah
+
let at_uri = format!("at://{}/{}/{}", &*repo, &*collection, &*rkey);
+
let entry = self
+
.cache
+
.fetch(at_uri.clone(), || async move { todo!() })
.await
.unwrap();
···
if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
error: "RecordNotFound".to_string(),
+
message: "A record was found but its CID did not match that requested"
+
.to_string(),
}));
}
// TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
+
let value =
+
serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
GetRecordResponse::Ok(Json(FoundRecordResponseObject {
uri: at_uri,
cid: Some(found_cid),
value,
}))
}
+
CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
+
error: "RecordNotFound".to_string(),
+
message: "This record was deleted".to_string(),
+
})),
}
}
}
···
cache: HybridCache<String, CachedRecord>,
_shutdown: CancellationToken,
) -> Result<(), ServerError> {
+
let api_service = OpenApiService::new(Xrpc { cache }, "Slingshot", env!("CARGO_PKG_VERSION"))
+
.server("http://localhost:3000")
+
.url_prefix("/xrpc");
let app = Route::new()
.nest("/", api_service.scalar())