Elixir AT Protocol firehose and subscription listener
defmodule Drinkup.Socket do
  @moduledoc """
  gen_statem process for managing the websocket connection to an ATProto relay.

  The process moves through `:disconnected` -> `:connecting_http` ->
  `:connecting_ws` -> `:connected`, subscribing to
  `com.atproto.sync.subscribeRepos`. When the websocket closes or the connection
  goes down it returns to `:disconnected` and connects again, sending the last
  accepted `seq` as the cursor.
  """

  require Logger
  alias Drinkup.Event

  @behaviour :gen_statem
  @default_host "https://bsky.network"
  @timeout :timer.seconds(5)
  @subscribe_path "/xrpc/com.atproto.sync.subscribeRepos"
  # Websocket flow-control window: gun delivers at most this many frames before it
  # waits for more credit. Each handled frame hands back exactly one credit.
  # TODO: `flow` determines messages in buffer. Determine ideal value?
  @flow 10

  @op_regular 1
  @op_error -1

  defstruct [:host, :seq, :conn, :stream]

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @doc """
  Starts the socket process, linked to the caller.

  ## Options

    * `:host` - relay URL, defaults to `"https://bsky.network"`. An `https` or `wss`
      scheme selects a TLS transport; any other scheme uses plain TCP.
    * `:cursor` - `seq` to resume the stream from. When omitted no cursor is sent.

  Raises `ArgumentError` for unknown options.
  """
  def start_link(opts \\ []) do
    # `:cursor` must be listed here, otherwise `Keyword.validate!/2` rejects it.
    opts = Keyword.validate!(opts, [:cursor, host: @default_host])
    host = Keyword.get(opts, :host)
    cursor = Keyword.get(opts, :cursor)

    :gen_statem.start_link(__MODULE__, {host, cursor}, [])
  end

  @impl true
  def init({host, cursor}) do
    data = %__MODULE__{host: host, seq: cursor}
    {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
  end

  @doc false
  # gen_statem calls the enter handler of the initial state with the state itself as
  # the old state, which tells the first connection apart from a reconnect.
  def disconnected(:enter, :disconnected, _data) do
    Logger.debug("Initial connection")
    :keep_state_and_data
  end

  def disconnected(:enter, _from, _data) do
    Logger.debug("Reconnecting")
    :keep_state_and_data
  end

  def disconnected(:internal, :connect, data) do
    {:next_state, :connecting_http, data}
  end

  @doc false
  def connecting_http(:enter, _from, data) do
    Logger.debug("Connecting to http")

    %URI{scheme: scheme, host: host, port: port} = URI.new!(data.host)

    {:ok, conn} =
      :gun.open(:binary.bin_to_list(host), port, %{
        retry: 0,
        protocols: [:http],
        # gun only defaults to TLS on port 443; derive the transport from the scheme so
        # TLS relays on other ports work too.
        transport: transport(scheme),
        connect_timeout: @timeout,
        domain_lookup_timeout: @timeout,
        tls_handshake_timeout: @timeout,
        tls_opts: [
          verify: :verify_peer,
          cacerts: :certifi.cacerts(),
          depth: 3,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ]
      })

    {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
  end

  def connecting_http(:info, {:gun_up, _conn, :http}, data) do
    {:next_state, :connecting_ws, data}
  end

  def connecting_http(:state_timeout, :connect_timeout, _data) do
    {:stop, :connect_http_timeout}
  end

  @doc false
  def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
    Logger.debug("Upgrading connection to websocket")
    stream = :gun.ws_upgrade(conn, subscribe_path(seq), [], %{flow: @flow})
    {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
  end

  def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
    {:next_state, :connected, data}
  end

  # gun reports a refused upgrade as a plain HTTP response instead of :gun_upgrade.
  def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, _data) do
    Logger.error("Websocket upgrade rejected with HTTP status #{status}")
    {:stop, {:ws_upgrade_rejected, status}}
  end

  def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
    {:stop, :connect_ws_timeout}
  end

  @doc false
  def connected(:enter, _from, _data) do
    Logger.debug("Connected to websocket")
    :keep_state_and_data
  end

  def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do
    # Each delivered frame used up one unit of flow credit. Hand back exactly one,
    # whatever the outcome: adding the whole window per frame would let credit grow
    # without bound, and skipping it for bad frames could stall the stream.
    :ok = :gun.update_flow(conn, stream, 1)
    {:keep_state, handle_frame(frame, data)}
  end

  # TODO: JSON (text) frames are not supported yet; skip them but keep flow credit.
  def connected(:info, {:gun_ws, conn, stream, {:text, _text}}, _data) do
    :ok = :gun.update_flow(conn, stream, 1)
    Logger.warning("Ignoring unsupported text frame from Firehose")
    :keep_state_and_data
  end

  def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
    Logger.info("Websocket closed, reason unknown")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
    Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
      when old_conn != new_conn do
    Logger.debug("Ignoring received :gun_down for a previous connection.")
    :keep_state_and_data
  end

  def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
    Logger.info("Websocket connection killed. Attempting to reconnect")
    {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
  end

  def connected(:internal, :reconnect, %{conn: conn} = data) do
    # Close the old connection and flush its queued gun messages so none of them
    # reach the states of the next connection.
    :ok = :gun.close(conn)
    :ok = :gun.flush(conn)

    # TODO: reconnect backoff
    {:next_state, :disconnected, %{data | conn: nil, stream: nil},
     [{:next_event, :internal, :connect}]}
  end

  # Maps the relay URL scheme to a gun transport.
  defp transport(scheme) when scheme in ["https", "wss"], do: :tls
  defp transport(_scheme), do: :tcp

  # subscribeRepos path; the cursor query is only added when a seq is known, instead of
  # sending an empty `cursor=` parameter.
  defp subscribe_path(nil), do: @subscribe_path
  defp subscribe_path(seq), do: @subscribe_path <> "?" <> URI.encode_query(%{cursor: seq})

  # Decodes one binary frame (a DAG-CBOR header followed by a DAG-CBOR payload) and
  # returns the updated state data.
  # TODO: let clients specify a handler for raw* (*decoded) packets to support any
  # atproto subscription
  defp handle_frame(frame, data) do
    with {:ok, header, rest} <- CAR.DagCbor.decode(frame),
         {:ok, payload, _rest} <- CAR.DagCbor.decode(rest) do
      handle_message(header, payload, data)
    else
      {:error, reason} ->
        Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
        data
    end
  end

  # Regular message: accepted when its type is `#info` (exempt from the seq check) or
  # its seq passes `Event.valid_seq?/2`; the seq is then remembered and the decoded
  # event dispatched.
  defp handle_message(%{"op" => @op_regular, "t" => type}, payload, data) do
    seq = payload["seq"]

    if type == "#info" or Event.valid_seq?(data.seq, seq) do
      dispatch(Event.from(type, payload))
      %{data | seq: seq || data.seq}
    else
      Logger.error("Got out of sequence or invalid `seq` from Firehose")
      data
    end
  end

  # Error frame: the event-stream spec omits `t` from error headers, so only `op` is
  # matched; the payload carries the error details.
  defp handle_message(%{"op" => @op_error}, payload, data) do
    Logger.error("Got error from Firehose: #{inspect(payload)}")
    data
  end

  # Unknown `op` values (or a regular header without `t`) are ignored, not fatal.
  defp handle_message(header, _payload, data) do
    Logger.warning("Ignoring Firehose frame with unexpected header: #{inspect(header)}")
    data
  end

  defp dispatch(%Event.Commit{} = commit), do: IO.inspect(commit.ops, label: commit.repo)
  defp dispatch(message), do: IO.inspect(message)
end