Elixir ATProtocol firehose & subscription listener

feat: RecordConsumer, a consumer for easily reading record changes from the firehose

ovyerus.com edbdd5b4 6dd22610

verified
+1 -1
.formatter.exs
···
# Used by "mix format"
[
-
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
+
inputs: ["{mix,.formatter}.exs", "{config,examples,lib,test}/**/*.{ex,exs}"],
import_deps: [:typedstruct]
]
+8
README.md
···
Drinkup is an ELixir library for listening to events from an ATProtocol
firehose.
+
+
## Roadmap
+
+
- Support for different subscriptions other than
+
`com.atproto.sync.subscribeRepo'
+
- Support for multiple instances at once, each with unique consumers (for
+
listening to multiple subscriptions at once)
+
- Tests
+33
examples/record_consumer.ex
···
+
defmodule ExampleRecordConsumer do
+
use Drinkup.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]
+
+
def handle_create(record) do
+
IO.inspect(record, label: "create")
+
end
+
+
def handle_update(record) do
+
IO.inspect(record, label: "update")
+
end
+
+
def handle_delete(record) do
+
IO.inspect(record, label: "delete")
+
end
+
end
+
+
defmodule ExampleSupervisor do
+
use Supervisor
+
+
def start_link(args \\ []) do
+
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
+
end
+
+
@immpl true
+
def init(_arg) do
+
children = [
+
Drinkup,
+
ExampleRecordConsumer
+
]
+
+
Supervisor.init(children, strategy: :one_for_one)
+
end
+
end
+85
lib/record_consumer.ex
···
+
defmodule Drinkup.RecordConsumer do
+
@moduledoc """
+
An opinionated consumer of the Firehose that eats consumers
+
"""
+
+
@callback handle_create(any()) :: any()
+
@callback handle_update(any()) :: any()
+
@callback handle_delete(any()) :: any()
+
+
defmacro __using__(opts) do
+
{collections, _opts} = Keyword.pop(opts, :collections, [])
+
+
quote location: :keep do
+
use Drinkup.Consumer
+
@behaviour Drinkup.RecordConsumer
+
+
def handle_event(%Drinkup.Event.Commit{} = event) do
+
event.ops
+
|> Enum.filter(fn %{path: path} ->
+
path |> String.split("/") |> Enum.at(0) |> matches_collections?()
+
end)
+
|> Enum.map(&Drinkup.RecordConsumer.Record.from(&1, event.repo))
+
|> Enum.each(&apply(__MODULE__, :"handle_#{&1.action}", [&1]))
+
end
+
+
def handle_event(_event), do: :noop
+
+
unquote(
+
if collections == [] do
+
quote do
+
def matches_collections?(_type), do: true
+
end
+
else
+
quote do
+
def matches_collections?(nil), do: false
+
+
def matches_collections?(type) when is_binary(type),
+
do:
+
Enum.any?(unquote(collections), fn
+
matcher when is_binary(matcher) -> type == matcher
+
matcher -> Regex.match?(matcher, type)
+
end)
+
end
+
end
+
)
+
+
@impl true
+
def handle_create(_record), do: nil
+
@impl true
+
def handle_update(_record), do: nil
+
@impl true
+
def handle_delete(_record), do: nil
+
+
defoverridable handle_create: 1, handle_update: 1, handle_delete: 1
+
end
+
end
+
+
defmodule Record do
+
alias Drinkup.Event.Commit.RepoOp
+
use TypedStruct
+
+
typedstruct do
+
field :type, String.t()
+
field :rkey, String.t()
+
field :did, String.t()
+
field :action, :create | :update | :delete
+
field :cid, binary() | nil
+
field :record, map() | nil
+
end
+
+
@spec from(RepoOp.t(), String.t()) :: t()
+
def from(%RepoOp{action: action, path: path, cid: cid, record: record}, did) do
+
[type, rkey] = String.split(path, "/")
+
+
%__MODULE__{
+
type: type,
+
rkey: rkey,
+
did: did,
+
action: action,
+
cid: cid,
+
record: record
+
}
+
end
+
end
+
end