Elixir ATProtocol firehose & subscription listener
defmodule Drinkup.Consumer do
  @moduledoc """
  An unopinionated consumer of the Firehose. Will receive all events, not just commits.

  `use Drinkup.Consumer` turns the calling module into a `GenServer` that calls
  `Drinkup.ConsumerGroup.join/0` when it starts and then invokes
  `c:handle_event/1` in a separate `Task` for every `{:event, event}` message
  it receives.

  ## Example

      defmodule MyConsumer do
        use Drinkup.Consumer

        @impl Drinkup.Consumer
        def handle_event(event), do: IO.inspect(event)
      end
  """

  alias Drinkup.{ConsumerGroup, Event}

  @doc """
  Invoked once for every event delivered to the consumer.

  The callback runs inside a `Task` started per event, so its return value is
  ignored. Exceptions raised by the callback are rescued and logged.
  """
  @callback handle_event(Event.t()) :: any()

  # Injects the GenServer plumbing into the using module. `_opts` is ignored.
  defmacro __using__(_opts) do
    quote location: :keep do
      use GenServer
      require Logger

      @behaviour Drinkup.Consumer

      # Child specification for running the consumer under a supervisor.
      # `opts` is forwarded unchanged to `start_link/1`.
      #
      # Only keys that belong to a child spec are set here; restart intensity
      # (`:max_restarts`) is an option of the supervisor itself and has no
      # effect when placed in a child spec.
      def child_spec(opts) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [opts]},
          type: :worker,
          restart: :permanent,
          shutdown: 500
        }
      end

      # Starts the consumer process. `opts` are passed to `GenServer.start_link/3`
      # as the GenServer options (e.g. `:name`); the init argument is always `[]`.
      def start_link(opts) do
        GenServer.start_link(__MODULE__, [], opts)
      end

      @impl GenServer
      def init(_) do
        # NOTE(review): presumably registers this process as a recipient of
        # `{:event, event}` messages; confirm against Drinkup.ConsumerGroup.
        ConsumerGroup.join()
        {:ok, nil}
      end

      @impl GenServer
      def handle_info({:event, event}, state) do
        # Run the handler outside the GenServer process so a slow handler does
        # not hold up the mailbox. `Task.start/1` does not link the task to
        # this process, so a failing handler cannot take the consumer down.
        {:ok, _pid} =
          Task.start(fn ->
            try do
              __MODULE__.handle_event(event)
            rescue
              e ->
                Logger.error(
                  "Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
                )
            end
          end)

        {:noreply, state}
      end

      # Defining `handle_info/2` above replaces the logging default supplied by
      # `use GenServer`, so without this clause any other message would crash
      # the consumer with a FunctionClauseError.
      def handle_info(message, state) do
        Logger.error(
          "#{inspect(__MODULE__)} received unexpected message in handle_info/2: " <>
            inspect(message)
        )

        {:noreply, state}
      end

      # Lets the using module replace any GenServer callback defined above
      # (and call `super` to reuse the default implementation).
      defoverridable GenServer
    end
  end
end