1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "tangled.sh/tangled.sh/core/api/tangled"
9 "tangled.sh/tangled.sh/core/eventconsumer"
10
11 "github.com/bluesky-social/jetstream/pkg/models"
12)
13
// Ingester is the callback signature used to process a single Jetstream
// event. It receives the caller's context and the decoded event, and returns
// a non-nil error when processing fails.
type Ingester func(ctx context.Context, e *models.Event) error
15
16func (s *Spindle) ingest() Ingester {
17 return func(ctx context.Context, e *models.Event) error {
18 var err error
19 defer func() {
20 eventTime := e.TimeUS
21 lastTimeUs := eventTime + 1
22 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
23 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
24 }
25 }()
26
27 if e.Kind != models.EventKindCommit {
28 return nil
29 }
30
31 switch e.Commit.Collection {
32 case tangled.SpindleMemberNSID:
33 s.ingestMember(ctx, e)
34 case tangled.RepoNSID:
35 s.ingestRepo(ctx, e)
36 }
37
38 return err
39 }
40}
41
42func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
43 did := e.Did
44 var err error
45
46 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
47
48 switch e.Commit.Operation {
49 case models.CommitOperationCreate, models.CommitOperationUpdate:
50 raw := e.Commit.Record
51 record := tangled.SpindleMember{}
52 err = json.Unmarshal(raw, &record)
53 if err != nil {
54 l.Error("invalid record", "error", err)
55 return err
56 }
57
58 domain := s.cfg.Server.Hostname
59 if s.cfg.Server.Dev {
60 domain = s.cfg.Server.ListenAddr
61 }
62 recordInstance := record.Instance
63
64 if recordInstance != domain {
65 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
66 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
67 }
68
69 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
70 if err != nil || !ok {
71 l.Error("failed to add member", "did", did, "error", err)
72 return fmt.Errorf("failed to enforce permissions: %w", err)
73 }
74
75 if err := s.e.AddKnotMember(rbacDomain, record.Subject); err != nil {
76 l.Error("failed to add member", "error", err)
77 return fmt.Errorf("failed to add member: %w", err)
78 }
79 l.Info("added member from firehose", "member", record.Subject)
80
81 if err := s.db.AddDid(record.Subject); err != nil {
82 l.Error("failed to add did", "error", err)
83 return fmt.Errorf("failed to add did: %w", err)
84 }
85 s.jc.AddDid(record.Subject)
86
87 return nil
88
89 }
90 return nil
91}
92
93func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error {
94 var err error
95
96 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
97
98 l.Info("ingesting repo record")
99
100 switch e.Commit.Operation {
101 case models.CommitOperationCreate, models.CommitOperationUpdate:
102 raw := e.Commit.Record
103 record := tangled.Repo{}
104 err = json.Unmarshal(raw, &record)
105 if err != nil {
106 l.Error("invalid record", "error", err)
107 return err
108 }
109
110 domain := s.cfg.Server.Hostname
111
112 // no spindle configured for this repo
113 if record.Spindle == nil {
114 l.Info("no spindle configured", "did", record.Owner, "name", record.Name)
115 return nil
116 }
117
118 // this repo did not want this spindle
119 if *record.Spindle != domain {
120 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain)
121 return nil
122 }
123
124 // add this repo to the watch list
125 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
126 l.Error("failed to add repo", "error", err)
127 return fmt.Errorf("failed to add repo: %w", err)
128 }
129
130 // add this knot to the event consumer
131 src := eventconsumer.NewKnotSource(record.Knot)
132 s.ks.AddSource(context.Background(), src)
133
134 return nil
135
136 }
137 return nil
138}