A set of utilities for working with the AT Protocol in Elixir.

refactor: add support for multiple types of XRPC clients

ovyerus.com ffe9ad1b 1164a1e3

verified
Changed files
+216 -108
lib
+3
CHANGELOG.md
···
- Remove `Atex.HTTP` and associated modules as the abstraction caused a bit too
much complexities for how early atex is. It may come back in the future as
something more fleshed out once we're more stable.
+
- Rename `Atex.XRPC.Client` to `Atex.XRPC.LoginClient`
### Features
- Add `Atex.OAuth` module with utilites for handling some OAuth functionality.
- Add `Atex.OAuth.Plug` module (if Plug is loaded) which provides a basic but
complete OAuth flow, including storing the tokens in `Plug.Session`.
+
- Add `Atex.XRPC.Client` behaviour for implementing custom client variants.
+
- `Atex.XRPC` now delegates get/post options to the provided client struct.
## [0.4.0] - 2025-08-27
+2 -2
lib/atex/oauth.ex
···
end
@spec create_client_assertion(JOSE.JWK.t(), String.t(), String.t()) :: String.t()
-
defp create_client_assertion(jwk, client_id, issuer) do
+
def create_client_assertion(jwk, client_id, issuer) do
iat = System.os_time(:second)
jti = random_b64(20)
jws = %{"alg" => "ES256", "kid" => jwk.fields["kid"]}
···
end
@spec create_dpop_token(JOSE.JWK.t(), Req.Request.t(), any(), map()) :: String.t()
-
defp create_dpop_token(jwk, request, nonce \\ nil, attrs \\ %{}) do
+
def create_dpop_token(jwk, request, nonce \\ nil, attrs \\ %{}) do
iat = System.os_time(:second)
jti = random_b64(20)
{_, public_jwk} = JOSE.JWK.to_public_map(jwk)
+45 -32
lib/atex/xrpc.ex
···
defmodule Atex.XRPC do
-
alias Atex.XRPC
+
@moduledoc """
+
XRPC client module for AT Protocol RPC calls.
-
# TODO: automatic user-agent, and env for changing it
+
This module provides both authenticated and unauthenticated access to AT Protocol
+
XRPC endpoints. The authenticated functions (`get/3`, `post/3`) work with any
+
client that implements the `Atex.XRPC.Client`.
-
# TODO: consistent struct shape/protocol for Lexicon schemas so that user can pass in
-
# an object (hopefully validated by its module) without needing to specify the
-
# name & opts separately, and possibly verify the output response against it?
+
## Example usage
+
+
# Login-based client
+
{:ok, client} = Atex.XRPC.LoginClient.login("https://bsky.social", "user.bsky.social", "password")
+
{:ok, response, client} = Atex.XRPC.get(client, "app.bsky.actor.getProfile", params: [actor: "user.bsky.social"])
+
+
# OAuth-based client (coming next)
+
oauth_client = Atex.XRPC.OAuthClient.new_from_oauth_tokens(endpoint, access_token, refresh_token, dpop_key)
+
{:ok, response, oauth_client} = Atex.XRPC.get(oauth_client, "app.bsky.actor.getProfile", params: [actor: "user.bsky.social"])
+
+
## Unauthenticated requests
+
+
Unauthenticated functions (`unauthed_get/3`, `unauthed_post/3`) do not require a client
+
and work directly with endpoints:
-
# TODO: auto refresh, will need to return a client instance in each method.
+
{:ok, response} = Atex.XRPC.unauthed_get("https://bsky.social", "com.atproto.sync.getHead", params: [did: "did:plc:..."])
+
"""
@doc """
Perform a HTTP GET on a XRPC resource. Called a "query" in lexicons.
+
+
Accepts any client that implements `Atex.XRPC.Client` and returns
+
both the response and the (potentially updated) client.
"""
-
@spec get(XRPC.Client.t(), String.t(), keyword()) :: {:ok, Req.Response.t()} | {:error, any()}
-
def get(%XRPC.Client{} = client, name, opts \\ []) do
-
opts = put_auth(opts, client.access_token)
-
Req.get(url(client, name), opts)
+
@spec get(Atex.XRPC.Client.client(), String.t(), keyword()) ::
+
{:ok, Req.Response.t(), Atex.XRPC.Client.client()}
+
| {:error, any(), Atex.XRPC.Client.client()}
+
def get(client, name, opts \\ []) do
+
client.__struct__.get(client, name, opts)
end
@doc """
Perform a HTTP POST on a XRPC resource. Called a "prodecure" in lexicons.
+
+
Accepts any client that implements `Atex.XRPC.Client` and returns
+
both the response and the (potentially updated) client.
"""
-
@spec post(XRPC.Client.t(), String.t(), keyword()) :: {:ok, Req.Response.t()} | {:error, any()}
-
def post(%XRPC.Client{} = client, name, opts \\ []) do
-
# TODO: look through available HTTP clients and see if they have a
-
# consistent way of providing JSON bodies with auto content-type. If not,
-
# create one for adapters.
-
opts = put_auth(opts, client.access_token)
-
Req.post(url(client, name), opts)
+
@spec post(Atex.XRPC.Client.client(), String.t(), keyword()) ::
+
{:ok, Req.Response.t(), Atex.XRPC.Client.client()}
+
| {:error, any(), Atex.XRPC.Client.client()}
+
def post(client, name, opts \\ []) do
+
client.__struct__.post(client, name, opts)
end
@doc """
···
end
# TODO: use URI module for joining instead?
-
@spec url(XRPC.Client.t() | String.t(), String.t()) :: String.t()
-
defp url(%XRPC.Client{endpoint: endpoint}, name), do: url(endpoint, name)
-
defp url(endpoint, name) when is_binary(endpoint), do: "#{endpoint}/xrpc/#{name}"
-
@doc """
-
Put an `authorization` header into a keyword list of options to pass to a HTTP client.
+
Create an XRPC url based on an endpoint and a resource name.
+
+
## Example
+
+
iex> Atex.XRPC.url("https://bsky.app", "app.bsky.actor.getProfile")
+
"https://bsky.app/xrpc/app.bsky.actor.getProfile"
"""
-
@spec put_auth(keyword(), String.t()) :: keyword()
-
def put_auth(opts, token),
-
do: put_headers(opts, authorization: "Bearer #{token}")
-
-
@spec put_headers(keyword(), keyword()) :: keyword()
-
defp put_headers(opts, headers) do
-
opts
-
|> Keyword.put_new(:headers, [])
-
|> Keyword.update(:headers, [], &Keyword.merge(&1, headers))
-
end
+
@spec url(String.t(), String.t()) :: String.t()
+
def url(endpoint, resource) when is_binary(endpoint), do: "#{endpoint}/xrpc/#{resource}"
end
+18 -74
lib/atex/xrpc/client.ex
···
defmodule Atex.XRPC.Client do
@moduledoc """
-
Struct to store client information for ATProto XRPC.
-
"""
+
Behaviour that defines the interface for XRPC clients.
-
alias Atex.{XRPC, HTTP}
-
use TypedStruct
+
This behaviour allows different types of clients (login-based, OAuth-based, etc.)
+
to implement authentication and request handling while maintaining a consistent interface.
-
typedstruct do
-
field :endpoint, String.t(), enforce: true
-
field :access_token, String.t() | nil
-
field :refresh_token, String.t() | nil
-
end
-
-
@doc """
-
Create a new `Atex.XRPC.Client` from an endpoint, and optionally an
-
access/refresh token.
-
-
Endpoint should be the base URL of a PDS, or an AppView in the case of
-
unauthenticated requests (like Bluesky's public API), e.g.
-
`https://bsky.social`.
+
Implementations must handle token refresh internally when requests fail due to
+
expired tokens, and return both the result and potentially updated client state.
"""
-
@spec new(String.t()) :: t()
-
@spec new(String.t(), String.t() | nil) :: t()
-
@spec new(String.t(), String.t() | nil, String.t() | nil) :: t()
-
def new(endpoint, access_token \\ nil, refresh_token \\ nil) do
-
%__MODULE__{endpoint: endpoint, access_token: access_token, refresh_token: refresh_token}
-
end
+
+
@type client :: struct()
+
@type request_opts :: keyword()
+
@type request_result :: {:ok, Req.Response.t(), client()} | {:error, any(), client()}
@doc """
-
Create a new `Atex.XRPC.Client` by logging in with an `identifier` and
-
`password` to fetch an initial pair of access & refresh tokens.
+
Perform an authenticated HTTP GET request on an XRPC resource.
-
Uses `com.atproto.server.createSession` under the hood, so `identifier` can be
-
either a handle or a DID.
-
-
## Examples
-
-
iex> Atex.XRPC.Client.login("https://bsky.social", "example.com", "password123")
-
{:ok, %Atex.XRPC.Client{...}}
+
Implementations should handle token refresh if the request fails due to
+
expired authentication, returning both the response and the (potentially updated) client.
"""
-
@spec login(String.t(), String.t(), String.t()) :: {:ok, t()} | {:error, any()}
-
@spec login(String.t(), String.t(), String.t(), String.t() | nil) ::
-
{:ok, t()} | {:error, any()}
-
def login(endpoint, identifier, password, auth_factor_token \\ nil) do
-
json =
-
%{identifier: identifier, password: password}
-
|> then(
-
&if auth_factor_token do
-
Map.merge(&1, %{authFactorToken: auth_factor_token})
-
else
-
&1
-
end
-
)
-
-
response = XRPC.unauthed_post(endpoint, "com.atproto.server.createSession", json: json)
-
-
case response do
-
{:ok, %{"accessJwt" => access_token, "refreshJwt" => refresh_token}} ->
-
{:ok, new(endpoint, access_token, refresh_token)}
-
-
err ->
-
err
-
end
-
end
+
@callback get(client(), String.t(), request_opts()) :: request_result()
@doc """
-
Request a new `refresh_token` for the given client.
+
Perform an authenticated HTTP POST request on an XRPC resource.
+
+
Implementations should handle token refresh if the request fails due to
+
expired authentication, returning both the response and the (potentially updated) client.
"""
-
@spec refresh(t()) :: {:ok, t()} | {:error, any()}
-
def refresh(%__MODULE__{endpoint: endpoint, refresh_token: refresh_token} = client) do
-
response =
-
XRPC.unauthed_post(
-
endpoint,
-
"com.atproto.server.refreshSession",
-
XRPC.put_auth([], refresh_token)
-
)
-
-
case response do
-
{:ok, %{"accessJwt" => access_token, "refreshJwt" => refresh_token}} ->
-
%{client | access_token: access_token, refresh_token: refresh_token}
-
-
err ->
-
err
-
end
-
end
+
@callback post(client(), String.t(), request_opts()) :: request_result()
end
+148
lib/atex/xrpc/login_client.ex
···
+
defmodule Atex.XRPC.LoginClient do
+
alias Atex.XRPC
+
use TypedStruct
+
+
@behaviour Atex.XRPC.Client
+
+
typedstruct do
+
field :endpoint, String.t(), enforce: true
+
field :access_token, String.t() | nil
+
field :refresh_token, String.t() | nil
+
end
+
+
@doc """
+
Create a new `Atex.XRPC.LoginClient` from an endpoint, and optionally an
+
existing access/refresh token.
+
+
Endpoint should be the base URL of a PDS, or an AppView in the case of
+
unauthenticated requests (like Bluesky's public API), e.g.
+
`https://bsky.social`.
+
"""
+
@spec new(String.t(), String.t() | nil, String.t() | nil) :: t()
+
def new(endpoint, access_token \\ nil, refresh_token \\ nil) do
+
%__MODULE__{endpoint: endpoint, access_token: access_token, refresh_token: refresh_token}
+
end
+
+
@doc """
+
Create a new `Atex.XRPC.LoginClient` by logging in with an `identifier` and
+
`password` to fetch an initial pair of access & refresh tokens.
+
+
Also supports providing a MFA token in the situation that is required.
+
+
Uses `com.atproto.server.createSession` under the hood, so `identifier` can be
+
either a handle or a DID.
+
+
## Examples
+
+
iex> Atex.XRPC.LoginClient.login("https://bsky.social", "example.com", "password123")
+
{:ok, %Atex.XRPC.LoginClient{...}}
+
"""
+
@spec login(String.t(), String.t(), String.t()) :: {:ok, t()} | {:error, any()}
+
@spec login(String.t(), String.t(), String.t(), String.t() | nil) ::
+
{:ok, t()} | {:error, any()}
+
def login(endpoint, identifier, password, auth_factor_token \\ nil) do
+
json =
+
%{identifier: identifier, password: password}
+
|> then(
+
&if auth_factor_token do
+
Map.merge(&1, %{authFactorToken: auth_factor_token})
+
else
+
&1
+
end
+
)
+
+
response = XRPC.unauthed_post(endpoint, "com.atproto.server.createSession", json: json)
+
+
case response do
+
{:ok, %{body: %{"accessJwt" => access_token, "refreshJwt" => refresh_token}}} ->
+
{:ok, new(endpoint, access_token, refresh_token)}
+
+
err ->
+
err
+
end
+
end
+
+
@doc """
+
Request a new `refresh_token` for the given client.
+
"""
+
@spec refresh(t()) :: {:ok, t()} | {:error, any()}
+
def refresh(%__MODULE__{endpoint: endpoint, refresh_token: refresh_token} = client) do
+
request =
+
Req.new(method: :post, url: XRPC.url(endpoint, "com.atproto.server.refreshSession"))
+
|> put_auth(refresh_token)
+
+
case Req.request(request) do
+
{:ok, %{body: %{"accessJwt" => access_token, "refreshJwt" => refresh_token}}} ->
+
{:ok, %{client | access_token: access_token, refresh_token: refresh_token}}
+
+
{:ok, response} ->
+
{:error, response}
+
+
err ->
+
err
+
end
+
end
+
+
@impl true
+
def get(%__MODULE__{} = client, resource, opts \\ []) do
+
request(client, opts ++ [method: :get, url: XRPC.url(client.endpoint, resource)])
+
end
+
+
@impl true
+
def post(%__MODULE__{} = client, resource, opts \\ []) do
+
request(client, opts ++ [method: :post, url: XRPC.url(client.endpoint, resource)])
+
end
+
+
@spec request(t(), keyword()) :: {:ok, Req.Response.t(), t()} | {:error, any()}
+
defp request(client, opts) do
+
with {:ok, client} <- validate_client(client) do
+
request = opts |> Req.new() |> put_auth(client.access_token)
+
+
case Req.request(request) do
+
{:ok, %{status: 200} = response} ->
+
{:ok, response, client}
+
+
{:ok, response} ->
+
handle_failure(client, response, request)
+
+
err ->
+
err
+
end
+
end
+
end
+
+
@spec handle_failure(t(), Req.Response.t(), Req.Request.t()) ::
+
{:ok, Req.Response.t(), t()} | {:error, any()}
+
defp handle_failure(client, response, request) do
+
IO.inspect(response, label: "got failure")
+
+
if auth_error?(response.body) and client.refresh_token do
+
case refresh(client) do
+
{:ok, client} ->
+
case Req.request(put_auth(request, client.access_token)) do
+
{:ok, %{status: 200} = response} -> {:ok, response, client}
+
{:ok, response} -> {:error, response}
+
err -> err
+
end
+
+
err ->
+
err
+
end
+
else
+
{:error, response}
+
end
+
end
+
+
@spec validate_client(t()) :: {:ok, t()} | {:error, any()}
+
defp validate_client(%__MODULE__{access_token: nil}), do: {:error, :no_token}
+
defp validate_client(%__MODULE__{} = client), do: {:ok, client}
+
+
@spec auth_error?(body :: Req.Response.t()) :: boolean()
+
defp auth_error?(%{status: status}) when status in [401, 403], do: true
+
defp auth_error?(%{body: %{"error" => "InvalidToken"}}), do: true
+
defp auth_error?(_response), do: false
+
+
@spec put_auth(Req.Request.t(), String.t()) :: Req.Request.t()
+
defp put_auth(request, token),
+
do: Req.Request.put_header(request, "authorization", "Bearer #{token}")
+
end