defmodule Drinkup.Event do
  @moduledoc """
  Common entry points for working with Drinkup events.

  This module:

    * turns a type tag plus payload into one of the `Drinkup.Event.*` structs
      (`from/2`),
    * checks whether an incoming sequence number is acceptable relative to the
      last one seen (`valid_seq?/2`), and
    * hands an event to the configured consumer inside a supervised task
      (`dispatch/2`).
  """

  require Logger
  alias Drinkup.{Event, Options}

  @type t() ::
          Event.Commit.t()
          | Event.Sync.t()
          | Event.Identity.t()
          | Event.Account.t()
          | Event.Info.t()

  @doc """
  Builds an event from its type tag and payload.

  The recognised tags are `"#commit"`, `"#sync"`, `"#identity"`, `"#account"`
  and `"#info"`; each one delegates to the `from/1` function of the matching
  `Drinkup.Event.*` module and returns whatever that function returns.

  Any other tag yields `nil`.
  """
  @spec from(String.t(), map()) :: t() | nil
  def from(type, payload) do
    case type do
      "#commit" -> Event.Commit.from(payload)
      "#sync" -> Event.Sync.from(payload)
      "#identity" -> Event.Identity.from(payload)
      "#account" -> Event.Account.from(payload)
      "#info" -> Event.Info.from(payload)
      _unknown -> nil
    end
  end

  @doc """
  Tells whether `seq` is acceptable given the previously seen `last_seq`.

  The result is `true` when:

    * `last_seq` is `nil` and `seq` is an integer,
    * `last_seq` is an integer and `seq` is `nil`, or
    * both are integers and `seq` is strictly greater than `last_seq`.

  Every other combination (including both being `nil`, or either value being
  a non-integer, non-`nil` term) returns `false`.
  """
  @spec valid_seq?(integer() | nil, any()) :: boolean()
  def valid_seq?(last_seq, seq) do
    cond do
      # Nothing seen yet: any integer sequence number is accepted.
      is_nil(last_seq) -> is_integer(seq)
      # A non-nil, non-integer previous value is never valid.
      not is_integer(last_seq) -> false
      # From here on `last_seq` is an integer.
      is_nil(seq) -> true
      is_integer(seq) -> seq > last_seq
      true -> false
    end
  end

  @doc """
  Runs the consumer's `handle_event/1` for `message` in a supervised task.

  The task is started under the `Task.Supervisor` registered in
  `Drinkup.Registry` under the key `{name, Tasks}`, where `name` comes from the
  given `Drinkup.Options`. Exceptions raised by the handler are rescued inside
  the task and written to the log via `Logger.error/1`.

  Returns `:ok` once the task has been started. Because the result of
  `Task.Supervisor.start_child/2` is matched against `{:ok, pid}`, any other
  return value raises a `MatchError` in the calling process.
  """
  @spec dispatch(t(), Options.t()) :: :ok
  def dispatch(message, %Options{} = options) do
    handler = options.consumer
    task_supervisor = {:via, Registry, {Drinkup.Registry, {options.name, Tasks}}}

    # The try/rescue lives inside the closure so it runs in the task process.
    run_handler = fn ->
      try do
        handler.handle_event(message)
      rescue
        e ->
          Logger.error("Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}")
      end
    end

    {:ok, _pid} = Task.Supervisor.start_child(task_supervisor, run_handler)
    :ok
  end
end