Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

sqlite get/put pretty much works

mutex-wrapping the storage accidentally serializes all access which oops but also whatever

Changed files
+160 -26
pocket
+1
pocket/.gitignore
···
+
prefs.sqlite3*
+2
pocket/src/lib.rs
···
mod server;
+
mod storage;
mod token;
pub use server::serve;
+
pub use storage::Storage;
pub use token::TokenVerifier;
+29 -3
pocket/src/main.rs
···
-
use pocket::serve;
+
use clap::Parser;
+
use pocket::{Storage, serve};
+
use std::path::PathBuf;
+
+
/// Slingshot record edge cache
+
#[derive(Parser, Debug, Clone)]
+
#[command(version, about, long_about = None)]
+
struct Args {
+
/// path to the sqlite db file
+
#[arg(long)]
+
db: Option<PathBuf>,
+
/// just initialize the db and exit
+
#[arg(long, action)]
+
init_db: bool,
+
/// the domain for serving a did doc (unused if running behind reflector)
+
#[arg(long)]
+
domain: Option<String>,
+
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
-
println!("Hello, world!");
-
serve("mac.cinnebar-tet.ts.net").await
+
log::info!("👖 hi");
+
let args = Args::parse();
+
let domain = args.domain.unwrap_or("bad-example.com".into());
+
let db_path = args.db.unwrap_or("prefs.sqlite3".into());
+
if args.init_db {
+
Storage::init(&db_path).unwrap();
+
log::info!("👖 initialized db at {db_path:?}. bye")
+
} else {
+
let storage = Storage::connect(db_path).unwrap();
+
serve(&domain, storage).await
+
}
}
+78 -23
pocket/src/server.rs
···
-
use crate::TokenVerifier;
+
use crate::{Storage, TokenVerifier};
use poem::{
Endpoint, EndpointExt, Route, Server,
endpoint::{StaticFileEndpoint, make_sync},
http::Method,
listener::TcpListener,
-
middleware::{CatchPanic, Cors, SizeLimit, Tracing},
+
middleware::{CatchPanic, Cors, Tracing},
};
use poem_openapi::{
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService,
···
};
use serde::Serialize;
use serde_json::{Value, json};
+
use std::sync::{Arc, Mutex};
#[derive(Debug, SecurityScheme)]
#[oai(ty = "bearer")]
···
})
}
-
#[derive(Object)]
+
#[derive(Debug, Object)]
#[oai(example = true)]
-
struct GetBskyPrefsResponseObject {
+
struct BskyPrefsObject {
/// at-uri for this record
preferences: Value,
}
-
impl Example for GetBskyPrefsResponseObject {
+
impl Example for BskyPrefsObject {
fn example() -> Self {
Self {
preferences: json!({
···
enum GetBskyPrefsResponse {
/// Record found
#[oai(status = 200)]
-
Ok(Json<GetBskyPrefsResponseObject>),
+
Ok(Json<BskyPrefsObject>),
/// Bad request or no preferences to return
#[oai(status = 400)]
BadRequest(XrpcError),
···
struct Xrpc {
verifier: TokenVerifier,
+
storage: Arc<Mutex<Storage>>,
}
#[OpenApi]
···
Err(e) => return GetBskyPrefsResponse::BadRequest(xrpc_error("boooo", e.to_string())),
};
log::info!("verified did: {did}/{aud}");
-
// TODO: fetch from storage
-
GetBskyPrefsResponse::Ok(Json(GetBskyPrefsResponseObject::example()))
+
+
let storage = self.storage.clone();
+
+
let Ok(Ok(res)) = tokio::task::spawn_blocking(move || {
+
storage
+
.lock()
+
.unwrap()
+
.get(&did, &aud)
+
.inspect_err(|e| log::error!("failed to get prefs: {e}"))
+
})
+
.await
+
else {
+
return GetBskyPrefsResponse::BadRequest(xrpc_error("boooo", "failed to get from db"));
+
};
+
+
let Some(serialized) = res else {
+
return GetBskyPrefsResponse::BadRequest(xrpc_error(
+
"NotFound",
+
"could not find prefs for u",
+
));
+
};
+
+
let preferences = match serde_json::from_str(&serialized) {
+
Ok(v) => v,
+
Err(e) => {
+
log::error!("failed to deserialize prefs: {e}");
+
return GetBskyPrefsResponse::BadRequest(xrpc_error(
+
"boooo",
+
"failed to deserialize prefs",
+
));
+
}
+
};
+
+
GetBskyPrefsResponse::Ok(Json(BskyPrefsObject { preferences }))
}
/// com.bad-example.pocket.putPreferences
···
async fn pocket_put_prefs(
&self,
XrpcAuth(auth): XrpcAuth,
-
Json(prefs): Json<Value>,
+
Json(prefs): Json<BskyPrefsObject>,
) -> PutBskyPrefsResponse {
let (did, aud) = match self
.verifier
···
};
log::info!("verified did: {did}/{aud}");
log::warn!("received prefs: {prefs:?}");
-
// TODO: put prefs into storage
-
PutBskyPrefsResponse::Ok(PlainText("hiiiiii".to_string()))
+
+
let storage = self.storage.clone();
+
let serialized = prefs.preferences.to_string();
+
+
let Ok(Ok(())) = tokio::task::spawn_blocking(move || {
+
storage
+
.lock()
+
.unwrap()
+
.put(&did, &aud, &serialized)
+
.inspect_err(|e| log::error!("failed to insert prefs: {e}"))
+
})
+
.await
+
else {
+
return PutBskyPrefsResponse::BadRequest(xrpc_error("boooo", "failed to put to db"));
+
};
+
+
PutBskyPrefsResponse::Ok(PlainText("saved.".to_string()))
}
}
···
make_sync(move |_| doc.clone())
}
-
pub async fn serve(domain: &str) -> () {
+
pub async fn serve(domain: &str, storage: Storage) -> () {
let verifier = TokenVerifier::default();
-
let api_service = OpenApiService::new(Xrpc { verifier }, "Pocket", env!("CARGO_PKG_VERSION"))
-
.server(domain)
-
.url_prefix("/xrpc")
-
.contact(
-
ContactObject::new()
-
.name("@microcosm.blue")
-
.url("https://bsky.app/profile/microcosm.blue"),
-
)
-
.description(include_str!("../api-description.md"))
-
.external_document(ExternalDocumentObject::new("https://microcosm.blue/pocket"));
+
let api_service = OpenApiService::new(
+
Xrpc {
+
verifier,
+
storage: Arc::new(Mutex::new(storage)),
+
},
+
"Pocket",
+
env!("CARGO_PKG_VERSION"),
+
)
+
.server(domain)
+
.url_prefix("/xrpc")
+
.contact(
+
ContactObject::new()
+
.name("@microcosm.blue")
+
.url("https://bsky.app/profile/microcosm.blue"),
+
)
+
.description(include_str!("../api-description.md"))
+
.external_document(ExternalDocumentObject::new("https://microcosm.blue/pocket"));
let app = Route::new()
.nest("/openapi", api_service.spec_endpoint())
.nest("/xrpc/", api_service)
.at("/.well-known/did.json", get_did_doc(domain))
.at("/", StaticFileEndpoint::new("./static/index.html"))
-
.with(SizeLimit::new(100 * 2_usize.pow(10)))
.with(
Cors::new()
.allow_method(Method::GET)
+50
pocket/src/storage.rs
···
+
use rusqlite::{Connection, OptionalExtension, Result};
+
use std::path::Path;
+
+
pub struct Storage {
+
con: Connection,
+
}
+
+
impl Storage {
+
pub fn connect(path: impl AsRef<Path>) -> Result<Self> {
+
let con = Connection::open(path)?;
+
con.pragma_update(None, "journal_mode", "WAL")?;
+
con.pragma_update(None, "synchronous", "NORMAL")?;
+
con.pragma_update(None, "busy_timeout", "100")?;
+
con.pragma_update(None, "foreign_keys", "ON")?;
+
Ok(Self { con })
+
}
+
pub fn init(path: impl AsRef<Path>) -> Result<Self> {
+
let me = Self::connect(path)?;
+
me.con.execute(
+
r#"
+
create table prefs (
+
actor text not null,
+
aud text not null,
+
pref text not null,
+
primary key (actor, aud)
+
) strict"#,
+
(),
+
)?;
+
Ok(me)
+
}
+
pub fn put(&self, actor: &str, aud: &str, pref: &str) -> Result<()> {
+
self.con.execute(
+
r#"insert into prefs (actor, aud, pref)
+
values (?1, ?2, ?3)
+
on conflict do update set pref = excluded.pref"#,
+
[actor, aud, pref],
+
)?;
+
Ok(())
+
}
+
pub fn get(&self, actor: &str, aud: &str) -> Result<Option<String>> {
+
self.con
+
.query_one(
+
r#"select pref from prefs
+
where actor = ?1 and aud = ?2"#,
+
[actor, aud],
+
|row| row.get(0),
+
)
+
.optional()
+
}
+
}