1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/eventconsumer"
12 "tangled.sh/tangled.sh/core/idresolver"
13 "tangled.sh/tangled.sh/core/rbac"
14 "tangled.sh/tangled.sh/core/spindle/db"
15
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 "github.com/bluesky-social/indigo/atproto/identity"
18 "github.com/bluesky-social/indigo/atproto/syntax"
19 "github.com/bluesky-social/indigo/xrpc"
20 "github.com/bluesky-social/jetstream/pkg/models"
21 securejoin "github.com/cyphar/filepath-securejoin"
22)
23
// Ingester processes a single jetstream event. It is the function type
// returned by (*Spindle).ingest.
type Ingester func(ctx context.Context, e *models.Event) error
25
26func (s *Spindle) ingest() Ingester {
27 return func(ctx context.Context, e *models.Event) error {
28 var err error
29 defer func() {
30 eventTime := e.TimeUS
31 lastTimeUs := eventTime + 1
32 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
33 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
34 }
35 }()
36
37 if e.Kind != models.EventKindCommit {
38 return nil
39 }
40
41 switch e.Commit.Collection {
42 case tangled.SpindleMemberNSID:
43 err = s.ingestMember(ctx, e)
44 case tangled.RepoNSID:
45 err = s.ingestRepo(ctx, e)
46 case tangled.RepoCollaboratorNSID:
47 err = s.ingestCollaborator(ctx, e)
48 }
49
50 if err != nil {
51 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
52 }
53
54 return nil
55 }
56}
57
58func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
59 var err error
60 did := e.Did
61 rkey := e.Commit.RKey
62
63 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
64
65 switch e.Commit.Operation {
66 case models.CommitOperationCreate, models.CommitOperationUpdate:
67 raw := e.Commit.Record
68 record := tangled.SpindleMember{}
69 err = json.Unmarshal(raw, &record)
70 if err != nil {
71 l.Error("invalid record", "error", err)
72 return err
73 }
74
75 domain := s.cfg.Server.Hostname
76 recordInstance := record.Instance
77
78 if recordInstance != domain {
79 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
80 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
81 }
82
83 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
84 if err != nil || !ok {
85 l.Error("failed to add member", "did", did, "error", err)
86 return fmt.Errorf("failed to enforce permissions: %w", err)
87 }
88
89 if err := db.AddSpindleMember(s.db, db.SpindleMember{
90 Did: syntax.DID(did),
91 Rkey: rkey,
92 Instance: recordInstance,
93 Subject: syntax.DID(record.Subject),
94 Created: time.Now(),
95 }); err != nil {
96 l.Error("failed to add member", "error", err)
97 return fmt.Errorf("failed to add member: %w", err)
98 }
99
100 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
101 l.Error("failed to add member", "error", err)
102 return fmt.Errorf("failed to add member: %w", err)
103 }
104 l.Info("added member from firehose", "member", record.Subject)
105
106 if err := s.db.AddDid(record.Subject); err != nil {
107 l.Error("failed to add did", "error", err)
108 return fmt.Errorf("failed to add did: %w", err)
109 }
110 s.jc.AddDid(record.Subject)
111
112 return nil
113
114 case models.CommitOperationDelete:
115 record, err := db.GetSpindleMember(s.db, did, rkey)
116 if err != nil {
117 l.Error("failed to find member", "error", err)
118 return fmt.Errorf("failed to find member: %w", err)
119 }
120
121 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
122 l.Error("failed to remove member", "error", err)
123 return fmt.Errorf("failed to remove member: %w", err)
124 }
125
126 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
127 l.Error("failed to add member", "error", err)
128 return fmt.Errorf("failed to add member: %w", err)
129 }
130 l.Info("added member from firehose", "member", record.Subject)
131
132 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
133 l.Error("failed to add did", "error", err)
134 return fmt.Errorf("failed to add did: %w", err)
135 }
136 s.jc.RemoveDid(record.Subject.String())
137
138 }
139 return nil
140}
141
142func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
143 var err error
144 did := e.Did
145 resolver := idresolver.DefaultResolver()
146
147 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
148
149 l.Info("ingesting repo record")
150
151 switch e.Commit.Operation {
152 case models.CommitOperationCreate, models.CommitOperationUpdate:
153 raw := e.Commit.Record
154 record := tangled.Repo{}
155 err = json.Unmarshal(raw, &record)
156 if err != nil {
157 l.Error("invalid record", "error", err)
158 return err
159 }
160
161 domain := s.cfg.Server.Hostname
162
163 // no spindle configured for this repo
164 if record.Spindle == nil {
165 l.Info("no spindle configured", "did", record.Owner, "name", record.Name)
166 return nil
167 }
168
169 // this repo did not want this spindle
170 if *record.Spindle != domain {
171 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain)
172 return nil
173 }
174
175 // add this repo to the watch list
176 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
177 l.Error("failed to add repo", "error", err)
178 return fmt.Errorf("failed to add repo: %w", err)
179 }
180
181 didSlashRepo, err := securejoin.SecureJoin(record.Owner, record.Name)
182 if err != nil {
183 return err
184 }
185
186 // add repo to rbac
187 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, didSlashRepo); err != nil {
188 l.Error("failed to add repo to enforcer", "error", err)
189 return fmt.Errorf("failed to add repo: %w", err)
190 }
191
192 // add collaborators to rbac
193 owner, err := resolver.ResolveIdent(ctx, did)
194 if err != nil || owner.Handle.IsInvalidHandle() {
195 return err
196 }
197 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
198 return err
199 }
200
201 // add this knot to the event consumer
202 src := eventconsumer.NewKnotSource(record.Knot)
203 s.ks.AddSource(context.Background(), src)
204
205 return nil
206
207 }
208 return nil
209}
210
211func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
212 var err error
213
214 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
215
216 l.Info("ingesting collaborator record")
217
218 switch e.Commit.Operation {
219 case models.CommitOperationCreate, models.CommitOperationUpdate:
220 raw := e.Commit.Record
221 record := tangled.RepoCollaborator{}
222 err = json.Unmarshal(raw, &record)
223 if err != nil {
224 l.Error("invalid record", "error", err)
225 return err
226 }
227
228 resolver := idresolver.DefaultResolver()
229
230 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
231 if err != nil || subjectId.Handle.IsInvalidHandle() {
232 return err
233 }
234
235 repoAt, err := syntax.ParseATURI(record.Repo)
236 if err != nil {
237 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
238 return nil
239 }
240
241 // TODO: get rid of this entirely
242 // resolve this aturi to extract the repo record
243 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
244 if err != nil || owner.Handle.IsInvalidHandle() {
245 return fmt.Errorf("failed to resolve handle: %w", err)
246 }
247
248 xrpcc := xrpc.Client{
249 Host: owner.PDSEndpoint(),
250 }
251
252 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
253 if err != nil {
254 return err
255 }
256
257 repo := resp.Value.Val.(*tangled.Repo)
258 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
259
260 // check perms for this user
261 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
262 return fmt.Errorf("insufficient permissions: %w", err)
263 }
264
265 // add collaborator to rbac
266 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
267 l.Error("failed to add repo to enforcer", "error", err)
268 return fmt.Errorf("failed to add repo: %w", err)
269 }
270
271 return nil
272 }
273 return nil
274}
275
276func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
277 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
278
279 l.Info("fetching and adding existing collaborators")
280
281 xrpcc := xrpc.Client{
282 Host: owner.PDSEndpoint(),
283 }
284
285 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
286 if err != nil {
287 return err
288 }
289
290 var errs error
291 for _, r := range resp.Records {
292 if r == nil {
293 continue
294 }
295 record := r.Value.Val.(*tangled.RepoCollaborator)
296
297 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
298 l.Error("failed to add repo to enforcer", "error", err)
299 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
300 }
301 }
302
303 return errs
304}