Elixir ATProtocol firehose & subscription listener

feat: support running multiple instances

ovyerus.com 905d880b 8ba72831

verified
+1 -2
README.md
···
- Support for different subscriptions other than
`com.atproto.sync.subscribeRepo'
-
- Support for multiple instances at once, each with unique consumers (for
-
listening to multiple subscriptions at once)
- Tests
+
- Documentation
+26
examples/basic_consumer.ex
···
+
defmodule BasicConsumer do
+
@behaviour Drinkup.Consumer
+
+
def handle_event(%Drinkup.Event.Commit{} = event) do
+
IO.inspect(event, label: "Got commit event")
+
end
+
+
def handle_event(_), do: :noop
+
end
+
+
defmodule ExampleSupervisor do
+
use Supervisor
+
+
def start_link(arg \\ []) do
+
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
+
end
+
+
@impl true
+
def init(_) do
+
children = [
+
{Drinkup, %{consumer: BasicConsumer}}
+
]
+
+
Supervisor.init(children, strategy: :one_for_one)
+
end
+
end
+35
examples/multiple_consumers.ex
···
+
defmodule PostDeleteConsumer do
+
use Drinkup.RecordConsumer, collections: ["app.bsky.feed.post"]
+
+
def handle_delete(record) do
+
IO.inspect(record, label: "update")
+
end
+
end
+
+
defmodule IdentityConsumer do
+
@behaviour Drinkup.Consumer
+
+
def handle_event(%Drinkup.Event.Identity{} = event) do
+
IO.inspect(event, label: "identity event")
+
end
+
+
def handle_event(_), do: :noop
+
end
+
+
defmodule ExampleSupervisor do
+
use Supervisor
+
+
def start_link(arg \\ []) do
+
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
+
end
+
+
@impl true
+
def init(_) do
+
children = [
+
{Drinkup, %{consumer: PostDeleteConsumer}},
+
{Drinkup, %{consumer: IdentityConsumer, name: :identities}}
+
]
+
+
Supervisor.init(children, strategy: :one_for_one)
+
end
+
end
+2 -2
examples/record_consumer.ex
···
defmodule ExampleSupervisor do
use Supervisor
-
def start_link(args \\ []) do
+
def start_link(arg \\ []) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl true
def init(_) do
children = [
-
{Drinkup, %{module: ExampleRecordConsumer}}
+
{Drinkup, %{consumer: ExampleRecordConsumer}}
]
Supervisor.init(children, strategy: :one_for_one)
+8
lib/application.ex
···
+
defmodule Drinkup.Application do
+
use Application
+
+
def start(_type, _args) do
+
children = [{Registry, keys: :unique, name: Drinkup.Registry}]
+
Supervisor.start_link(children, strategy: :one_for_one)
+
end
+
end
+23 -14
lib/drinkup.ex
···
defmodule Drinkup do
use Supervisor
+
alias Drinkup.Options
-
@type options() :: %{
-
required(:consumer) => module(),
-
optional(:host) => String.t(),
-
optional(:cursor) => pos_integer()
-
}
+
@dialyzer nowarn_function: {:init, 1}
+
@impl true
+
def init({%Options{name: name} = drinkup_options, supervisor_options}) do
+
children = [
+
{Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
+
{Drinkup.Socket, drinkup_options}
+
]
-
@spec start_link(options()) :: Supervisor.on_start()
-
def start_link(options) do
-
Supervisor.start_link(__MODULE__, options, name: __MODULE__)
+
Supervisor.start_link(
+
children,
+
supervisor_options ++ [name: {:via, Registry, {Drinkup.Registry, {name, Supervisor}}}]
+
)
end
-
def init(options) do
-
children = [
-
{Task.Supervisor, name: Drinkup.TaskSupervisor},
-
{Drinkup.Socket, options}
-
]
+
@spec child_spec(Options.options()) :: Supervisor.child_spec()
+
def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
-
Supervisor.init(children, strategy: :one_for_one)
+
@spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
+
def child_spec({drinkup_options, supervisor_options}) do
+
%{
+
id: Map.get(drinkup_options, :name, __MODULE__),
+
start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
+
type: :supervisor,
+
restart: :permanent,
+
shutdown: 500
+
}
end
end
+6 -4
lib/event.ex
···
defmodule Drinkup.Event do
require Logger
-
alias Drinkup.Event
+
alias Drinkup.{Event, Options}
@type t() ::
Event.Commit.t()
···
def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq), do: seq > last_seq
def valid_seq?(_last_seq, _seq), do: false
-
@spec dispatch(module(), t()) :: :ok
-
def dispatch(consumer, message) do
+
@spec dispatch(t(), Options.t()) :: :ok
+
def dispatch(message, %Options{consumer: consumer, name: name}) do
+
supervisor_name = {:via, Registry, {Drinkup.Registry, {name, Tasks}}}
+
{:ok, _pid} =
-
Task.Supervisor.start_child(Drinkup.TaskSupervisor, fn ->
+
Task.Supervisor.start_child(supervisor_name, fn ->
try do
consumer.handle_event(message)
rescue
+22
lib/options.ex
···
+
defmodule Drinkup.Options do
+
use TypedStruct
+
+
@default_host "https://bsky.network"
+
+
@type options() :: %{
+
required(:consumer) => module(),
+
optional(:name) => atom(),
+
optional(:host) => String.t(),
+
optional(:cursor) => pos_integer()
+
}
+
+
typedstruct do
+
field :consumer, module(), enforce: true
+
field :name, atom(), default: Drinkup
+
field :host, String.t(), default: @default_host
+
field :cursor, pos_integer() | nil
+
end
+
+
@spec from(options()) :: t()
+
def from(%{consumer: _} = options), do: struct(__MODULE__, options)
+
end
+3 -6
lib/socket.ex
···
"""
require Logger
-
alias Drinkup.Event
+
alias Drinkup.{Event, Options}
@behaviour :gen_statem
-
@default_host "https://bsky.network"
@timeout :timer.seconds(5)
# TODO: `flow` determines messages in buffer. Determine ideal value?
@flow 10
···
}
end
-
def start_link(%{consumer: _} = options, statem_opts) do
-
options = Map.merge(%{host: @default_host, cursor: nil}, options)
-
+
def start_link(%Options{} = options, statem_opts) do
:gen_statem.start_link(__MODULE__, options, statem_opts)
end
···
Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
message ->
-
Event.dispatch(options.consumer, message)
+
Event.dispatch(message, options)
end
{:keep_state, data}
+2 -1
mix.exs
···
# Run "mix help compile.app" to learn about applications.
def application do
[
-
extra_applications: [:logger]
+
extra_applications: [:logger],
+
mod: {Drinkup.Application, []}
]
end