1defmodule Drinkup.Socket do
2 @moduledoc """
3 gen_statem process for managing the websocket connection to an ATProto relay.
4 """
5
6 require Logger
7 alias Drinkup.Event
8
9 @behaviour :gen_statem
10 @default_host "https://bsky.network"
11 @timeout :timer.seconds(5)
12 # TODO: `flow` determines messages in buffer. Determine ideal value?
13 @flow 10
14
15 @op_regular 1
16 @op_error -1
17
18 defstruct [:options, :seq, :conn, :stream]
19
20 @impl true
21 def callback_mode, do: [:state_functions, :state_enter]
22
23 def child_spec(opts) do
24 %{
25 id: __MODULE__,
26 start: {__MODULE__, :start_link, [opts, []]},
27 type: :worker,
28 restart: :permanent,
29 shutdown: 500
30 }
31 end
32
33 def start_link(%{consumer: _} = options, statem_opts) do
34 options = Map.merge(%{host: @default_host, cursor: nil}, options)
35
36 :gen_statem.start_link(__MODULE__, options, statem_opts)
37 end
38
39 @impl true
40 def init(%{cursor: seq} = options) do
41 data = %__MODULE__{seq: seq, options: options}
42 {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
43 end
44
45 def disconnected(:enter, _from, data) do
46 Logger.debug("Initial connection")
47 # TODO: differentiate between initial & reconnects, probably stuff to do with seq
48 {:next_state, :disconnected, data}
49 end
50
51 def disconnected(:internal, :connect, data) do
52 {:next_state, :connecting_http, data}
53 end
54
55 def connecting_http(:enter, _from, %{options: options} = data) do
56 Logger.debug("Connecting to http")
57
58 %{host: host, port: port} = URI.new!(options.host)
59
60 {:ok, conn} =
61 :gun.open(:binary.bin_to_list(host), port, %{
62 retry: 0,
63 protocols: [:http],
64 connect_timeout: @timeout,
65 domain_lookup_timeout: @timeout,
66 tls_handshake_timeout: @timeout,
67 tls_opts: [
68 verify: :verify_peer,
69 cacerts: :certifi.cacerts(),
70 depth: 3,
71 customize_hostname_check: [
72 match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
73 ]
74 ]
75 })
76
77 {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
78 end
79
80 def connecting_http(:info, {:gun_up, _conn, :http}, data) do
81 {:next_state, :connecting_ws, data}
82 end
83
84 def connecting_http(:state_timeout, :connect_timeout, _data) do
85 {:stop, :connect_http_timeout}
86 end
87
88 def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
89 Logger.debug("Upgrading connection to websocket")
90 path = "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(%{cursor: seq})
91 stream = :gun.ws_upgrade(conn, path, [], %{flow: @flow})
92 {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
93 end
94
95 def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
96 {:next_state, :connected, data}
97 end
98
99 def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
100 {:stop, :connect_ws_timeout}
101 end
102
103 def connected(:enter, _from, _data) do
104 Logger.debug("Connected to websocket")
105 :keep_state_and_data
106 end
107
108 def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, %{options: options} = data) do
109 # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
110 # Will also need support for JSON frames
111 with {:ok, header, next} <- CAR.DagCbor.decode(frame),
112 {:ok, payload, _} <- CAR.DagCbor.decode(next),
113 {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
114 true <- Event.valid_seq?(data.seq, payload["seq"]) do
115 data = %{data | seq: payload["seq"] || data.seq}
116 message = Event.from(type, payload)
117 :ok = :gun.update_flow(conn, stream, @flow)
118
119 case message do
120 nil ->
121 Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
122
123 message ->
124 Event.dispatch(options.consumer, message)
125 end
126
127 {:keep_state, data}
128 else
129 false ->
130 Logger.error("Got out of sequence or invalid `seq` from Firehose")
131 {:keep_state, data}
132
133 {%{"op" => @op_error, "t" => type}, payload} ->
134 Logger.error("Got error from Firehose: #{inspect({type, payload})}")
135 {:keep_state, data}
136
137 {:error, reason} ->
138 Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
139 {:keep_state, data}
140 end
141 end
142
143 def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
144 Logger.info("Websocket closed, reason unknown")
145 {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
146 end
147
148 def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
149 Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
150 {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
151 end
152
153 def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
154 when old_conn != new_conn do
155 Logger.debug("Ignoring received :gun_down for a previous connection.")
156 :keep_state_and_data
157 end
158
159 def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
160 Logger.info("Websocket connection killed. Attempting to reconnect")
161 {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
162 end
163
164 def connected(:internal, :reconnect, %{conn: conn} = data) do
165 :ok = :gun.close(conn)
166 :ok = :gun.flush(conn)
167
168 # TODO: reconnect backoff
169 {:next_state, :disconnected, %{data | conn: nil, stream: nil},
170 [{:next_event, :internal, :connect}]}
171 end
172end