Elixir ATProtocol firehose & subscription listener
at main 5.5 kB view raw
defmodule Drinkup.Socket do
  @moduledoc """
  `:gen_statem` process for managing the websocket connection to an ATProto relay.

  The process moves through four states:

    * `:disconnected` - no connection is held; an internal `:connect` event
      moves the process on to `:connecting_http`.
    * `:connecting_http` - a `:gun` connection to the configured host is being
      opened.
    * `:connecting_ws` - the connection is being upgraded to a websocket on
      `com.atproto.sync.subscribeRepos`.
    * `:connected` - binary frames are decoded with `CAR.DagCbor`, turned into
      events with `Drinkup.Event.from/2` and handed to
      `Drinkup.Event.dispatch/2`.

  Opening the connection and upgrading it are each bounded by a 5 second state
  timeout; hitting either one stops the process. When the websocket or the
  underlying connection goes away while `:connected`, the connection is closed
  and the process walks through the states again, resuming from the last
  sequence number it accepted.
  """

  require Logger
  alias Drinkup.{Event, Options}

  @behaviour :gen_statem
  @timeout :timer.seconds(5)
  # TODO: `flow` determines messages in buffer. Determine ideal value?
  @flow 10

  # Values of the `op` field in an event stream frame header.
  @op_regular 1
  @op_error -1

  @subscribe_path "/xrpc/com.atproto.sync.subscribeRepos"

  defstruct [:options, :seq, :conn, :stream]

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @doc """
  Returns the child specification used to run this process under a supervisor.

  The process is a permanent worker started through `start_link/2` with no
  extra `:gen_statem` options.
  """
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts, []]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  @doc """
  Starts the socket process linked to the caller.

  `options` must be a `Drinkup.Options` struct; `statem_opts` is passed through
  unchanged as the options argument of `:gen_statem.start_link/3`.
  """
  def start_link(%Options{} = options, statem_opts) do
    :gen_statem.start_link(__MODULE__, options, statem_opts)
  end

  @impl true
  def init(%{cursor: seq} = options) do
    data = %__MODULE__{seq: seq, options: options}
    # Start connecting as soon as the initial state has been entered.
    {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
  end

  @doc """
  State function for `:disconnected`.

  Entering the state only logs; the internal `:connect` event moves the process
  to `:connecting_http`.
  """
  def disconnected(:enter, _from, _data) do
    Logger.debug("Initial connection")
    # TODO: differentiate between initial & reconnects, probably stuff to do with seq
    :keep_state_and_data
  end

  def disconnected(:internal, :connect, data) do
    {:next_state, :connecting_http, data}
  end

  @doc """
  State function for `:connecting_http`.

  On entry a `:gun` connection to the host in the options is opened and a state
  timeout is armed. `:gun_up` moves the process to `:connecting_ws`; the timeout
  stops it with reason `:connect_http_timeout`.
  """
  def connecting_http(:enter, _from, %{options: options} = data) do
    Logger.debug("Connecting to http")

    %{host: host, port: port} = URI.new!(options.host)

    {:ok, conn} =
      :gun.open(:binary.bin_to_list(host), port, %{
        # Reconnection is driven by this state machine, not by gun.
        retry: 0,
        protocols: [:http],
        connect_timeout: @timeout,
        domain_lookup_timeout: @timeout,
        tls_handshake_timeout: @timeout,
        tls_opts: [
          verify: :verify_peer,
          cacerts: :certifi.cacerts(),
          depth: 3,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ]
      })

    {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
  end

  def connecting_http(:info, {:gun_up, _conn, :http}, data) do
    {:next_state, :connecting_ws, data}
  end

  def connecting_http(:state_timeout, :connect_timeout, _data) do
    {:stop, :connect_http_timeout}
  end

  @doc """
  State function for `:connecting_ws`.

  On entry the connection is upgraded to a websocket on the subscription
  endpoint and a state timeout is armed. A successful upgrade moves the process
  to `:connected`; the timeout stops it with reason `:connect_ws_timeout`.
  """
  def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
    Logger.debug("Upgrading connection to websocket")
    stream = :gun.ws_upgrade(conn, subscribe_path(seq), [], %{flow: @flow})
    {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
  end

  def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
    {:next_state, :connected, data}
  end

  def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
    {:stop, :connect_ws_timeout}
  end

  @doc """
  State function for `:connected`.

  Binary websocket frames are decoded and dispatched. A websocket close frame,
  or a `:gun_down` for the current connection, triggers an internal
  `:reconnect`, which closes the connection and returns to `:disconnected`.
  A `:gun_down` for any other connection is ignored.
  """
  def connected(:enter, _from, _data) do
    Logger.debug("Connected to websocket")
    :keep_state_and_data
  end

  def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do
    # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
    # Will also need support for JSON frames
    data = handle_frame(frame, data)

    # Every delivered frame uses up one unit of the stream's flow window, so hand
    # exactly one unit back per frame, whether or not the frame was usable. This
    # keeps the window at `@flow` instead of letting it drain on bad frames or
    # grow without bound on good ones.
    :ok = :gun.update_flow(conn, stream, 1)

    {:keep_state, data}
  end

  def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
    Logger.info("Websocket closed, reason unknown")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
    Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
      when old_conn != new_conn do
    Logger.debug("Ignoring received :gun_down for a previous connection.")
    :keep_state_and_data
  end

  def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
    Logger.info("Websocket connection killed. Attempting to reconnect")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:internal, :reconnect, %{conn: conn} = data) do
    :ok = :gun.close(conn)
    # Drop any messages from the closed connection still sitting in the mailbox.
    :ok = :gun.flush(conn)

    # TODO: reconnect backoff
    {:next_state, :disconnected, %{data | conn: nil, stream: nil},
     [{:next_event, :internal, :connect}]}
  end

  # Builds the subscription path, adding the `cursor` query only when there is a
  # sequence number to resume from.
  # NOTE(review): assumes the cursor option is `nil` when unset - confirm against
  # `Drinkup.Options`.
  defp subscribe_path(nil), do: @subscribe_path
  defp subscribe_path(seq), do: @subscribe_path <> "?" <> URI.encode_query(%{cursor: seq})

  # Decodes the header and payload of one binary frame and returns the
  # (possibly updated) state data. Undecodable frames are logged and dropped.
  defp handle_frame(frame, data) do
    with {:ok, header, rest} <- CAR.DagCbor.decode(frame),
         {:ok, payload, _} <- CAR.DagCbor.decode(rest) do
      handle_message(header, payload, data)
    else
      {:error, reason} ->
        Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
        data

      other ->
        Logger.warning("Failed to decode frame from Firehose: #{inspect(other)}")
        data
    end
  end

  # Acts on one decoded frame according to its header and returns the state data.
  defp handle_message(%{"op" => @op_regular, "t" => type}, payload, %{options: options} = data) do
    seq = payload["seq"]

    if Event.valid_seq?(data.seq, seq) do
      case Event.from(type, payload) do
        nil ->
          Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")

        message ->
          Event.dispatch(message, options)
      end

      # Payloads without a `seq` leave the stored sequence number untouched.
      %{data | seq: seq || data.seq}
    else
      Logger.error("Got out of sequence or invalid `seq` from Firehose")
      data
    end
  end

  # Error frames are matched on `op` alone: their header does not carry a `t`
  # field, so requiring one would leave them unhandled.
  defp handle_message(%{"op" => @op_error} = header, payload, data) do
    Logger.error("Got error from Firehose: #{inspect({header["t"], payload})}")
    data
  end

  # Anything else (unknown `op`, regular frame without `t`, non-map header) is
  # logged and dropped rather than crashing the connection.
  defp handle_message(header, _payload, data) do
    Logger.warning("Ignoring frame with unexpected header from Firehose: #{inspect(header)}")
    data
  end
end