Elixir ATProtocol firehose & subscription listener
at main 2.3 kB view raw
1defmodule Drinkup.RecordConsumer do 2 @moduledoc """ 3 An opinionated consumer of the Firehose that eats consumers 4 """ 5 6 @callback handle_create(any()) :: any() 7 @callback handle_update(any()) :: any() 8 @callback handle_delete(any()) :: any() 9 10 defmacro __using__(opts) do 11 {collections, _opts} = Keyword.pop(opts, :collections, []) 12 13 quote location: :keep do 14 @behaviour Drinkup.Consumer 15 @behaviour Drinkup.RecordConsumer 16 17 def handle_event(%Drinkup.Event.Commit{} = event) do 18 event.ops 19 |> Enum.filter(fn %{path: path} -> 20 path |> String.split("/") |> Enum.at(0) |> matches_collections?() 21 end) 22 |> Enum.map(&Drinkup.RecordConsumer.Record.from(&1, event.repo)) 23 |> Enum.each(&apply(__MODULE__, :"handle_#{&1.action}", [&1])) 24 end 25 26 def handle_event(_event), do: :noop 27 28 unquote( 29 if collections == [] do 30 quote do 31 def matches_collections?(_type), do: true 32 end 33 else 34 quote do 35 def matches_collections?(nil), do: false 36 37 def matches_collections?(type) when is_binary(type), 38 do: 39 Enum.any?(unquote(collections), fn 40 matcher when is_binary(matcher) -> type == matcher 41 matcher -> Regex.match?(matcher, type) 42 end) 43 end 44 end 45 ) 46 47 @impl true 48 def handle_create(_record), do: nil 49 @impl true 50 def handle_update(_record), do: nil 51 @impl true 52 def handle_delete(_record), do: nil 53 54 defoverridable handle_create: 1, handle_update: 1, handle_delete: 1 55 end 56 end 57 58 defmodule Record do 59 alias Drinkup.Event.Commit.RepoOp 60 use TypedStruct 61 62 typedstruct do 63 field :type, String.t() 64 field :rkey, String.t() 65 field :did, String.t() 66 field :action, :create | :update | :delete 67 field :cid, binary() | nil 68 field :record, map() | nil 69 end 70 71 @spec from(RepoOp.t(), String.t()) :: t() 72 def from(%RepoOp{action: action, path: path, cid: cid, record: record}, did) do 73 [type, rkey] = String.split(path, "/") 74 75 %__MODULE__{ 76 type: type, 77 rkey: rkey, 78 did: did, 79 action: action, 80 cid: cid, 81 record: record 82 } 83 end 84 end 85end