1defmodule Drinkup.Socket do
2 @moduledoc """
3 gen_statem process for managing the websocket connection to an ATProto relay.
4 """
5
6 require Logger
7 alias Drinkup.{Event, Options}
8
9 @behaviour :gen_statem
# Timeout in milliseconds, used both for the gun connection phases and for the
# state timeouts guarding the HTTP connect and websocket upgrade states.
@timeout :timer.seconds(5)
# TODO: `flow` determines messages in buffer. Determine ideal value?
# Flow-control value handed to gun for the websocket stream.
@flow 10

# Values of the `"op"` field found in a frame header.
@op_regular 1
@op_error -1

# State data carried by the state machine:
#   * `:options` - the options the process was started with
#   * `:seq`     - cursor / `seq` of the last accepted event
#   * `:conn`    - gun connection, `nil` while disconnected
#   * `:stream`  - websocket stream reference, `nil` until an upgrade is requested
defstruct options: nil, seq: nil, conn: nil, stream: nil
18
@impl true
def callback_mode do
  # One callback function per state, plus an enter call whenever a state is entered.
  [:state_functions, :state_enter]
end
21
# Supervisor child specification: a permanent worker started through
# `start_link/2` with `opts` and an empty list of gen_statem start options.
def child_spec(opts) do
  start = {__MODULE__, :start_link, [opts, []]}

  %{id: __MODULE__, start: start, type: :worker, restart: :permanent, shutdown: 500}
end
31
# Starts the state machine linked to the caller. `options` must be a
# `Drinkup.Options` struct; `statem_opts` is passed through to
# `:gen_statem.start_link/3` untouched.
def start_link(%Options{} = options, statem_opts),
  do: :gen_statem.start_link(__MODULE__, options, statem_opts)
35
@impl true
def init(%{cursor: cursor} = options) do
  # The configured cursor seeds `seq`; the machine starts out disconnected and
  # immediately queues an internal `:connect` event for itself.
  initial_data = %__MODULE__{options: options, seq: cursor}
  actions = [{:next_event, :internal, :connect}]

  {:ok, :disconnected, initial_data, actions}
end
41
# `:disconnected` state.
#
# Enter calls receive the previous state as their second argument. gen_statem
# performs the enter call for the initial state with the old state equal to the
# state itself, so `:disconnected` as the old state means the process has just
# started, while any other old state means an established connection was lost.
# Enter calls must not change state, hence `:keep_state_and_data`.
def disconnected(:enter, :disconnected, _data) do
  Logger.debug("Initial connection")
  :keep_state_and_data
end

def disconnected(:enter, _old_state, _data) do
  # TODO: reconnects probably need extra handling to do with seq
  Logger.debug("Reconnecting")
  :keep_state_and_data
end

# The internal `:connect` event (queued by `init/1` and by the reconnect
# handler) starts a new connection attempt.
def disconnected(:internal, :connect, data) do
  {:next_state, :connecting_http, data}
end
51
# `:connecting_http` state.
#
# On entry, opens a gun connection to the host and port parsed from
# `options.host`, stores it in `data.conn` and arms a state timeout.
def connecting_http(:enter, _previous_state, %{options: options} = data) do
  Logger.debug("Connecting to http")

  uri = URI.new!(options.host)
  # gun is handed the host name as a list of bytes rather than a binary.
  hostname = :binary.bin_to_list(uri.host)

  tls_opts = [
    verify: :verify_peer,
    cacerts: :certifi.cacerts(),
    depth: 3,
    customize_hostname_check: [
      match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
    ]
  ]

  gun_opts = %{
    retry: 0,
    protocols: [:http],
    connect_timeout: @timeout,
    domain_lookup_timeout: @timeout,
    tls_handshake_timeout: @timeout,
    tls_opts: tls_opts
  }

  {:ok, conn} = :gun.open(hostname, uri.port, gun_opts)

  {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
end

# gun reports the HTTP connection as up: move on to the websocket upgrade.
def connecting_http(:info, {:gun_up, _conn, :http}, data),
  do: {:next_state, :connecting_ws, data}

# The connection did not come up before the state timeout fired: stop the process.
def connecting_http(:state_timeout, :connect_timeout, _data),
  do: {:stop, :connect_http_timeout}
84
# `:connecting_ws` state.
#
# On entry, requests a websocket upgrade on the `subscribeRepos` endpoint,
# stores the stream reference in `data.stream` and arms a state timeout.
def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
  Logger.debug("Upgrading connection to websocket")

  endpoint = "/xrpc/com.atproto.sync.subscribeRepos"

  # Only send `cursor` when there is one: `URI.encode_query(%{cursor: nil})`
  # would produce `cursor=`, i.e. an empty cursor instead of an omitted one.
  # NOTE(review): assumes a missing cursor is represented as `nil` - confirm
  # against `Drinkup.Options`.
  path =
    case seq do
      nil -> endpoint
      cursor -> endpoint <> "?" <> URI.encode_query(%{cursor: cursor})
    end

  stream = :gun.ws_upgrade(conn, path, [], %{flow: @flow})
  {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
end

# gun confirmed the upgrade to the websocket protocol.
def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
  {:next_state, :connected, data}
end

# No upgrade confirmation arrived before the state timeout fired: stop the process.
def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
  {:stop, :connect_ws_timeout}
end
99
# `:connected` state entry: nothing to set up, only note that the websocket is live.
def connected(:enter, _previous_state, _state_data) do
  Logger.debug("Connected to websocket")

  :keep_state_and_data
end
104
# Handles one binary websocket frame: two concatenated DAG-CBOR values, a
# header followed by a payload.
#
#   * regular frames (`"op"` 1 with a `"t"` type) whose `seq` passes
#     `Event.valid_seq?/2` are converted with `Event.from/2` and dispatched,
#     and `data.seq` advances to the payload's `seq` when it has one;
#   * error frames (`"op"` -1), out-of-sequence frames, undecodable frames and
#     anything else unexpected are logged and otherwise ignored.
#
# The state machine always stays in `:connected`.
def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, %{options: options} = data) do
  # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
  # Will also need support for JSON frames

  # Hand back the single flow-control credit this frame consumed, whatever the
  # outcome below. Crediting only successfully processed frames would leak a
  # credit per bad frame, and crediting more than one per frame makes the
  # window grow without bound.
  :ok = :gun.update_flow(conn, stream, 1)

  with {:ok, header, next} <- CAR.DagCbor.decode(frame),
       {:ok, payload, _} <- CAR.DagCbor.decode(next),
       {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
       true <- Event.valid_seq?(data.seq, payload["seq"]) do
    case Event.from(type, payload) do
      nil ->
        Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")

      message ->
        Event.dispatch(message, options)
    end

    {:keep_state, %{data | seq: payload["seq"] || data.seq}}
  else
    false ->
      Logger.error("Got out of sequence or invalid `seq` from Firehose")
      :keep_state_and_data

    # Error frame headers carry no "t" key, so match on "op" alone; the error
    # details are in the payload.
    {%{"op" => @op_error}, payload} ->
      Logger.error("Got error from Firehose: #{inspect(payload)}")
      :keep_state_and_data

    {:error, reason} ->
      Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
      :keep_state_and_data

    # Unknown "op", regular header without a "t", or an unexpected decoder
    # result: log and drop the frame rather than crash with a WithClauseError.
    other ->
      Logger.warning("Ignoring unexpected frame from Firehose: #{inspect(other)}")
      :keep_state_and_data
  end
end
139
# Close frame without a status: log it and queue an internal `:reconnect` event.
def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
  Logger.info("Websocket closed, reason unknown")

  actions = [{:next_event, :internal, :reconnect}]
  {:keep_state_and_data, actions}
end

# Close frame carrying a code and reason: log both and queue an internal
# `:reconnect` event.
def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
  Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")

  actions = [{:next_event, :internal, :reconnect}]
  {:keep_state_and_data, actions}
end
149
# `:gun_down` for a connection other than the one currently held in the state
# data: nothing to do.
def connected(
      :info,
      {:gun_down, down_conn, _proto, _reason, _killed_streams},
      %{conn: current_conn}
    )
    when down_conn != current_conn do
  Logger.debug("Ignoring received :gun_down for a previous connection.")

  :keep_state_and_data
end

# `:gun_down` for the current connection: queue an internal `:reconnect` event.
def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
  Logger.info("Websocket connection killed. Attempting to reconnect")

  actions = [{:next_event, :internal, :reconnect}]
  {:keep_state_and_data, actions}
end
160
# Tears down the current gun connection and restarts the connection cycle:
# clears `conn` and `stream`, moves to `:disconnected` and queues an internal
# `:connect` event.
def connected(:internal, :reconnect, %{conn: conn} = data) do
  # `:gun.close/1` returns `{:error, :not_found}` when the gun process is
  # already gone. That may well be the case after a `:gun_down`, since the
  # connection is opened with `retry: 0`, and it must not crash the reconnect.
  case :gun.close(conn) do
    :ok -> :ok
    {:error, :not_found} -> :ok
  end

  # Drop any messages from the old connection still sitting in the mailbox.
  :ok = :gun.flush(conn)

  # TODO: reconnect backoff
  {:next_state, :disconnected, %{data | conn: nil, stream: nil},
   [{:next_event, :internal, :connect}]}
end
169end