defmodule Drinkup.Consumer do
  @moduledoc """
  An unopinionated consumer of the Firehose. Will receive all events, not just commits.

  `use Drinkup.Consumer` turns the calling module into a `GenServer` that calls
  `Drinkup.ConsumerGroup.join/0` when it starts and invokes `c:handle_event/1`
  for every `{:event, event}` message it receives. Each event is handled in its
  own unlinked `Task`, so a slow or failing handler neither blocks nor crashes
  the consumer process.

  ## Example

      defmodule MyConsumer do
        use Drinkup.Consumer

        @impl Drinkup.Consumer
        def handle_event(event), do: IO.inspect(event)
      end
  """

  alias Drinkup.{ConsumerGroup, Event}

  @doc """
  Invoked once for each event delivered to the consumer.

  Runs in a separate `Task` process, not in the consumer process itself. The
  return value is ignored; exceptions raised here are rescued and logged.
  """
  @callback handle_event(Event.t()) :: any()

  defmacro __using__(_opts) do
    quote location: :keep do
      use GenServer
      require Logger

      @behaviour Drinkup.Consumer

      # Replaces the default spec generated by `use GenServer`. Only keys that
      # belong to a supervisor child specification are returned: restart
      # intensity (`:max_restarts`) is an option of the supervisor itself and
      # has no effect when placed in a child spec.
      def child_spec(opts) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [opts]},
          type: :worker,
          restart: :permanent,
          shutdown: 500
        }
      end

      # `opts` are forwarded as GenServer start options (e.g. `:name`); the
      # init argument is always `[]` and is ignored by `init/1`.
      def start_link(opts) do
        GenServer.start_link(__MODULE__, [], opts)
      end

      @impl GenServer
      def init(_) do
        ConsumerGroup.join()
        {:ok, nil}
      end

      # Hands the event to `handle_event/1` in an unlinked task so the consumer
      # can return to its mailbox immediately.
      @impl GenServer
      def handle_info({:event, event}, state) do
        {:ok, _pid} =
          Task.start(fn ->
            try do
              __MODULE__.handle_event(event)
            rescue
              e ->
                Logger.error(
                  "Error in event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
                )
            end
          end)

        {:noreply, state}
      end

      # Defining `handle_info/2` above replaces the catch-all that
      # `use GenServer` provides. Without this clause any message that is not
      # `{:event, _}` would raise a FunctionClauseError and crash the consumer.
      def handle_info(message, state) do
        Logger.error(
          "#{inspect(__MODULE__)} received unexpected message in handle_info/2: " <>
            inspect(message)
        )

        {:noreply, state}
      end

      defoverridable GenServer
    end
  end
end