Elixir ATProtocol firehose & subscription listener
defmodule Drinkup.Socket do
  @moduledoc """
  `:gen_statem` process for managing the websocket connection to an ATProto relay.

  The state machine moves through the following states:

    * `:disconnected` - no connection; an internal `:connect` event starts a new attempt.
    * `:connecting_http` - a `:gun` connection is being opened to the relay host.
    * `:connecting_ws` - the connection is being upgraded to a websocket on
      `com.atproto.sync.subscribeRepos`.
    * `:connected` - frames are decoded, turned into events with `Drinkup.Event.from/2`
      and handed to `Drinkup.ConsumerGroup.dispatch/1`.

  The last accepted sequence number is kept in the `:seq` field and is sent as the
  `cursor` query parameter on (re)connect.
  """

  require Logger
  alias Drinkup.{ConsumerGroup, Event}

  @behaviour :gen_statem
  @default_host "https://bsky.network"
  @timeout :timer.seconds(5)
  @subscribe_path "/xrpc/com.atproto.sync.subscribeRepos"
  # TODO: `flow` determines messages in buffer. Determine ideal value?
  @flow 10

  @op_regular 1
  @op_error -1

  defstruct [:host, :seq, :conn, :stream]

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @doc """
  Returns a child specification that starts the socket with `opts` and no
  extra `:gen_statem` options.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts, []]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  @doc """
  Starts the socket state machine linked to the caller.

  ## Options

    * `:host` - base URL of the relay. Defaults to `#{inspect(@default_host)}`.
    * `:cursor` - sequence number to resume the subscription from. Defaults to `nil`,
      in which case no cursor is sent.

  `statem_opts` is passed unchanged to `:gen_statem.start_link/3`.
  Unknown keys in `opts` raise `ArgumentError`.
  """
  def start_link(opts \\ [], statem_opts) do
    # `:cursor` must be listed here, otherwise `Keyword.validate!/2` rejects it
    # and a cursor can never be supplied.
    opts = Keyword.validate!(opts, host: @default_host, cursor: nil)
    host = Keyword.fetch!(opts, :host)
    cursor = Keyword.fetch!(opts, :cursor)

    :gen_statem.start_link(__MODULE__, {host, cursor}, statem_opts)
  end

  @impl true
  def init({host, cursor}) do
    data = %__MODULE__{host: host, seq: cursor}
    {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
  end

  @doc false
  # On the very first enter call gen_statem passes the initial state as the old
  # state, which lets us tell the initial connection apart from a reconnect.
  def disconnected(:enter, :disconnected, _data) do
    Logger.debug("Initial connection")
    :keep_state_and_data
  end

  def disconnected(:enter, _from, _data) do
    Logger.debug("Reconnecting")
    :keep_state_and_data
  end

  def disconnected(:internal, :connect, data) do
    {:next_state, :connecting_http, data}
  end

  @doc false
  def connecting_http(:enter, _from, data) do
    Logger.debug("Connecting to http")

    %{host: host, port: port} = URI.new!(data.host)

    {:ok, conn} =
      :gun.open(String.to_charlist(host), port, %{
        # Reconnects are driven by this state machine, not by gun.
        retry: 0,
        protocols: [:http],
        connect_timeout: @timeout,
        domain_lookup_timeout: @timeout,
        tls_handshake_timeout: @timeout,
        tls_opts: [
          verify: :verify_peer,
          cacerts: :certifi.cacerts(),
          depth: 3,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ]
      })

    {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
  end

  def connecting_http(:info, {:gun_up, _conn, :http}, data) do
    {:next_state, :connecting_ws, data}
  end

  def connecting_http(:state_timeout, :connect_timeout, _data) do
    {:stop, :connect_http_timeout}
  end

  @doc false
  def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
    Logger.debug("Upgrading connection to websocket")
    stream = :gun.ws_upgrade(conn, subscribe_path(seq), [], %{flow: @flow})
    {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
  end

  def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
    {:next_state, :connected, data}
  end

  # The relay answered the upgrade request with a plain HTTP response.
  def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, _data) do
    Logger.error("Websocket upgrade rejected with HTTP status #{inspect(status)}")
    {:stop, {:ws_upgrade_rejected, status}}
  end

  def connecting_ws(:info, {:gun_error, _conn, _stream, reason}, _data) do
    Logger.error("Websocket upgrade failed: #{inspect(reason)}")
    {:stop, {:ws_upgrade_error, reason}}
  end

  def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
    {:stop, :connect_ws_timeout}
  end

  @doc false
  def connected(:enter, _from, _data) do
    Logger.debug("Connected to websocket")
    :keep_state_and_data
  end

  def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do
    # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
    # Will also need support for JSON frames
    data = handle_frame(frame, data)
    replenish_flow(conn, stream)
    {:keep_state, data}
  end

  def connected(:info, {:gun_ws, conn, stream, {:text, _frame}}, _data) do
    Logger.warning("Ignoring unsupported text frame from Firehose")
    replenish_flow(conn, stream)
    :keep_state_and_data
  end

  def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
    Logger.info("Websocket closed, reason unknown")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
    Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
      when old_conn != new_conn do
    Logger.debug("Ignoring received :gun_down for a previous connection.")
    :keep_state_and_data
  end

  def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
    Logger.info("Websocket connection killed. Attempting to reconnect")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:internal, :reconnect, %{conn: conn} = data) do
    # The gun process may already have exited (e.g. after `:gun_down` with
    # `retry: 0`), in which case `:gun.close/1` returns `{:error, :not_found}`.
    # That is not a failure for our purposes, so it must not crash the reconnect.
    case :gun.close(conn) do
      :ok -> :ok
      {:error, :not_found} -> :ok
    end

    # Drop any messages from the old connection that are still in the mailbox.
    :ok = :gun.flush(conn)

    # TODO: reconnect backoff
    {:next_state, :disconnected, %{data | conn: nil, stream: nil},
     [{:next_event, :internal, :connect}]}
  end

  # Builds the subscription path, only adding a cursor when one is known so that
  # an empty `cursor=` parameter is never sent.
  defp subscribe_path(nil), do: @subscribe_path
  defp subscribe_path(seq), do: @subscribe_path <> "?" <> URI.encode_query(%{cursor: seq})

  # Gives back one unit of flow for a frame that has been fully handled, keeping
  # the receive window at its initial `@flow` size instead of growing it.
  # NOTE(review): assumes gun counts one unit of flow per delivered frame - confirm.
  defp replenish_flow(conn, stream) do
    :ok = :gun.update_flow(conn, stream, 1)
  end

  # Decodes one binary frame and applies it, returning the (possibly updated) data.
  defp handle_frame(frame, data) do
    case decode_frame(frame) do
      {:ok, %{"op" => @op_regular, "t" => type}, payload} when is_map(payload) ->
        process_event(type, payload, data)

      # Matched on `op` alone so error frames are handled whether or not the
      # header carries a "t" field.
      {:ok, %{"op" => @op_error}, payload} ->
        Logger.error("Got error from Firehose: #{inspect(payload)}")
        data

      {:ok, header, _payload} ->
        Logger.warning("Received frame with unrecognised header from Firehose: #{inspect(header)}")
        data

      {:error, reason} ->
        Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
        data
    end
  end

  # A frame is two concatenated DAG-CBOR values: the header followed by the payload.
  defp decode_frame(frame) do
    with {:ok, header, rest} <- CAR.DagCbor.decode(frame),
         {:ok, payload, _rest} <- CAR.DagCbor.decode(rest) do
      {:ok, header, payload}
    end
  end

  # Dispatches a regular event if its `seq` is acceptable and records the new `seq`.
  defp process_event(type, payload, data) do
    if Event.valid_seq?(data.seq, payload["seq"]) do
      case Event.from(type, payload) do
        nil ->
          Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")

        message ->
          ConsumerGroup.dispatch(message)
      end

      %{data | seq: payload["seq"] || data.seq}
    else
      Logger.error("Got out of sequence or invalid `seq` from Firehose")
      data
    end
  end
end