1defmodule Drinkup.Socket do
2 @moduledoc """
3 gen_statem process for managing the websocket connection to an ATProto relay.
4 """
5
6 require Logger
7 alias Drinkup.Event
8
@behaviour :gen_statem

# Relay URL used when `start_link/1` is not given a `:host` option.
@default_host "https://bsky.network"

# Milliseconds allowed for each connection phase; passed to gun as its
# connect / domain-lookup / TLS-handshake timeouts and used as the
# `:state_timeout` of the connecting states.
@timeout 5_000

# TODO: `flow` determines messages in buffer. Determine ideal value?
# Flow-control value handed to gun when upgrading to a websocket.
@flow 10

# Values of the `"op"` key in a frame header that this module tells apart.
@op_regular 1
@op_error -1

# Process data carried between states:
#   host   - relay URL as given to `start_link/1`
#   seq    - cursor / last accepted sequence number (`nil` when none)
#   conn   - gun connection, set while connecting or connected
#   stream - gun stream reference of the websocket upgrade
defstruct host: nil, seq: nil, conn: nil, stream: nil
19
@impl true
def callback_mode do
  # One callback function per state name, plus an `:enter` call each time a
  # state is entered.
  [:state_functions, :state_enter]
end
22
@doc """
Starts the socket state machine linked to the calling process.

## Options

  * `:host` - URL of the relay to connect to. Defaults to the module's
    `@default_host`.
  * `:cursor` - value used as the initial `seq`; it is sent as the `cursor`
    query parameter when subscribing. Defaults to `nil`.

Raises `ArgumentError` when `opts` contains any other key.

Returns whatever `:gen_statem.start_link/3` returns.
"""
@spec start_link(keyword()) :: :gen_statem.start_ret()
def start_link(opts \\ []) do
  # `:cursor` must be listed here: `Keyword.validate!/2` rejects every key it
  # is not told about, which previously made the option impossible to pass.
  opts = Keyword.validate!(opts, host: @default_host, cursor: nil)
  host = Keyword.fetch!(opts, :host)
  cursor = Keyword.fetch!(opts, :cursor)

  :gen_statem.start_link(__MODULE__, {host, cursor}, [])
end
30
@impl true
def init({host, cursor}) do
  # Start out disconnected and immediately queue the event that kicks off the
  # first connection attempt.
  initial = struct!(__MODULE__, host: host, seq: cursor)
  connect_now = {:next_event, :internal, :connect}

  {:ok, :disconnected, initial, [connect_now]}
end
36
@doc false
# State `:disconnected`: no gun connection is held. The state is left as soon
# as the internal `:connect` event is handled.
#
# gen_statem makes the very first enter call with the previous state equal to
# the initial state, so a previous state of `:disconnected` means the process
# has just started.
def disconnected(:enter, :disconnected, _data) do
  Logger.debug("Initial connection")
  # TODO: differentiate between initial & reconnects, probably stuff to do with seq
  :keep_state_and_data
end

# Any other previous state means an established or in-progress connection was
# abandoned and a new attempt is about to start.
def disconnected(:enter, previous_state, _data) do
  Logger.debug("Reconnecting after leaving #{inspect(previous_state)}")
  :keep_state_and_data
end

def disconnected(:internal, :connect, data) do
  {:next_state, :connecting_http, data}
end
46
@doc false
# State `:connecting_http`: a gun connection is being opened to the relay.
#
# On entry the configured host URL is parsed and `:gun.open/3` is called; the
# state is left when gun reports `:gun_up`, or the process stops if that does
# not happen within `@timeout` milliseconds.
def connecting_http(:enter, _from, data) do
  Logger.debug("Connecting to http")

  %{scheme: scheme, host: host, port: port} = URI.new!(data.host)

  gun_opts =
    Map.merge(transport_opts(scheme), %{
      # Reconnection is driven by this state machine, not by gun.
      retry: 0,
      protocols: [:http],
      connect_timeout: @timeout,
      domain_lookup_timeout: @timeout,
      tls_handshake_timeout: @timeout,
      tls_opts: [
        verify: :verify_peer,
        cacerts: :certifi.cacerts(),
        depth: 3,
        customize_hostname_check: [
          match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
        ]
      ]
    })

  # gun expects the host as a charlist.
  {:ok, conn} = :gun.open(:binary.bin_to_list(host), port, gun_opts)

  {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
end

def connecting_http(:info, {:gun_up, _conn, :http}, data) do
  {:next_state, :connecting_ws, data}
end

def connecting_http(:state_timeout, :connect_timeout, _data) do
  {:stop, :connect_http_timeout}
end

# Picks gun's transport from the URL scheme. Without an explicit `:transport`
# gun decides from the port alone (TLS only on 443), so an `https` relay on
# another port would be dialled over plain TCP. Unrecognised schemes keep
# gun's own default.
defp transport_opts("https"), do: %{transport: :tls}
defp transport_opts("http"), do: %{transport: :tcp}
defp transport_opts(_other), do: %{}
79
@doc false
# State `:connecting_ws`: the open gun connection is being upgraded to a
# websocket on the `com.atproto.sync.subscribeRepos` endpoint.
#
# On entry the upgrade is requested and the returned stream reference is kept
# in the process data. The state is left when gun confirms the upgrade, or the
# process stops if that does not happen within `@timeout` milliseconds.
def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
  Logger.debug("Upgrading connection to websocket")
  stream = :gun.ws_upgrade(conn, subscribe_path(seq), [], %{flow: @flow})
  {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
end

def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
  {:next_state, :connected, data}
end

def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
  {:stop, :connect_ws_timeout}
end

# Builds the subscription path. With no cursor the query string is left out
# entirely instead of sending an empty `cursor=` parameter.
defp subscribe_path(nil), do: "/xrpc/com.atproto.sync.subscribeRepos"

defp subscribe_path(seq) do
  "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(%{cursor: seq})
end
94
@doc false
# State `:connected`: the websocket is up and frames are being received.
def connected(:enter, _from, _data) do
  Logger.debug("Connected to websocket")
  :keep_state_and_data
end

# Handles one binary websocket frame.
#
# The frame is decoded as two consecutive DAG-CBOR values, a header followed
# by a payload. Regular frames (`"op"` 1) are sequence-checked, turned into an
# event via `Event.from/2` and printed; the stored `seq` advances to the
# payload's `"seq"` when it has one. Error frames, frames that fail to decode,
# out-of-sequence frames and frames with an unrecognised header are logged and
# otherwise ignored.
def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do
  # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
  # Will also need support for JSON frames

  # Return exactly one flow credit per received frame, whatever happens to the
  # frame below. Doing this on every path keeps malformed frames from draining
  # the window, and adding one (rather than the whole initial window) keeps
  # the credit from growing without bound.
  :ok = :gun.update_flow(conn, stream, 1)

  with {:ok, header, next} <- CAR.DagCbor.decode(frame),
       {:ok, payload, _} <- CAR.DagCbor.decode(next),
       {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
       true <- type == "#info" || Event.valid_seq?(data.seq, payload["seq"]) do
    # `#info` payloads may carry no `seq`; keep the current one in that case.
    data = %{data | seq: payload["seq"] || data.seq}

    case Event.from(type, payload) do
      %Event.Commit{} = commit ->
        IO.inspect(commit.ops, label: commit.repo)

      msg ->
        IO.inspect(msg)
    end

    {:keep_state, data}
  else
    false ->
      Logger.error("Got out of sequence or invalid `seq` from Firehose")
      :keep_state_and_data

    # Error-frame headers are matched on `"op"` alone: the AT Protocol event
    # stream format does not put a `"t"` key in them, so requiring one would
    # leave real error frames unmatched and crash the process.
    {%{"op" => @op_error} = header, payload} ->
      type = Map.get(header, "t")
      Logger.error("Got error from Firehose: #{inspect({type, payload})}")
      :keep_state_and_data

    {:error, reason} ->
      Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
      :keep_state_and_data

    # Any other decoded header (unknown `"op"`, or a regular frame without
    # `"t"`) is skipped instead of crashing the connection.
    {header, _payload} when is_map(header) ->
      Logger.warning("Ignoring frame with unrecognised header from Firehose: #{inspect(header)}")
      :keep_state_and_data
  end
end
135
# Close frame without a status code: log it and schedule a reconnect.
def connected(:info, {:gun_ws, _pid, _ref, :close}, _state) do
  Logger.info("Websocket closed, reason unknown")
  reconnect = {:next_event, :internal, :reconnect}
  {:keep_state_and_data, [reconnect]}
end

# Close frame carrying a code and a reason: log both and schedule a reconnect.
def connected(:info, {:gun_ws, _pid, _ref, {:close, errno, reason}}, _state) do
  Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
  reconnect = {:next_event, :internal, :reconnect}
  {:keep_state_and_data, [reconnect]}
end

# `:gun_down` for a connection other than the one currently held in the
# process data: nothing to do.
def connected(:info, {:gun_down, down_conn, _proto, _reason, _killed_streams}, %{conn: current})
    when down_conn != current do
  Logger.debug("Ignoring received :gun_down for a previous connection.")
  :keep_state_and_data
end

# `:gun_down` for the current connection: schedule a reconnect.
def connected(:info, {:gun_down, _pid, _proto, _reason, _killed_streams}, _state) do
  Logger.info("Websocket connection killed. Attempting to reconnect")
  reconnect = {:next_event, :internal, :reconnect}
  {:keep_state_and_data, [reconnect]}
end
156
# Tears down the current gun connection and starts over from `:disconnected`.
#
# The connection and stream are cleared from the process data and an internal
# `:connect` event is queued so a new attempt starts straight away.
def connected(:internal, :reconnect, %{conn: conn} = data) do
  # `:gun.close/1` answers `{:error, :not_found}` when the gun process is
  # already gone, which is expected after a dropped connection because the
  # connection is opened with `retry: 0`. Either answer means the connection
  # is closed, so both are accepted instead of crashing on a failed match.
  case :gun.close(conn) do
    :ok -> :ok
    {:error, :not_found} -> :ok
  end

  # Drop any messages from the old connection still sitting in the mailbox.
  :ok = :gun.flush(conn)

  # TODO: reconnect backoff
  {:next_state, :disconnected, %{data | conn: nil, stream: nil},
   [{:next_event, :internal, :connect}]}
end
165end