Elixir ATProtocol firehose & subscription listener

refactor: simplify structure. Remove ConsumerGroup and make Consumer a simple behaviour

ovyerus.com 8ba72831 edbdd5b4

verified
+3 -4
examples/record_consumer.ex
···
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
-
@immpl true
-
def init(_arg) do
+
@impl true
+
def init(_) do
children = [
-
Drinkup,
-
ExampleRecordConsumer
+
{Drinkup, %{module: ExampleRecordConsumer}}
]
Supervisor.init(children, strategy: :one_for_one)
+1 -50
lib/consumer.ex
···
An unopinionated consumer of the Firehose. Will receive all events, not just commits.
"""
-
alias Drinkup.{ConsumerGroup, Event}
+
alias Drinkup.Event
@callback handle_event(Event.t()) :: any()
-
-
defmacro __using__(_opts) do
-
quote location: :keep do
-
use GenServer
-
require Logger
-
-
@behaviour Drinkup.Consumer
-
-
def child_spec(opts) do
-
%{
-
id: __MODULE__,
-
start: {__MODULE__, :start_link, [opts]},
-
type: :worker,
-
restart: :permanent,
-
max_restarts: 0,
-
shutdown: 500
-
}
-
end
-
-
def start_link(opts) do
-
GenServer.start_link(__MODULE__, [], opts)
-
end
-
-
@impl GenServer
-
def init(_) do
-
ConsumerGroup.join()
-
{:ok, nil}
-
end
-
-
@impl GenServer
-
def handle_info({:event, event}, state) do
-
{:ok, _pid} =
-
Task.start(fn ->
-
try do
-
__MODULE__.handle_event(event)
-
rescue
-
e ->
-
Logger.error(
-
"Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
-
)
-
end
-
end)
-
-
{:noreply, state}
-
end
-
-
defoverridable GenServer
-
end
-
end
end
-39
lib/consumer_group.ex
···
-
defmodule Drinkup.ConsumerGroup do
-
@moduledoc """
-
Register consumers and dispatch events to them.
-
"""
-
-
alias Drinkup.Event
-
-
@scope __MODULE__
-
@group :consumers
-
-
def start_link(_) do
-
:pg.start_link(@scope)
-
end
-
-
def child_spec(opts) do
-
%{
-
id: __MODULE__,
-
start: {__MODULE__, :start_link, [opts]},
-
type: :worker,
-
restart: :permanent,
-
shutdown: 500
-
}
-
end
-
-
@spec join() :: :ok
-
def join(), do: join(self())
-
-
@spec join(pid()) :: :ok
-
def join(pid), do: :pg.join(@scope, @group, pid)
-
-
@spec dispatch(Event.t()) :: :ok
-
def dispatch(event) do
-
@scope
-
|> :pg.get_members(@group)
-
|> Enum.each(&send(&1, {:event, event}))
-
end
-
-
# TODO: read `:pg` docs on what `monitor` is used fo
-
end
+12 -5
lib/drinkup.ex
···
defmodule Drinkup do
use Supervisor
-
def start_link(arg \\ []) do
-
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
+
@type options() :: %{
+
required(:consumer) => module(),
+
optional(:host) => String.t(),
+
optional(:cursor) => pos_integer()
+
}
+
+
@spec start_link(options()) :: Supervisor.on_start()
+
def start_link(options) do
+
Supervisor.start_link(__MODULE__, options, name: __MODULE__)
end
-
def init(_) do
+
def init(options) do
children = [
-
Drinkup.ConsumerGroup,
-
Drinkup.Socket
+
{Task.Supervisor, name: Drinkup.TaskSupervisor},
+
{Drinkup.Socket, options}
]
Supervisor.init(children, strategy: :one_for_one)
+16
lib/event.ex
···
defmodule Drinkup.Event do
+
require Logger
alias Drinkup.Event
@type t() ::
···
def valid_seq?(last_seq, nil) when is_integer(last_seq), do: true
def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq), do: seq > last_seq
def valid_seq?(_last_seq, _seq), do: false
+
+
@spec dispatch(module(), t()) :: :ok
+
def dispatch(consumer, message) do
+
{:ok, _pid} =
+
Task.Supervisor.start_child(Drinkup.TaskSupervisor, fn ->
+
try do
+
consumer.handle_event(message)
+
rescue
+
e ->
+
Logger.error("Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}")
+
end
+
end)
+
+
:ok
+
end
end
+1 -1
lib/record_consumer.ex
···
{collections, _opts} = Keyword.pop(opts, :collections, [])
quote location: :keep do
-
use Drinkup.Consumer
+
@behaviour Drinkup.Consumer
@behaviour Drinkup.RecordConsumer
def handle_event(%Drinkup.Event.Commit{} = event) do
+11 -13
lib/socket.ex
···
"""
require Logger
-
alias Drinkup.{ConsumerGroup, Event}
+
alias Drinkup.Event
@behaviour :gen_statem
@default_host "https://bsky.network"
···
@op_regular 1
@op_error -1
-
defstruct [:host, :seq, :conn, :stream]
+
defstruct [:options, :seq, :conn, :stream]
@impl true
def callback_mode, do: [:state_functions, :state_enter]
···
}
end
-
def start_link(opts \\ [], statem_opts) do
-
opts = Keyword.validate!(opts, host: @default_host)
-
host = Keyword.get(opts, :host)
-
cursor = Keyword.get(opts, :cursor)
+
def start_link(%{consumer: _} = options, statem_opts) do
+
options = Map.merge(%{host: @default_host, cursor: nil}, options)
-
:gen_statem.start_link(__MODULE__, {host, cursor}, statem_opts)
+
:gen_statem.start_link(__MODULE__, options, statem_opts)
end
@impl true
-
def init({host, cursor}) do
-
data = %__MODULE__{host: host, seq: cursor}
+
def init(%{cursor: seq} = options) do
+
data = %__MODULE__{seq: seq, options: options}
{:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
end
···
{:next_state, :connecting_http, data}
end
-
def connecting_http(:enter, _from, data) do
+
def connecting_http(:enter, _from, %{options: options} = data) do
Logger.debug("Connecting to http")
-
%{host: host, port: port} = URI.new!(data.host)
+
%{host: host, port: port} = URI.new!(options.host)
{:ok, conn} =
:gun.open(:binary.bin_to_list(host), port, %{
···
:keep_state_and_data
end
-
def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do
+
def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, %{options: options} = data) do
# TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
# Will also need support for JSON frames
with {:ok, header, next} <- CAR.DagCbor.decode(frame),
···
Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
message ->
-
ConsumerGroup.dispatch(message)
+
Event.dispatch(options.consumer, message)
end
{:keep_state, data}