Elixir ATProtocol firehose & subscription listener
at v0.1.0 2.5 kB view raw
1defmodule Drinkup.Event.Commit do 2 @moduledoc """ 3 Struct for commit events from the ATProto Firehose. 4 """ 5 6 # TODO: see atp specs 7 @type tid() :: String.t() 8 9 alias __MODULE__.RepoOp 10 use TypedStruct 11 12 typedstruct enforce: true do 13 field :seq, integer() 14 # DEPCREATED 15 field :rebase, bool() 16 # DEPRECATED 17 field :too_big, bool() 18 field :repo, String.t() 19 field :commit, binary() 20 field :rev, tid() 21 field :since, tid() | nil 22 field :blocks, CAR.Archive.t() 23 field :ops, list(RepoOp.t()) 24 # DEPRECATED 25 field :blobs, list(binary()) 26 field :prev_data, binary(), enforce: nil 27 field :time, NaiveDateTime.t() 28 end 29 30 @spec from(map()) :: t() 31 def from( 32 %{ 33 "seq" => seq, 34 "rebase" => rebase, 35 "tooBig" => too_big, 36 "repo" => repo, 37 "commit" => commit, 38 "rev" => rev, 39 "since" => since, 40 "blocks" => %CBOR.Tag{value: blocks}, 41 "ops" => ops, 42 "blobs" => blobs, 43 "time" => time 44 } = msg 45 ) do 46 prev_data = 47 Map.get(msg, "prevData") 48 49 time = NaiveDateTime.from_iso8601!(time) 50 {:ok, blocks} = CAR.decode(blocks) 51 52 %__MODULE__{ 53 seq: seq, 54 rebase: rebase, 55 too_big: too_big, 56 repo: repo, 57 commit: commit, 58 rev: rev, 59 since: since, 60 blocks: blocks, 61 ops: Enum.map(ops, &RepoOp.from(&1, blocks)), 62 blobs: blobs, 63 prev_data: prev_data, 64 time: time 65 } 66 end 67 68 defmodule RepoOp do 69 typedstruct enforce: true do 70 @type action() :: :create | :update | :delete | String.t() 71 72 field :action, action() 73 field :path, String.t() 74 field :cid, binary() 75 field :prev, binary(), enforce: false 76 field :record, map() | nil 77 end 78 79 @spec from(map(), CAR.Archive.t()) :: t() 80 def from(%{"action" => action, "path" => path, "cid" => cid} = op, %CAR.Archive{} = blocks) do 81 prev = Map.get(op, "prev") 82 record = CAR.Archive.get_block(blocks, cid) 83 84 %__MODULE__{ 85 action: recognise_action(action), 86 path: path, 87 cid: cid, 88 prev: prev, 89 record: record 90 } 91 end 92 93 @spec recognise_action(String.t()) :: action() 94 defp recognise_action(action) when action in ["create", "update", "delete"], 95 do: String.to_atom(action) 96 97 defp recognise_action(action) when is_binary(action), do: action 98 defp recognise_action(nil), do: nil 99 end 100end