forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/serververify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
// Ingester applies jetstream events to the appview's database, identity
// cache and access-control policies.
type Ingester struct {
	// Db is the database wrapper the handlers read from and write to; it also
	// stores the last processed event time.
	Db db.DbWrapper
	// Enforcer holds the RBAC policies consulted and updated by the knot,
	// spindle and artifact handlers.
	Enforcer *rbac.Enforcer
	// IdResolver resolves identities and has entries invalidated on account
	// and identity events.
	IdResolver *idresolver.Resolver
	// Config supplies Core.Dev, which is passed to server verification.
	Config *config.Config
	// Logger is the base logger from which each handler derives its own.
	Logger *slog.Logger
}
29
// processFunc handles a single jetstream event; Ingest returns a value of
// this type.
type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 case tangled.KnotMemberNSID:
68 err = i.ingestKnotMember(e)
69 case tangled.KnotNSID:
70 err = i.ingestKnot(e)
71 case tangled.StringNSID:
72 err = i.ingestString(e)
73 }
74 l = i.Logger.With("nsid", e.Commit.Collection)
75 }
76
77 if err != nil {
78 l.Debug("error ingesting record", "err", err)
79 }
80
81 return nil
82 }
83}
84
85func (i *Ingester) ingestStar(e *models.Event) error {
86 var err error
87 did := e.Did
88
89 l := i.Logger.With("handler", "ingestStar")
90 l = l.With("nsid", e.Commit.Collection)
91
92 switch e.Commit.Operation {
93 case models.CommitOperationCreate, models.CommitOperationUpdate:
94 var subjectUri syntax.ATURI
95
96 raw := json.RawMessage(e.Commit.Record)
97 record := tangled.FeedStar{}
98 err := json.Unmarshal(raw, &record)
99 if err != nil {
100 l.Error("invalid record", "err", err)
101 return err
102 }
103
104 subjectUri, err = syntax.ParseATURI(record.Subject)
105 if err != nil {
106 l.Error("invalid record", "err", err)
107 return err
108 }
109 err = db.AddStar(i.Db, &db.Star{
110 StarredByDid: did,
111 RepoAt: subjectUri,
112 Rkey: e.Commit.RKey,
113 })
114 case models.CommitOperationDelete:
115 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
116 }
117
118 if err != nil {
119 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
120 }
121
122 return nil
123}
124
125func (i *Ingester) ingestFollow(e *models.Event) error {
126 var err error
127 did := e.Did
128
129 l := i.Logger.With("handler", "ingestFollow")
130 l = l.With("nsid", e.Commit.Collection)
131
132 switch e.Commit.Operation {
133 case models.CommitOperationCreate, models.CommitOperationUpdate:
134 raw := json.RawMessage(e.Commit.Record)
135 record := tangled.GraphFollow{}
136 err = json.Unmarshal(raw, &record)
137 if err != nil {
138 l.Error("invalid record", "err", err)
139 return err
140 }
141
142 err = db.AddFollow(i.Db, &db.Follow{
143 UserDid: did,
144 SubjectDid: record.Subject,
145 Rkey: e.Commit.RKey,
146 })
147 case models.CommitOperationDelete:
148 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
149 }
150
151 if err != nil {
152 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
153 }
154
155 return nil
156}
157
158func (i *Ingester) ingestPublicKey(e *models.Event) error {
159 did := e.Did
160 var err error
161
162 l := i.Logger.With("handler", "ingestPublicKey")
163 l = l.With("nsid", e.Commit.Collection)
164
165 switch e.Commit.Operation {
166 case models.CommitOperationCreate, models.CommitOperationUpdate:
167 l.Debug("processing add of pubkey")
168 raw := json.RawMessage(e.Commit.Record)
169 record := tangled.PublicKey{}
170 err = json.Unmarshal(raw, &record)
171 if err != nil {
172 l.Error("invalid record", "err", err)
173 return err
174 }
175
176 name := record.Name
177 key := record.Key
178 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
179 case models.CommitOperationDelete:
180 l.Debug("processing delete of pubkey")
181 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
182 }
183
184 if err != nil {
185 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
186 }
187
188 return nil
189}
190
191func (i *Ingester) ingestArtifact(e *models.Event) error {
192 did := e.Did
193 var err error
194
195 l := i.Logger.With("handler", "ingestArtifact")
196 l = l.With("nsid", e.Commit.Collection)
197
198 switch e.Commit.Operation {
199 case models.CommitOperationCreate, models.CommitOperationUpdate:
200 raw := json.RawMessage(e.Commit.Record)
201 record := tangled.RepoArtifact{}
202 err = json.Unmarshal(raw, &record)
203 if err != nil {
204 l.Error("invalid record", "err", err)
205 return err
206 }
207
208 repoAt, err := syntax.ParseATURI(record.Repo)
209 if err != nil {
210 return err
211 }
212
213 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
214 if err != nil {
215 return err
216 }
217
218 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
219 if err != nil || !ok {
220 return err
221 }
222
223 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
224 if err != nil {
225 createdAt = time.Now()
226 }
227
228 artifact := db.Artifact{
229 Did: did,
230 Rkey: e.Commit.RKey,
231 RepoAt: repoAt,
232 Tag: plumbing.Hash(record.Tag),
233 CreatedAt: createdAt,
234 BlobCid: cid.Cid(record.Artifact.Ref),
235 Name: record.Name,
236 Size: uint64(record.Artifact.Size),
237 MimeType: record.Artifact.MimeType,
238 }
239
240 err = db.AddArtifact(i.Db, artifact)
241 case models.CommitOperationDelete:
242 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
243 }
244
245 if err != nil {
246 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
247 }
248
249 return nil
250}
251
252func (i *Ingester) ingestProfile(e *models.Event) error {
253 did := e.Did
254 var err error
255
256 l := i.Logger.With("handler", "ingestProfile")
257 l = l.With("nsid", e.Commit.Collection)
258
259 if e.Commit.RKey != "self" {
260 return fmt.Errorf("ingestProfile only ingests `self` record")
261 }
262
263 switch e.Commit.Operation {
264 case models.CommitOperationCreate, models.CommitOperationUpdate:
265 raw := json.RawMessage(e.Commit.Record)
266 record := tangled.ActorProfile{}
267 err = json.Unmarshal(raw, &record)
268 if err != nil {
269 l.Error("invalid record", "err", err)
270 return err
271 }
272
273 description := ""
274 if record.Description != nil {
275 description = *record.Description
276 }
277
278 includeBluesky := record.Bluesky
279
280 location := ""
281 if record.Location != nil {
282 location = *record.Location
283 }
284
285 var links [5]string
286 for i, l := range record.Links {
287 if i < 5 {
288 links[i] = l
289 }
290 }
291
292 var stats [2]db.VanityStat
293 for i, s := range record.Stats {
294 if i < 2 {
295 stats[i].Kind = db.VanityStatKind(s)
296 }
297 }
298
299 var pinned [6]syntax.ATURI
300 for i, r := range record.PinnedRepositories {
301 if i < 6 {
302 pinned[i] = syntax.ATURI(r)
303 }
304 }
305
306 profile := db.Profile{
307 Did: did,
308 Description: description,
309 IncludeBluesky: includeBluesky,
310 Location: location,
311 Links: links,
312 Stats: stats,
313 PinnedRepos: pinned,
314 }
315
316 ddb, ok := i.Db.Execer.(*db.DB)
317 if !ok {
318 return fmt.Errorf("failed to index profile record, invalid db cast")
319 }
320
321 tx, err := ddb.Begin()
322 if err != nil {
323 return fmt.Errorf("failed to start transaction")
324 }
325
326 err = db.ValidateProfile(tx, &profile)
327 if err != nil {
328 return fmt.Errorf("invalid profile record")
329 }
330
331 err = db.UpsertProfile(tx, &profile)
332 case models.CommitOperationDelete:
333 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
334 }
335
336 if err != nil {
337 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
338 }
339
340 return nil
341}
342
343func (i *Ingester) ingestSpindleMember(e *models.Event) error {
344 did := e.Did
345 var err error
346
347 l := i.Logger.With("handler", "ingestSpindleMember")
348 l = l.With("nsid", e.Commit.Collection)
349
350 switch e.Commit.Operation {
351 case models.CommitOperationCreate:
352 raw := json.RawMessage(e.Commit.Record)
353 record := tangled.SpindleMember{}
354 err = json.Unmarshal(raw, &record)
355 if err != nil {
356 l.Error("invalid record", "err", err)
357 return err
358 }
359
360 // only spindle owner can invite to spindles
361 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
362 if err != nil || !ok {
363 return fmt.Errorf("failed to enforce permissions: %w", err)
364 }
365
366 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
367 if err != nil {
368 return err
369 }
370
371 if memberId.Handle.IsInvalidHandle() {
372 return err
373 }
374
375 ddb, ok := i.Db.Execer.(*db.DB)
376 if !ok {
377 return fmt.Errorf("failed to index profile record, invalid db cast")
378 }
379
380 err = db.AddSpindleMember(ddb, db.SpindleMember{
381 Did: syntax.DID(did),
382 Rkey: e.Commit.RKey,
383 Instance: record.Instance,
384 Subject: memberId.DID,
385 })
386 if !ok {
387 return fmt.Errorf("failed to add to db: %w", err)
388 }
389
390 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
391 if err != nil {
392 return fmt.Errorf("failed to update ACLs: %w", err)
393 }
394
395 l.Info("added spindle member")
396 case models.CommitOperationDelete:
397 rkey := e.Commit.RKey
398
399 ddb, ok := i.Db.Execer.(*db.DB)
400 if !ok {
401 return fmt.Errorf("failed to index profile record, invalid db cast")
402 }
403
404 // get record from db first
405 members, err := db.GetSpindleMembers(
406 ddb,
407 db.FilterEq("did", did),
408 db.FilterEq("rkey", rkey),
409 )
410 if err != nil || len(members) != 1 {
411 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
412 }
413 member := members[0]
414
415 tx, err := ddb.Begin()
416 if err != nil {
417 return fmt.Errorf("failed to start txn: %w", err)
418 }
419
420 // remove record by rkey && update enforcer
421 if err = db.RemoveSpindleMember(
422 tx,
423 db.FilterEq("did", did),
424 db.FilterEq("rkey", rkey),
425 ); err != nil {
426 return fmt.Errorf("failed to remove from db: %w", err)
427 }
428
429 // update enforcer
430 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
431 if err != nil {
432 return fmt.Errorf("failed to update ACLs: %w", err)
433 }
434
435 if err = tx.Commit(); err != nil {
436 return fmt.Errorf("failed to commit txn: %w", err)
437 }
438
439 if err = i.Enforcer.E.SavePolicy(); err != nil {
440 return fmt.Errorf("failed to save ACLs: %w", err)
441 }
442
443 l.Info("removed spindle member")
444 }
445
446 return nil
447}
448
449func (i *Ingester) ingestSpindle(e *models.Event) error {
450 did := e.Did
451 var err error
452
453 l := i.Logger.With("handler", "ingestSpindle")
454 l = l.With("nsid", e.Commit.Collection)
455
456 switch e.Commit.Operation {
457 case models.CommitOperationCreate:
458 raw := json.RawMessage(e.Commit.Record)
459 record := tangled.Spindle{}
460 err = json.Unmarshal(raw, &record)
461 if err != nil {
462 l.Error("invalid record", "err", err)
463 return err
464 }
465
466 instance := e.Commit.RKey
467
468 ddb, ok := i.Db.Execer.(*db.DB)
469 if !ok {
470 return fmt.Errorf("failed to index profile record, invalid db cast")
471 }
472
473 err := db.AddSpindle(ddb, db.Spindle{
474 Owner: syntax.DID(did),
475 Instance: instance,
476 })
477 if err != nil {
478 l.Error("failed to add spindle to db", "err", err, "instance", instance)
479 return err
480 }
481
482 err = serververify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
483 if err != nil {
484 l.Error("failed to add spindle to db", "err", err, "instance", instance)
485 return err
486 }
487
488 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
489 if err != nil {
490 return fmt.Errorf("failed to mark verified: %w", err)
491 }
492
493 return nil
494
495 case models.CommitOperationDelete:
496 instance := e.Commit.RKey
497
498 ddb, ok := i.Db.Execer.(*db.DB)
499 if !ok {
500 return fmt.Errorf("failed to index profile record, invalid db cast")
501 }
502
503 // get record from db first
504 spindles, err := db.GetSpindles(
505 ddb,
506 db.FilterEq("owner", did),
507 db.FilterEq("instance", instance),
508 )
509 if err != nil || len(spindles) != 1 {
510 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
511 }
512 spindle := spindles[0]
513
514 tx, err := ddb.Begin()
515 if err != nil {
516 return err
517 }
518 defer func() {
519 tx.Rollback()
520 i.Enforcer.E.LoadPolicy()
521 }()
522
523 // remove spindle members first
524 err = db.RemoveSpindleMember(
525 tx,
526 db.FilterEq("owner", did),
527 db.FilterEq("instance", instance),
528 )
529 if err != nil {
530 return err
531 }
532
533 err = db.DeleteSpindle(
534 tx,
535 db.FilterEq("owner", did),
536 db.FilterEq("instance", instance),
537 )
538 if err != nil {
539 return err
540 }
541
542 if spindle.Verified != nil {
543 err = i.Enforcer.RemoveSpindle(instance)
544 if err != nil {
545 return err
546 }
547 }
548
549 err = tx.Commit()
550 if err != nil {
551 return err
552 }
553
554 err = i.Enforcer.E.SavePolicy()
555 if err != nil {
556 return err
557 }
558 }
559
560 return nil
561}
562
563func (i *Ingester) ingestString(e *models.Event) error {
564 did := e.Did
565 rkey := e.Commit.RKey
566
567 var err error
568
569 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
570 l.Info("ingesting record")
571
572 ddb, ok := i.Db.Execer.(*db.DB)
573 if !ok {
574 return fmt.Errorf("failed to index string record, invalid db cast")
575 }
576
577 switch e.Commit.Operation {
578 case models.CommitOperationCreate, models.CommitOperationUpdate:
579 raw := json.RawMessage(e.Commit.Record)
580 record := tangled.String{}
581 err = json.Unmarshal(raw, &record)
582 if err != nil {
583 l.Error("invalid record", "err", err)
584 return err
585 }
586
587 string := db.StringFromRecord(did, rkey, record)
588
589 if err = string.Validate(); err != nil {
590 l.Error("invalid record", "err", err)
591 return err
592 }
593
594 if err = db.AddString(ddb, string); err != nil {
595 l.Error("failed to add string", "err", err)
596 return err
597 }
598
599 return nil
600
601 case models.CommitOperationDelete:
602 if err := db.DeleteString(
603 ddb,
604 db.FilterEq("did", did),
605 db.FilterEq("rkey", rkey),
606 ); err != nil {
607 l.Error("failed to delete", "err", err)
608 return fmt.Errorf("failed to delete string record: %w", err)
609 }
610
611 return nil
612 }
613
614 return nil
615}
616
617func (i *Ingester) ingestKnotMember(e *models.Event) error {
618 did := e.Did
619 var err error
620
621 l := i.Logger.With("handler", "ingestKnotMember")
622 l = l.With("nsid", e.Commit.Collection)
623
624 switch e.Commit.Operation {
625 case models.CommitOperationCreate:
626 raw := json.RawMessage(e.Commit.Record)
627 record := tangled.KnotMember{}
628 err = json.Unmarshal(raw, &record)
629 if err != nil {
630 l.Error("invalid record", "err", err)
631 return err
632 }
633
634 // only knot owner can invite to knots
635 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
636 if err != nil || !ok {
637 return fmt.Errorf("failed to enforce permissions: %w", err)
638 }
639
640 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
641 if err != nil {
642 return err
643 }
644
645 if memberId.Handle.IsInvalidHandle() {
646 return err
647 }
648
649 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
650 if err != nil {
651 return fmt.Errorf("failed to update ACLs: %w", err)
652 }
653
654 l.Info("added knot member")
655 case models.CommitOperationDelete:
656 // we don't store knot members in a table (like we do for spindle)
657 // and we can't remove this just yet. possibly fixed if we switch
658 // to either:
659 // 1. a knot_members table like with spindle and store the rkey
660 // 2. use the knot host as the rkey
661 //
662 // TODO: implement member deletion
663 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
664 }
665
666 return nil
667}
668
669func (i *Ingester) ingestKnot(e *models.Event) error {
670 did := e.Did
671 var err error
672
673 l := i.Logger.With("handler", "ingestKnot")
674 l = l.With("nsid", e.Commit.Collection)
675
676 switch e.Commit.Operation {
677 case models.CommitOperationCreate:
678 raw := json.RawMessage(e.Commit.Record)
679 record := tangled.Knot{}
680 err = json.Unmarshal(raw, &record)
681 if err != nil {
682 l.Error("invalid record", "err", err)
683 return err
684 }
685
686 domain := e.Commit.RKey
687
688 ddb, ok := i.Db.Execer.(*db.DB)
689 if !ok {
690 return fmt.Errorf("failed to index profile record, invalid db cast")
691 }
692
693 err := db.AddKnot(ddb, domain, did)
694 if err != nil {
695 l.Error("failed to add knot to db", "err", err, "domain", domain)
696 return err
697 }
698
699 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
700 if err != nil {
701 l.Error("failed to verify knot", "err", err, "domain", domain)
702 return err
703 }
704
705 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
706 if err != nil {
707 return fmt.Errorf("failed to mark verified: %w", err)
708 }
709
710 return nil
711
712 case models.CommitOperationDelete:
713 domain := e.Commit.RKey
714
715 ddb, ok := i.Db.Execer.(*db.DB)
716 if !ok {
717 return fmt.Errorf("failed to index knot record, invalid db cast")
718 }
719
720 // get record from db first
721 registrations, err := db.GetRegistrations(
722 ddb,
723 db.FilterEq("domain", domain),
724 db.FilterEq("did", did),
725 )
726 if err != nil {
727 return fmt.Errorf("failed to get registration: %w", err)
728 }
729 if len(registrations) != 1 {
730 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
731 }
732 registration := registrations[0]
733
734 tx, err := ddb.Begin()
735 if err != nil {
736 return err
737 }
738 defer func() {
739 tx.Rollback()
740 i.Enforcer.E.LoadPolicy()
741 }()
742
743 err = db.DeleteKnot(
744 tx,
745 db.FilterEq("did", did),
746 db.FilterEq("domain", domain),
747 )
748 if err != nil {
749 return err
750 }
751
752 if registration.Registered != nil {
753 err = i.Enforcer.RemoveKnot(domain)
754 if err != nil {
755 return err
756 }
757 }
758
759 err = tx.Commit()
760 if err != nil {
761 return err
762 }
763
764 err = i.Enforcer.E.SavePolicy()
765 if err != nil {
766 return err
767 }
768 }
769
770 return nil
771}