Elixir ATProtocol firehose & subscription listener

feat: add ConsumerGroup and dispatching via consumers

ovyerus.com 6dd22610 7c948f8f

verified
+58
lib/consumer.ex
···
+
defmodule Drinkup.Consumer do
+
@moduledoc """
+
An unopinionated consumer of the Firehose. Will receive all events, not just commits.
+
"""
+
+
alias Drinkup.{ConsumerGroup, Event}
+
+
@callback handle_event(Event.t()) :: any()
+
+
defmacro __using__(_opts) do
+
quote location: :keep do
+
use GenServer
+
require Logger
+
+
@behaviour Drinkup.Consumer
+
+
def child_spec(opts) do
+
%{
+
id: __MODULE__,
+
start: {__MODULE__, :start_link, [opts]},
+
type: :worker,
+
restart: :permanent,
+
max_restarts: 0,
+
shutdown: 500
+
}
+
end
+
+
def start_link(opts) do
+
GenServer.start_link(__MODULE__, [], opts)
+
end
+
+
@impl GenServer
+
def init(_) do
+
ConsumerGroup.join()
+
{:ok, nil}
+
end
+
+
@impl GenServer
+
def handle_info({:event, event}, state) do
+
{:ok, _pid} =
+
Task.start(fn ->
+
try do
+
__MODULE__.handle_event(event)
+
rescue
+
e ->
+
Logger.error(
+
"Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
+
)
+
end
+
end)
+
+
{:noreply, state}
+
end
+
+
defoverridable GenServer
+
end
+
end
+
end
+39
lib/consumer_group.ex
···
+
defmodule Drinkup.ConsumerGroup do
+
@moduledoc """
+
Register consumers and dispatch events to them.
+
"""
+
+
alias Drinkup.Event
+
+
@scope __MODULE__
+
@group :consumers
+
+
def start_link(_) do
+
:pg.start_link(@scope)
+
end
+
+
def child_spec(opts) do
+
%{
+
id: __MODULE__,
+
start: {__MODULE__, :start_link, [opts]},
+
type: :worker,
+
restart: :permanent,
+
shutdown: 500
+
}
+
end
+
+
@spec join() :: :ok
+
def join(), do: join(self())
+
+
@spec join(pid()) :: :ok
+
def join(pid), do: :pg.join(@scope, @group, pid)
+
+
@spec dispatch(Event.t()) :: :ok
+
def dispatch(event) do
+
@scope
+
|> :pg.get_members(@group)
+
|> Enum.each(&send(&1, {:event, event}))
+
end
+
+
# TODO: read `:pg` docs on what `monitor` is used fo
+
end
+14
lib/drinkup.ex
···
defmodule Drinkup do
+
use Supervisor
+
+
def start_link(arg \\ []) do
+
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
+
end
+
+
def init(_) do
+
children = [
+
Drinkup.ConsumerGroup,
+
Drinkup.Socket
+
]
+
+
Supervisor.init(children, strategy: :one_for_one)
+
end
end
+4 -2
lib/event.ex
···
defmodule Drinkup.Event do
alias Drinkup.Event
-
@spec from(String.t(), map()) ::
+
@type t() ::
Event.Commit.t()
| Event.Sync.t()
| Event.Identity.t()
| Event.Account.t()
| Event.Info.t()
-
| nil
+
+
@spec from(String.t(), map()) :: t() | nil
def from("#commit", payload), do: Event.Commit.from(payload)
def from("#sync", payload), do: Event.Sync.from(payload)
def from("#identity", payload), do: Event.Identity.from(payload)
···
@spec valid_seq?(integer() | nil, any()) :: boolean()
def valid_seq?(nil, seq) when is_integer(seq), do: true
+
def valid_seq?(last_seq, nil) when is_integer(last_seq), do: true
def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq), do: seq > last_seq
def valid_seq?(_last_seq, _seq), do: false
end
+20 -11
lib/socket.ex
···
"""
require Logger
-
alias Drinkup.Event
+
alias Drinkup.{ConsumerGroup, Event}
@behaviour :gen_statem
@default_host "https://bsky.network"
···
@impl true
def callback_mode, do: [:state_functions, :state_enter]
-
def start_link(opts \\ []) do
+
def child_spec(opts) do
+
%{
+
id: __MODULE__,
+
start: {__MODULE__, :start_link, [opts, []]},
+
type: :worker,
+
restart: :permanent,
+
shutdown: 500
+
}
+
end
+
+
def start_link(opts \\ [], statem_opts) do
opts = Keyword.validate!(opts, host: @default_host)
host = Keyword.get(opts, :host)
cursor = Keyword.get(opts, :cursor)
-
:gen_statem.start_link(__MODULE__, {host, cursor}, [])
+
:gen_statem.start_link(__MODULE__, {host, cursor}, statem_opts)
end
@impl true
···
with {:ok, header, next} <- CAR.DagCbor.decode(frame),
{:ok, payload, _} <- CAR.DagCbor.decode(next),
{%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
-
true <- type == "#info" || Event.valid_seq?(data.seq, payload["seq"]),
-
data <- %{data | seq: payload["seq"] || data.seq},
-
message <-
-
Event.from(type, payload) do
+
true <- Event.valid_seq?(data.seq, payload["seq"]) do
+
data = %{data | seq: payload["seq"] || data.seq}
+
message = Event.from(type, payload)
:ok = :gun.update_flow(conn, stream, @flow)
case message do
-
%Event.Commit{} = commit ->
-
IO.inspect(commit.ops, label: commit.repo)
+
nil ->
+
Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
-
msg ->
-
IO.inspect(msg)
+
message ->
+
ConsumerGroup.dispatch(message)
end
{:keep_state, data}