A better Rust ATProto crate

further cleanup

Orual 278eb8f2 3a7692c4

Changed files
+117 -397
crates
jacquard
jacquard-common
jacquard-oauth
-15
crates/jacquard-common/src/session.rs
···
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
-
use url::Url;
-
-
use crate::AuthorizationToken;
-
use crate::types::did::Did;
-
-
#[async_trait::async_trait]
-
pub trait Session {
-
async fn did(&self) -> Did<'_>;
-
-
async fn endpoint(&self) -> Url;
-
-
async fn access_token(&self) -> Result<AuthorizationToken, SessionStoreError>;
-
-
async fn refresh(&self) -> Result<AuthorizationToken, SessionStoreError>;
-
}
/// Errors emitted by session stores.
#[derive(Debug, thiserror::Error, Diagnostic)]
+71 -2
crates/jacquard-oauth/src/authstore.rs
···
use std::sync::Arc;
+
use dashmap::DashMap;
use jacquard_common::{
IntoStatic,
-
session::{FileTokenStore, SessionStore, SessionStoreError},
+
session::{SessionStore, SessionStoreError},
types::did::Did,
};
-
use smol_str::SmolStr;
+
use smol_str::{SmolStr, ToSmolStr, format_smolstr};
use crate::session::{AuthRequestData, ClientSessionData};
···
) -> Result<(), SessionStoreError>;
async fn delete_auth_req_info(&self, state: &str) -> Result<(), SessionStoreError>;
+
}
+
+
pub struct MemoryAuthStore {
+
sessions: DashMap<SmolStr, ClientSessionData<'static>>,
+
auth_reqs: DashMap<SmolStr, AuthRequestData<'static>>,
+
}
+
+
impl MemoryAuthStore {
+
pub fn new() -> Self {
+
Self {
+
sessions: DashMap::new(),
+
auth_reqs: DashMap::new(),
+
}
+
}
+
}
+
+
#[async_trait::async_trait]
+
impl ClientAuthStore for MemoryAuthStore {
+
async fn get_session(
+
&self,
+
did: &Did<'_>,
+
session_id: &str,
+
) -> Result<Option<ClientSessionData<'_>>, SessionStoreError> {
+
let key = format_smolstr!("{}_{}", did, session_id);
+
Ok(self.sessions.get(&key).map(|v| v.clone()))
+
}
+
+
async fn upsert_session(
+
&self,
+
session: ClientSessionData<'_>,
+
) -> Result<(), SessionStoreError> {
+
let key = format_smolstr!("{}_{}", session.account_did, session.session_id);
+
self.sessions.insert(key, session.into_static());
+
Ok(())
+
}
+
+
async fn delete_session(
+
&self,
+
did: &Did<'_>,
+
session_id: &str,
+
) -> Result<(), SessionStoreError> {
+
let key = format_smolstr!("{}_{}", did, session_id);
+
self.sessions.remove(&key);
+
Ok(())
+
}
+
+
async fn get_auth_req_info(
+
&self,
+
state: &str,
+
) -> Result<Option<AuthRequestData<'_>>, SessionStoreError> {
+
Ok(self.auth_reqs.get(state).map(|v| v.clone()))
+
}
+
+
async fn save_auth_req_info(
+
&self,
+
auth_req_info: &AuthRequestData<'_>,
+
) -> Result<(), SessionStoreError> {
+
self.auth_reqs.insert(
+
auth_req_info.state.clone().to_smolstr(),
+
auth_req_info.clone().into_static(),
+
);
+
Ok(())
+
}
+
+
async fn delete_auth_req_info(&self, state: &str) -> Result<(), SessionStoreError> {
+
self.auth_reqs.remove(state);
+
Ok(())
+
}
}
#[async_trait::async_trait]
+8
crates/jacquard-oauth/src/client.rs
···
xrpc::{CallOptions, Response, XrpcClient, XrpcExt, XrpcRequest},
},
};
+
use jacquard_identity::JacquardResolver;
use jose_jwk::JwkSet;
use std::sync::Arc;
use tokio::sync::RwLock;
···
{
pub registry: Arc<SessionRegistry<T, S>>,
pub client: Arc<T>,
+
}
+
+
impl<S: ClientAuthStore> OAuthClient<JacquardResolver, S> {
+
pub fn new(store: S, client_data: ClientData<'static>) -> Self {
+
let client = JacquardResolver::default();
+
Self::new_from_resolver(store, client, client_data)
+
}
}
impl<T, S> OAuthClient<T, S>
+3
crates/jacquard-oauth/src/resolver.rs
···
Err(ResolverError::HttpStatus(res.status()))
}
}
+
+
#[async_trait::async_trait]
+
impl OAuthResolver for jacquard_identity::JacquardResolver {}
+3 -60
crates/jacquard/src/client.rs
···
//! This module provides HTTP and XRPC client traits along with an authenticated
//! client implementation that manages session tokens.
-
mod at_client;
pub mod credential_session;
-
mod token;
-
-
pub use at_client::{AtClient, SendOverrides};
+
pub mod token;
pub use jacquard_common::error::{ClientError, XrpcResult};
pub use jacquard_common::session::{MemorySessionStore, SessionStore, SessionStoreError};
use jacquard_common::{
CowStr, IntoStatic,
-
types::{
-
string::{Did, Handle},
-
xrpc::{Response, XrpcRequest},
-
},
+
types::string::{Did, Handle},
};
pub use token::FileAuthStore;
-
use url::Url;
pub(crate) const NSID_REFRESH_SESSION: &str = "com.atproto.server.refreshSession";
/// Basic client wrapper: reqwest transport + in-memory session store.
-
pub struct BasicClient(AtClient<reqwest::Client, MemorySessionStore<Did<'static>, AuthSession>>);
-
-
impl BasicClient {
-
/// Construct a basic client with minimal inputs.
-
pub fn new(base: Url) -> Self {
-
Self(AtClient::new(
-
reqwest::Client::new(),
-
base,
-
MemorySessionStore::default(),
-
))
-
}
-
-
/// Access the inner stateful client.
-
pub fn inner(
-
&self,
-
) -> &AtClient<reqwest::Client, MemorySessionStore<Did<'static>, AuthSession>> {
-
&self.0
-
}
-
-
/// Send an XRPC request.
-
pub async fn send<R: XrpcRequest + Send>(&self, req: R) -> XrpcResult<Response<R>> {
-
self.0.send(req).await
-
}
-
-
/// Send with per-call overrides.
-
pub async fn send_with<R: XrpcRequest + Send>(
-
&self,
-
req: R,
-
overrides: SendOverrides<'_>,
-
) -> XrpcResult<Response<R>> {
-
self.0.send_with(req, overrides).await
-
}
-
-
/// Get current session.
-
pub async fn session(&self, did: &Did<'static>) -> Option<AuthSession> {
-
self.0.session(did).await
-
}
-
-
/// Set the session.
-
pub async fn set_session(
-
&self,
-
session: AuthSession,
-
) -> core::result::Result<(), SessionStoreError> {
-
self.0.set_session(session).await
-
}
-
-
/// Base URL of this client.
-
pub fn base(&self) -> &Url {
-
self.0.base()
-
}
-
}
+
pub struct BasicClient(); //AtClient<reqwest::Client, MemorySessionStore<Did<'static>, AuthSession>>);
/// App password session information from `com.atproto.server.createSession`
///
-284
crates/jacquard/src/client/at_client.rs
···
-
use bytes::Bytes;
-
use jacquard_common::{
-
AuthorizationToken, IntoStatic,
-
error::{AuthError, ClientError, HttpError, TransportError, XrpcResult},
-
http_client::HttpClient,
-
session::{SessionStore, SessionStoreError},
-
types::{
-
did::Did,
-
xrpc::{CallOptions, Response, XrpcExt, XrpcRequest, build_http_request},
-
},
-
};
-
use url::Url;
-
-
use crate::client::{AtpSession, AuthSession, NSID_REFRESH_SESSION};
-
-
/// Per-call overrides when sending via `AtClient`.
-
#[derive(Debug, Clone)]
-
pub struct SendOverrides<'a> {
-
/// Optional DID override for this call.
-
pub did: Option<Did<'a>>,
-
/// Optional base URI override for this call.
-
pub base_uri: Option<Url>,
-
/// Per-request options such as auth, proxy, labelers, extra headers.
-
pub options: CallOptions<'a>,
-
/// Whether to auto-refresh on expired/invalid token and retry once.
-
pub auto_refresh: bool,
-
}
-
-
impl Default for SendOverrides<'_> {
-
fn default() -> Self {
-
Self {
-
did: None,
-
base_uri: None,
-
options: CallOptions::default(),
-
auto_refresh: true,
-
}
-
}
-
}
-
-
impl<'a> SendOverrides<'a> {
-
/// Construct default overrides (no base override, auto-refresh enabled).
-
pub fn new() -> Self {
-
Self {
-
did: None,
-
base_uri: None,
-
options: CallOptions::default(),
-
auto_refresh: true,
-
}
-
}
-
/// Override the base URI for this call only.
-
pub fn base_uri(mut self, base: Url) -> Self {
-
self.base_uri = Some(base);
-
self
-
}
-
/// Provide a full set of call options (auth/headers/etc.).
-
pub fn options(mut self, opts: CallOptions<'a>) -> Self {
-
self.options = opts;
-
self
-
}
-
-
/// Provide a full set of call options (auth/headers/etc.).
-
pub fn did(mut self, did: Did<'a>) -> Self {
-
self.did = Some(did);
-
self
-
}
-
/// Enable or disable one-shot auto-refresh + retry behavior.
-
pub fn auto_refresh(mut self, enable: bool) -> Self {
-
self.auto_refresh = enable;
-
self
-
}
-
}
-
-
/// Stateful client for AT Protocol XRPC with token storage and auto-refresh.
-
///
-
/// Example (file-backed tokens)
-
/// ```ignore
-
/// use jacquard::client::{AtClient, FileTokenStore, TokenStore};
-
/// use jacquard::api::com_atproto::server::create_session::CreateSession;
-
/// use jacquard::client::AtClient as _; // method resolution
-
/// use jacquard::CowStr;
-
///
-
/// #[tokio::main]
-
/// async fn main() -> miette::Result<()> {
-
/// let base = url::Url::parse("https://bsky.social")?;
-
/// let store = FileTokenStore::new("/tmp/jacquard-session.json");
-
/// let client = AtClient::new(reqwest::Client::new(), base, store);
-
/// let session = client
-
/// .send(
-
/// CreateSession::new()
-
/// .identifier(CowStr::from("alice.example"))
-
/// .password(CowStr::from("app-password"))
-
/// .build(),
-
/// )
-
/// .await?
-
/// .into_output()?;
-
/// client.set_session(session.into()).await?;
-
/// Ok(())
-
/// }
-
/// ```
-
pub struct AtClient<C: HttpClient, S> {
-
transport: C,
-
base: Url,
-
tokens: S,
-
refresh_lock: tokio::sync::Mutex<Option<Did<'static>>>,
-
}
-
-
impl<C: HttpClient, S: SessionStore<Did<'static>, AuthSession>> AtClient<C, S> {
-
/// Create a new client with a transport, base URL, and token store.
-
pub fn new(transport: C, base: Url, tokens: S) -> Self {
-
Self {
-
transport,
-
base,
-
tokens,
-
refresh_lock: tokio::sync::Mutex::new(None),
-
}
-
}
-
-
/// Get the base URL of this client.
-
pub fn base(&self) -> &Url {
-
&self.base
-
}
-
-
/// Access the underlying transport.
-
pub fn transport(&self) -> &C {
-
&self.transport
-
}
-
-
/// Get the current session, if any.
-
pub async fn session(&self, did: &Did<'static>) -> Option<AuthSession> {
-
self.tokens.get(did).await
-
}
-
-
/// Set the current session in the token store.
-
pub async fn set_session(&self, session: AuthSession) -> Result<(), SessionStoreError> {
-
let s = session.clone();
-
let did = s.did().clone().into_static();
-
self.refresh_lock.lock().await.replace(did.clone());
-
self.tokens.set(did, session).await
-
}
-
-
/// Send an XRPC request using the client's base URL and default behavior.
-
pub async fn send<R: XrpcRequest + Send>(&self, req: R) -> XrpcResult<Response<R>> {
-
self.send_with(req, SendOverrides::new()).await
-
}
-
-
/// Send an XRPC request with per-call overrides.
-
pub async fn send_with<R: XrpcRequest + Send>(
-
&self,
-
req: R,
-
mut overrides: SendOverrides<'_>,
-
) -> XrpcResult<Response<R>> {
-
let base = overrides
-
.base_uri
-
.clone()
-
.unwrap_or_else(|| self.base.clone());
-
let is_refresh = R::NSID == NSID_REFRESH_SESSION;
-
-
let mut current_did = None;
-
if overrides.options.auth.is_none() {
-
if let Ok(guard) = self.refresh_lock.try_lock() {
-
if let Some(ref did) = *guard {
-
current_did = Some(did.clone());
-
if let Some(s) = self.tokens.get(&did).await {
-
overrides.options.auth = Some(
-
if let Some(refresh_tok) = s.refresh_token()
-
&& is_refresh
-
{
-
AuthorizationToken::Bearer(refresh_tok.clone().into_static())
-
} else {
-
AuthorizationToken::Bearer(s.access_token().clone().into_static())
-
},
-
);
-
}
-
}
-
}
-
}
-
-
let http_request =
-
build_http_request(&base, &req, &overrides.options).map_err(TransportError::from)?;
-
let http_response = self
-
.transport
-
.send_http(http_request)
-
.await
-
.map_err(|e| TransportError::Other(Box::new(e)))?;
-
let status = http_response.status();
-
let buffer = Bytes::from(http_response.into_body());
-
-
if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
-
return Err(HttpError {
-
status,
-
body: Some(buffer),
-
}
-
.into());
-
}
-
-
if overrides.auto_refresh
-
&& !is_refresh
-
&& overrides.options.auth.is_some()
-
&& Self::is_auth_expired(status, &buffer)
-
{
-
self.refresh_once().await?;
-
-
let mut retry_opts = overrides.options.clone();
-
if let Some(curr_did) = current_did {
-
if let Some(s) = self.tokens.get(&curr_did).await {
-
retry_opts.auth = Some(AuthorizationToken::Bearer(
-
s.access_token().clone().into_static(),
-
));
-
}
-
}
-
let http_request =
-
build_http_request(&base, &req, &retry_opts).map_err(TransportError::from)?;
-
let http_response = self
-
.transport
-
.send_http(http_request)
-
.await
-
.map_err(|e| TransportError::Other(Box::new(e)))?;
-
let status = http_response.status();
-
let buffer = Bytes::from(http_response.into_body());
-
-
if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
-
return Err(HttpError {
-
status,
-
body: Some(buffer),
-
}
-
.into());
-
}
-
return Ok(Response::new(buffer, status));
-
}
-
-
Ok(Response::new(buffer, status))
-
}
-
-
async fn refresh_once(&self) -> XrpcResult<()> {
-
let guard = self.refresh_lock.lock().await;
-
if let Some(ref did) = *guard {
-
if let Some(s) = self.tokens.get(did).await {
-
if let Some(refresh_tok) = s.refresh_token() {
-
let refresh_resp = self
-
.transport
-
.xrpc(self.base.clone())
-
.auth(AuthorizationToken::Bearer(
-
refresh_tok.clone().into_static(),
-
))
-
.send(&jacquard_api::com_atproto::server::refresh_session::RefreshSession)
-
.await?;
-
let refreshed = match refresh_resp.into_output() {
-
Ok(o) => AtpSession::from(o),
-
Err(_) => return Err(ClientError::Auth(AuthError::RefreshFailed)),
-
};
-
-
let mut session = s.clone();
-
session.set_access_token(refreshed.access_jwt);
-
session.set_refresh_token(refreshed.refresh_jwt);
-
-
self.set_session(session)
-
.await
-
.map_err(|_| ClientError::Auth(AuthError::RefreshFailed))?;
-
Ok(())
-
} else {
-
Err(ClientError::Auth(AuthError::RefreshFailed))
-
}
-
} else {
-
Err(ClientError::Auth(AuthError::NotAuthenticated))
-
}
-
} else {
-
Err(ClientError::Auth(AuthError::NotAuthenticated))
-
}
-
}
-
-
fn is_auth_expired(status: http::StatusCode, buffer: &Bytes) -> bool {
-
if status.as_u16() == 401 {
-
return true;
-
}
-
if status.as_u16() == 400 {
-
if let Ok(val) = serde_json::from_slice::<serde_json::Value>(buffer) {
-
if let Some(code) = val.get("error").and_then(|v| v.as_str()) {
-
return matches!(code, "ExpiredToken" | "InvalidToken");
-
}
-
}
-
}
-
false
-
}
-
}
+3 -7
crates/jacquard/src/client/token.rs
···
use jacquard_common::IntoStatic;
use jacquard_common::cowstr::ToCowStr;
use jacquard_common::session::{FileTokenStore, SessionStore, SessionStoreError};
-
use jacquard_common::types::string::{Datetime, Did, Handle};
+
use jacquard_common::types::string::{Datetime, Did};
use jacquard_oauth::scopes::Scope;
use jacquard_oauth::session::{AuthRequestData, ClientSessionData, DpopClientData, DpopReqData};
use jacquard_oauth::types::OAuthTokenType;
use jose_jwk::Key;
-
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
-
use std::fmt::Display;
-
use std::hash::Hash;
-
use std::path::{Path, PathBuf};
use url::Url;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
···
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-
struct StoredAtSession {
+
pub struct StoredAtSession {
access_jwt: String,
refresh_jwt: String,
did: String,
···
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-
struct OAuthSession {
+
pub struct OAuthSession {
account_did: String,
session_id: String,
+29 -29
crates/jacquard/src/main.rs
···
let resolver = slingshot_resolver_default();
let handle = Handle::new(args.username.as_ref()).into_diagnostic()?;
let (_did, pds_url) = resolver.pds_for_handle(&handle).await.into_diagnostic()?;
-
let client = BasicClient::new(pds_url);
+
// let client = BasicClient::new(pds_url);
-
// Create session
-
let session = AtpSession::from(
-
client
-
.send(
-
CreateSession::new()
-
.identifier(args.username)
-
.password(args.password)
-
.build(),
-
)
-
.await?
-
.into_output()?,
-
);
+
// // Create session
+
// let session = AtpSession::from(
+
// client
+
// .send(
+
// CreateSession::new()
+
// .identifier(args.username)
+
// .password(args.password)
+
// .build(),
+
// )
+
// .await?
+
// .into_output()?,
+
// );
-
println!("logged in as {} ({})", session.handle, session.did);
-
client.set_session(session.into()).await.into_diagnostic()?;
+
// println!("logged in as {} ({})", session.handle, session.did);
+
// client.set_session(session.into()).await.into_diagnostic()?;
-
// Fetch timeline
-
println!("\nfetching timeline...");
-
let timeline = client
-
.send(GetTimeline::new().limit(5).build())
-
.await?
-
.into_output()?;
+
// // Fetch timeline
+
// println!("\nfetching timeline...");
+
// let timeline = client
+
// .send(GetTimeline::new().limit(5).build())
+
// .await?
+
// .into_output()?;
-
println!("\ntimeline ({} posts):", timeline.feed.len());
-
for (i, post) in timeline.feed.iter().enumerate() {
-
println!("\n{}. by {}", i + 1, post.post.author.handle);
-
println!(
-
" {}",
-
serde_json::to_string_pretty(&post.post.record).into_diagnostic()?
-
);
-
}
+
// println!("\ntimeline ({} posts):", timeline.feed.len());
+
// for (i, post) in timeline.feed.iter().enumerate() {
+
// println!("\n{}. by {}", i + 1, post.post.author.handle);
+
// println!(
+
// " {}",
+
// serde_json::to_string_pretty(&post.post.record).into_diagnostic()?
+
// );
+
// }
Ok(())
}