forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/idresolver"
18 "tangled.sh/tangled.sh/core/appview/spindleverify"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 if e.Kind != models.EventKindCommit {
44 return nil
45 }
46
47 switch e.Commit.Collection {
48 case tangled.GraphFollowNSID:
49 err = i.ingestFollow(e)
50 case tangled.FeedStarNSID:
51 err = i.ingestStar(e)
52 case tangled.PublicKeyNSID:
53 err = i.ingestPublicKey(e)
54 case tangled.RepoArtifactNSID:
55 err = i.ingestArtifact(e)
56 case tangled.ActorProfileNSID:
57 err = i.ingestProfile(e)
58 case tangled.SpindleMemberNSID:
59 err = i.ingestSpindleMember(e)
60 case tangled.SpindleNSID:
61 err = i.ingestSpindle(e)
62 }
63
64 if err != nil {
65 l := i.Logger.With("nsid", e.Commit.Collection)
66 l.Error("error ingesting record", "err", err)
67 }
68
69 return err
70 }
71}
72
73func (i *Ingester) ingestStar(e *models.Event) error {
74 var err error
75 did := e.Did
76
77 l := i.Logger.With("handler", "ingestStar")
78 l = l.With("nsid", e.Commit.Collection)
79
80 switch e.Commit.Operation {
81 case models.CommitOperationCreate, models.CommitOperationUpdate:
82 var subjectUri syntax.ATURI
83
84 raw := json.RawMessage(e.Commit.Record)
85 record := tangled.FeedStar{}
86 err := json.Unmarshal(raw, &record)
87 if err != nil {
88 l.Error("invalid record", "err", err)
89 return err
90 }
91
92 subjectUri, err = syntax.ParseATURI(record.Subject)
93 if err != nil {
94 l.Error("invalid record", "err", err)
95 return err
96 }
97 err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey)
98 case models.CommitOperationDelete:
99 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
100 }
101
102 if err != nil {
103 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
104 }
105
106 return nil
107}
108
109func (i *Ingester) ingestFollow(e *models.Event) error {
110 var err error
111 did := e.Did
112
113 l := i.Logger.With("handler", "ingestFollow")
114 l = l.With("nsid", e.Commit.Collection)
115
116 switch e.Commit.Operation {
117 case models.CommitOperationCreate, models.CommitOperationUpdate:
118 raw := json.RawMessage(e.Commit.Record)
119 record := tangled.GraphFollow{}
120 err = json.Unmarshal(raw, &record)
121 if err != nil {
122 l.Error("invalid record", "err", err)
123 return err
124 }
125
126 subjectDid := record.Subject
127 err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey)
128 case models.CommitOperationDelete:
129 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
130 }
131
132 if err != nil {
133 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
134 }
135
136 return nil
137}
138
139func (i *Ingester) ingestPublicKey(e *models.Event) error {
140 did := e.Did
141 var err error
142
143 l := i.Logger.With("handler", "ingestPublicKey")
144 l = l.With("nsid", e.Commit.Collection)
145
146 switch e.Commit.Operation {
147 case models.CommitOperationCreate, models.CommitOperationUpdate:
148 l.Debug("processing add of pubkey")
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.PublicKey{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "err", err)
154 return err
155 }
156
157 name := record.Name
158 key := record.Key
159 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
160 case models.CommitOperationDelete:
161 l.Debug("processing delete of pubkey")
162 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
163 }
164
165 if err != nil {
166 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
167 }
168
169 return nil
170}
171
172func (i *Ingester) ingestArtifact(e *models.Event) error {
173 did := e.Did
174 var err error
175
176 l := i.Logger.With("handler", "ingestArtifact")
177 l = l.With("nsid", e.Commit.Collection)
178
179 switch e.Commit.Operation {
180 case models.CommitOperationCreate, models.CommitOperationUpdate:
181 raw := json.RawMessage(e.Commit.Record)
182 record := tangled.RepoArtifact{}
183 err = json.Unmarshal(raw, &record)
184 if err != nil {
185 l.Error("invalid record", "err", err)
186 return err
187 }
188
189 repoAt, err := syntax.ParseATURI(record.Repo)
190 if err != nil {
191 return err
192 }
193
194 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
195 if err != nil {
196 return err
197 }
198
199 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
200 if err != nil || !ok {
201 return err
202 }
203
204 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
205 if err != nil {
206 createdAt = time.Now()
207 }
208
209 artifact := db.Artifact{
210 Did: did,
211 Rkey: e.Commit.RKey,
212 RepoAt: repoAt,
213 Tag: plumbing.Hash(record.Tag),
214 CreatedAt: createdAt,
215 BlobCid: cid.Cid(record.Artifact.Ref),
216 Name: record.Name,
217 Size: uint64(record.Artifact.Size),
218 MimeType: record.Artifact.MimeType,
219 }
220
221 err = db.AddArtifact(i.Db, artifact)
222 case models.CommitOperationDelete:
223 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
224 }
225
226 if err != nil {
227 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
228 }
229
230 return nil
231}
232
233func (i *Ingester) ingestProfile(e *models.Event) error {
234 did := e.Did
235 var err error
236
237 l := i.Logger.With("handler", "ingestProfile")
238 l = l.With("nsid", e.Commit.Collection)
239
240 if e.Commit.RKey != "self" {
241 return fmt.Errorf("ingestProfile only ingests `self` record")
242 }
243
244 switch e.Commit.Operation {
245 case models.CommitOperationCreate, models.CommitOperationUpdate:
246 raw := json.RawMessage(e.Commit.Record)
247 record := tangled.ActorProfile{}
248 err = json.Unmarshal(raw, &record)
249 if err != nil {
250 l.Error("invalid record", "err", err)
251 return err
252 }
253
254 description := ""
255 if record.Description != nil {
256 description = *record.Description
257 }
258
259 includeBluesky := record.Bluesky
260
261 location := ""
262 if record.Location != nil {
263 location = *record.Location
264 }
265
266 var links [5]string
267 for i, l := range record.Links {
268 if i < 5 {
269 links[i] = l
270 }
271 }
272
273 var stats [2]db.VanityStat
274 for i, s := range record.Stats {
275 if i < 2 {
276 stats[i].Kind = db.VanityStatKind(s)
277 }
278 }
279
280 var pinned [6]syntax.ATURI
281 for i, r := range record.PinnedRepositories {
282 if i < 6 {
283 pinned[i] = syntax.ATURI(r)
284 }
285 }
286
287 profile := db.Profile{
288 Did: did,
289 Description: description,
290 IncludeBluesky: includeBluesky,
291 Location: location,
292 Links: links,
293 Stats: stats,
294 PinnedRepos: pinned,
295 }
296
297 ddb, ok := i.Db.Execer.(*db.DB)
298 if !ok {
299 return fmt.Errorf("failed to index profile record, invalid db cast")
300 }
301
302 tx, err := ddb.Begin()
303 if err != nil {
304 return fmt.Errorf("failed to start transaction")
305 }
306
307 err = db.ValidateProfile(tx, &profile)
308 if err != nil {
309 return fmt.Errorf("invalid profile record")
310 }
311
312 err = db.UpsertProfile(tx, &profile)
313 case models.CommitOperationDelete:
314 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
315 }
316
317 if err != nil {
318 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
319 }
320
321 return nil
322}
323
324func (i *Ingester) ingestSpindleMember(e *models.Event) error {
325 did := e.Did
326 var err error
327
328 l := i.Logger.With("handler", "ingestSpindleMember")
329 l = l.With("nsid", e.Commit.Collection)
330
331 switch e.Commit.Operation {
332 case models.CommitOperationCreate:
333 raw := json.RawMessage(e.Commit.Record)
334 record := tangled.SpindleMember{}
335 err = json.Unmarshal(raw, &record)
336 if err != nil {
337 l.Error("invalid record", "err", err)
338 return err
339 }
340
341 // only spindle owner can invite to spindles
342 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
343 if err != nil || !ok {
344 return fmt.Errorf("failed to enforce permissions: %w", err)
345 }
346
347 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
348 if err != nil {
349 return err
350 }
351
352 if memberId.Handle.IsInvalidHandle() {
353 return err
354 }
355
356 ddb, ok := i.Db.Execer.(*db.DB)
357 if !ok {
358 return fmt.Errorf("failed to index profile record, invalid db cast")
359 }
360
361 err = db.AddSpindleMember(ddb, db.SpindleMember{
362 Did: syntax.DID(did),
363 Rkey: e.Commit.RKey,
364 Instance: record.Instance,
365 Subject: memberId.DID,
366 })
367 if !ok {
368 return fmt.Errorf("failed to add to db: %w", err)
369 }
370
371 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
372 if err != nil {
373 return fmt.Errorf("failed to update ACLs: %w", err)
374 }
375 case models.CommitOperationDelete:
376 rkey := e.Commit.RKey
377
378 ddb, ok := i.Db.Execer.(*db.DB)
379 if !ok {
380 return fmt.Errorf("failed to index profile record, invalid db cast")
381 }
382
383 // get record from db first
384 members, err := db.GetSpindleMembers(
385 ddb,
386 db.FilterEq("did", did),
387 db.FilterEq("rkey", rkey),
388 )
389 if err != nil || len(members) != 1 {
390 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
391 }
392 member := members[0]
393
394 tx, err := ddb.Begin()
395 if err != nil {
396 return fmt.Errorf("failed to start txn: %w", err)
397 }
398
399 // remove record by rkey && update enforcer
400 if err = db.RemoveSpindleMember(
401 tx,
402 db.FilterEq("did", did),
403 db.FilterEq("rkey", rkey),
404 ); err != nil {
405 return fmt.Errorf("failed to remove from db: %w", err)
406 }
407
408 // update enforcer
409 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
410 if err != nil {
411 return fmt.Errorf("failed to update ACLs: %w", err)
412 }
413
414 if err = tx.Commit(); err != nil {
415 return fmt.Errorf("failed to commit txn: %w", err)
416 }
417
418 if err = i.Enforcer.E.SavePolicy(); err != nil {
419 return fmt.Errorf("failed to save ACLs: %w", err)
420 }
421 }
422
423 return nil
424}
425
426func (i *Ingester) ingestSpindle(e *models.Event) error {
427 did := e.Did
428 var err error
429
430 l := i.Logger.With("handler", "ingestSpindle")
431 l = l.With("nsid", e.Commit.Collection)
432
433 switch e.Commit.Operation {
434 case models.CommitOperationCreate:
435 raw := json.RawMessage(e.Commit.Record)
436 record := tangled.Spindle{}
437 err = json.Unmarshal(raw, &record)
438 if err != nil {
439 l.Error("invalid record", "err", err)
440 return err
441 }
442
443 instance := e.Commit.RKey
444
445 ddb, ok := i.Db.Execer.(*db.DB)
446 if !ok {
447 return fmt.Errorf("failed to index profile record, invalid db cast")
448 }
449
450 err := db.AddSpindle(ddb, db.Spindle{
451 Owner: syntax.DID(did),
452 Instance: instance,
453 })
454 if err != nil {
455 l.Error("failed to add spindle to db", "err", err, "instance", instance)
456 return err
457 }
458
459 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
460 if err != nil {
461 l.Error("failed to add spindle to db", "err", err, "instance", instance)
462 return err
463 }
464
465 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
466 if err != nil {
467 return fmt.Errorf("failed to mark verified: %w", err)
468 }
469
470 return nil
471
472 case models.CommitOperationDelete:
473 instance := e.Commit.RKey
474
475 ddb, ok := i.Db.Execer.(*db.DB)
476 if !ok {
477 return fmt.Errorf("failed to index profile record, invalid db cast")
478 }
479
480 tx, err := ddb.Begin()
481 if err != nil {
482 return err
483 }
484 defer func() {
485 tx.Rollback()
486 i.Enforcer.E.LoadPolicy()
487 }()
488
489 err = db.DeleteSpindle(
490 tx,
491 db.FilterEq("owner", did),
492 db.FilterEq("instance", instance),
493 )
494 if err != nil {
495 return err
496 }
497
498 err = i.Enforcer.RemoveSpindle(instance)
499 if err != nil {
500 return err
501 }
502
503 err = tx.Commit()
504 if err != nil {
505 return err
506 }
507
508 err = i.Enforcer.E.SavePolicy()
509 if err != nil {
510 return err
511 }
512 }
513
514 return nil
515}