Elixir ATProtocol firehose & subscription listener
at v0.1.0 1.4 kB view raw
defmodule Drinkup.Event do
  @moduledoc """
  Decoding, sequence validation and dispatching of subscription events.

  An event is one of the `Drinkup.Event.*` structs enumerated by `t:t/0`.
  """

  require Logger
  alias Drinkup.{Event, Options}

  @type t() ::
          Event.Commit.t()
          | Event.Sync.t()
          | Event.Identity.t()
          | Event.Account.t()
          | Event.Info.t()

  @doc """
  Builds the event that corresponds to the message `type` out of `payload`.

  The recognised type tags are `"#commit"`, `"#sync"`, `"#identity"`,
  `"#account"` and `"#info"`; each one delegates to the `from/1` function of
  the matching `Drinkup.Event.*` module. Any other `type` yields `nil`.
  """
  @spec from(String.t(), map()) :: t() | nil
  def from(type, payload) do
    case type do
      "#commit" -> Event.Commit.from(payload)
      "#sync" -> Event.Sync.from(payload)
      "#identity" -> Event.Identity.from(payload)
      "#account" -> Event.Account.from(payload)
      "#info" -> Event.Info.from(payload)
      _unknown -> nil
    end
  end

  @doc """
  Tells whether `seq` is an acceptable successor of `last_seq`.

  Returns `true` when:

    * `last_seq` is `nil` and `seq` is an integer,
    * `last_seq` is an integer and `seq` is `nil`,
    * both are integers and `seq` is strictly greater than `last_seq`.

  Every other combination, including both values being `nil` or either one
  being a non-integer term, returns `false`.
  """
  @spec valid_seq?(integer() | nil, any()) :: boolean()
  def valid_seq?(last_seq, seq) do
    case {last_seq, seq} do
      {nil, next} when is_integer(next) -> true
      {prev, nil} when is_integer(prev) -> true
      {prev, next} when is_integer(prev) and is_integer(next) -> next > prev
      _otherwise -> false
    end
  end

  @doc """
  Hands `message` to the consumer configured in `options`.

  `consumer.handle_event(message)` is run in a task started under the task
  supervisor registered in `Drinkup.Registry` under the key `{name, Tasks}`.
  Exceptions raised by the handler are rescued inside the task and logged as
  errors.

  Returns `:ok` once the task has been started. Raises `MatchError` if the
  supervisor answers with anything other than `{:ok, pid}`.
  """
  @spec dispatch(t(), Options.t()) :: :ok
  def dispatch(message, %Options{consumer: handler_mod, name: listener}) do
    # The handler stays an anonymous function of dispatch/2 so that the frames
    # in the logged stacktrace keep pointing at this function.
    job = fn ->
      try do
        handler_mod.handle_event(message)
      rescue
        exception ->
          Logger.error(
            "Error in event handler: #{Exception.format(:error, exception, __STACKTRACE__)}"
          )
      end
    end

    task_supervisor = {:via, Registry, {Drinkup.Registry, {listener, Tasks}}}
    {:ok, _pid} = Task.Supervisor.start_child(task_supervisor, job)

    :ok
  end
end