Elixir ATProtocol firehose & subscription listener

feat: validate message sequence

ovyerus.com 4ec6c6d7 7cb3b53a

verified
Changed files
+23 -11
lib
+23 -11
lib/drinkup.ex
···
|> URI.append_query(URI.encode_query(%{cursor: cursor}))
|> URI.to_string()
-
WebSockex.start_link(url, __MODULE__, %{cursor: cursor})
+
WebSockex.start_link(url, __MODULE__, 0)
end
def handle_connect(conn, state) do
···
with {:ok, header, next} <- CAR.DagCbor.decode(msg),
{:ok, payload, _} <- CAR.DagCbor.decode(next),
{%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
-
message <- from_payload(type, payload) do
+
true <- type == "#info" || valid_seq?(state, payload["seq"]),
+
message <-
+
from_payload(type, payload) do
case message do
%Firehose.Commit{} = commit ->
IO.inspect(commit.ops, label: commit.repo)
···
msg ->
IO.inspect(msg)
end
+
+
{:ok, payload["seq"] || state}
else
+
false ->
+
Logger.error("Got out of sequence or invalid `seq` from Firehose")
+
{:ok, state}
+
{%{"op" => @op_error, "t" => type}, payload} ->
Logger.error("Got error from Firehose: #{inspect({type, payload})}")
+
{:ok, state}
{:error, reason} ->
Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
+
{:ok, state}
end
-
-
{:ok, state}
end
def handle_frame({:text, msg}, state) do
-
Logger.warning("Got unexpected text frame from Firehose #{inspect(msg)}")
+
Logger.warning("Got unexpected text frame from Firehose: #{inspect(msg)}")
{:ok, state}
end
+
+
@spec valid_seq?(integer(), any()) :: boolean()
+
defp valid_seq?(last_seq, seq) when is_integer(seq), do: seq > last_seq
+
defp valid_seq?(_last_seq, _seq), do: false
@spec from_payload(String.t(), map()) ::
Firehose.Commit.t()
···
| Firehose.Account.t()
| Firehose.Info.t()
| nil
-
def from_payload("#commit", payload), do: Firehose.Commit.from(payload)
-
def from_payload("#sync", payload), do: Firehose.Sync.from(payload)
-
def from_payload("#identity", payload), do: Firehose.Identity.from(payload)
-
def from_payload("#account", payload), do: Firehose.Account.from(payload)
-
def from_payload("#info", payload), do: Firehose.Info.from(payload)
-
def from_payload(_type, _payload), do: nil
+
defp from_payload("#commit", payload), do: Firehose.Commit.from(payload)
+
defp from_payload("#sync", payload), do: Firehose.Sync.from(payload)
+
defp from_payload("#identity", payload), do: Firehose.Identity.from(payload)
+
defp from_payload("#account", payload), do: Firehose.Account.from(payload)
+
defp from_payload("#info", payload), do: Firehose.Info.from(payload)
+
defp from_payload(_type, _payload), do: nil
end