forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/orm"
25 "tangled.org/core/rbac"
26)
27
// Ingester bundles the dependencies shared by the ingest handlers in this
// file, which apply jetstream events to the appview's state.
type Ingester struct {
	// Db is the database handle. Some handlers pass it to db helpers directly,
	// others type-assert its Execer field to *db.DB to open transactions.
	Db db.DbWrapper
	// Enforcer answers permission checks and is updated when knot/spindle
	// membership records are ingested.
	Enforcer *rbac.Enforcer
	// IdResolver resolves record subjects to identities and has cached
	// identities invalidated on account/identity events.
	IdResolver *idresolver.Resolver
	// Config is read for Core.Dev when running server verification.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
	// Validator checks strings, issues, issue comments, label definitions and
	// label ops before they are stored.
	Validator *validator.Validator
}
36
// processFunc is the signature of a handler for a single jetstream event; it
// is the type returned by (*Ingester).Ingest.
type processFunc func(ctx context.Context, e *jmodels.Event) error
38
39func (i *Ingester) Ingest() processFunc {
40 return func(ctx context.Context, e *jmodels.Event) error {
41 var err error
42 defer func() {
43 eventTime := e.TimeUS
44 lastTimeUs := eventTime + 1
45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
47 }
48 }()
49
50 l := i.Logger.With("kind", e.Kind)
51 switch e.Kind {
52 case jmodels.EventKindAccount:
53 if !e.Account.Active && *e.Account.Status == "deactivated" {
54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
55 }
56 case jmodels.EventKindIdentity:
57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
58 case jmodels.EventKindCommit:
59 switch e.Commit.Collection {
60 case tangled.GraphFollowNSID:
61 err = i.ingestFollow(e)
62 case tangled.FeedStarNSID:
63 err = i.ingestStar(e)
64 case tangled.PublicKeyNSID:
65 err = i.ingestPublicKey(e)
66 case tangled.RepoArtifactNSID:
67 err = i.ingestArtifact(e)
68 case tangled.ActorProfileNSID:
69 err = i.ingestProfile(e)
70 case tangled.SpindleMemberNSID:
71 err = i.ingestSpindleMember(ctx, e)
72 case tangled.SpindleNSID:
73 err = i.ingestSpindle(ctx, e)
74 case tangled.KnotMemberNSID:
75 err = i.ingestKnotMember(e)
76 case tangled.KnotNSID:
77 err = i.ingestKnot(e)
78 case tangled.StringNSID:
79 err = i.ingestString(e)
80 case tangled.RepoIssueNSID:
81 err = i.ingestIssue(ctx, e)
82 case tangled.RepoIssueCommentNSID:
83 err = i.ingestIssueComment(e)
84 case tangled.LabelDefinitionNSID:
85 err = i.ingestLabelDefinition(e)
86 case tangled.LabelOpNSID:
87 err = i.ingestLabelOp(e)
88 }
89 l = i.Logger.With("nsid", e.Commit.Collection)
90 }
91
92 if err != nil {
93 l.Warn("refused to ingest record", "err", err)
94 }
95
96 return nil
97 }
98}
99
100func (i *Ingester) ingestStar(e *jmodels.Event) error {
101 var err error
102 did := e.Did
103
104 l := i.Logger.With("handler", "ingestStar")
105 l = l.With("nsid", e.Commit.Collection)
106
107 switch e.Commit.Operation {
108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
109 var subjectUri syntax.ATURI
110
111 raw := json.RawMessage(e.Commit.Record)
112 record := tangled.FeedStar{}
113 err := json.Unmarshal(raw, &record)
114 if err != nil {
115 l.Error("invalid record", "err", err)
116 return err
117 }
118
119 subjectUri, err = syntax.ParseATURI(record.Subject)
120 if err != nil {
121 l.Error("invalid record", "err", err)
122 return err
123 }
124 err = db.AddStar(i.Db, &models.Star{
125 Did: did,
126 RepoAt: subjectUri,
127 Rkey: e.Commit.RKey,
128 })
129 case jmodels.CommitOperationDelete:
130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
131 }
132
133 if err != nil {
134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
135 }
136
137 return nil
138}
139
140func (i *Ingester) ingestFollow(e *jmodels.Event) error {
141 var err error
142 did := e.Did
143
144 l := i.Logger.With("handler", "ingestFollow")
145 l = l.With("nsid", e.Commit.Collection)
146
147 switch e.Commit.Operation {
148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.GraphFollow{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "err", err)
154 return err
155 }
156
157 err = db.AddFollow(i.Db, &models.Follow{
158 UserDid: did,
159 SubjectDid: record.Subject,
160 Rkey: e.Commit.RKey,
161 })
162 case jmodels.CommitOperationDelete:
163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
164 }
165
166 if err != nil {
167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
168 }
169
170 return nil
171}
172
173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
174 did := e.Did
175 var err error
176
177 l := i.Logger.With("handler", "ingestPublicKey")
178 l = l.With("nsid", e.Commit.Collection)
179
180 switch e.Commit.Operation {
181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
182 l.Debug("processing add of pubkey")
183 raw := json.RawMessage(e.Commit.Record)
184 record := tangled.PublicKey{}
185 err = json.Unmarshal(raw, &record)
186 if err != nil {
187 l.Error("invalid record", "err", err)
188 return err
189 }
190
191 name := record.Name
192 key := record.Key
193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
194 case jmodels.CommitOperationDelete:
195 l.Debug("processing delete of pubkey")
196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
197 }
198
199 if err != nil {
200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
201 }
202
203 return nil
204}
205
206func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
207 did := e.Did
208 var err error
209
210 l := i.Logger.With("handler", "ingestArtifact")
211 l = l.With("nsid", e.Commit.Collection)
212
213 switch e.Commit.Operation {
214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
215 raw := json.RawMessage(e.Commit.Record)
216 record := tangled.RepoArtifact{}
217 err = json.Unmarshal(raw, &record)
218 if err != nil {
219 l.Error("invalid record", "err", err)
220 return err
221 }
222
223 repoAt, err := syntax.ParseATURI(record.Repo)
224 if err != nil {
225 return err
226 }
227
228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
229 if err != nil {
230 return err
231 }
232
233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
234 if err != nil || !ok {
235 return err
236 }
237
238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
239 if err != nil {
240 createdAt = time.Now()
241 }
242
243 artifact := models.Artifact{
244 Did: did,
245 Rkey: e.Commit.RKey,
246 RepoAt: repoAt,
247 Tag: plumbing.Hash(record.Tag),
248 CreatedAt: createdAt,
249 BlobCid: cid.Cid(record.Artifact.Ref),
250 Name: record.Name,
251 Size: uint64(record.Artifact.Size),
252 MimeType: record.Artifact.MimeType,
253 }
254
255 err = db.AddArtifact(i.Db, artifact)
256 case jmodels.CommitOperationDelete:
257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
258 }
259
260 if err != nil {
261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
262 }
263
264 return nil
265}
266
267func (i *Ingester) ingestProfile(e *jmodels.Event) error {
268 did := e.Did
269 var err error
270
271 l := i.Logger.With("handler", "ingestProfile")
272 l = l.With("nsid", e.Commit.Collection)
273
274 if e.Commit.RKey != "self" {
275 return fmt.Errorf("ingestProfile only ingests `self` record")
276 }
277
278 switch e.Commit.Operation {
279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
280 raw := json.RawMessage(e.Commit.Record)
281 record := tangled.ActorProfile{}
282 err = json.Unmarshal(raw, &record)
283 if err != nil {
284 l.Error("invalid record", "err", err)
285 return err
286 }
287
288 description := ""
289 if record.Description != nil {
290 description = *record.Description
291 }
292
293 includeBluesky := record.Bluesky
294
295 pronouns := ""
296 if record.Pronouns != nil {
297 pronouns = *record.Pronouns
298 }
299
300 location := ""
301 if record.Location != nil {
302 location = *record.Location
303 }
304
305 var links [5]string
306 for i, l := range record.Links {
307 if i < 5 {
308 links[i] = l
309 }
310 }
311
312 var stats [2]models.VanityStat
313 for i, s := range record.Stats {
314 if i < 2 {
315 stats[i].Kind = models.VanityStatKind(s)
316 }
317 }
318
319 var pinned [6]syntax.ATURI
320 for i, r := range record.PinnedRepositories {
321 if i < 6 {
322 pinned[i] = syntax.ATURI(r)
323 }
324 }
325
326 profile := models.Profile{
327 Did: did,
328 Description: description,
329 IncludeBluesky: includeBluesky,
330 Location: location,
331 Links: links,
332 Stats: stats,
333 PinnedRepos: pinned,
334 Pronouns: pronouns,
335 }
336
337 ddb, ok := i.Db.Execer.(*db.DB)
338 if !ok {
339 return fmt.Errorf("failed to index profile record, invalid db cast")
340 }
341
342 tx, err := ddb.Begin()
343 if err != nil {
344 return fmt.Errorf("failed to start transaction")
345 }
346
347 err = db.ValidateProfile(tx, &profile)
348 if err != nil {
349 return fmt.Errorf("invalid profile record")
350 }
351
352 err = db.UpsertProfile(tx, &profile)
353 case jmodels.CommitOperationDelete:
354 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
355 }
356
357 if err != nil {
358 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
359 }
360
361 return nil
362}
363
364func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
365 did := e.Did
366 var err error
367
368 l := i.Logger.With("handler", "ingestSpindleMember")
369 l = l.With("nsid", e.Commit.Collection)
370
371 switch e.Commit.Operation {
372 case jmodels.CommitOperationCreate:
373 raw := json.RawMessage(e.Commit.Record)
374 record := tangled.SpindleMember{}
375 err = json.Unmarshal(raw, &record)
376 if err != nil {
377 l.Error("invalid record", "err", err)
378 return err
379 }
380
381 // only spindle owner can invite to spindles
382 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
383 if err != nil || !ok {
384 return fmt.Errorf("failed to enforce permissions: %w", err)
385 }
386
387 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
388 if err != nil {
389 return err
390 }
391
392 if memberId.Handle.IsInvalidHandle() {
393 return err
394 }
395
396 ddb, ok := i.Db.Execer.(*db.DB)
397 if !ok {
398 return fmt.Errorf("failed to index profile record, invalid db cast")
399 }
400
401 err = db.AddSpindleMember(ddb, models.SpindleMember{
402 Did: syntax.DID(did),
403 Rkey: e.Commit.RKey,
404 Instance: record.Instance,
405 Subject: memberId.DID,
406 })
407 if !ok {
408 return fmt.Errorf("failed to add to db: %w", err)
409 }
410
411 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
412 if err != nil {
413 return fmt.Errorf("failed to update ACLs: %w", err)
414 }
415
416 l.Info("added spindle member")
417 case jmodels.CommitOperationDelete:
418 rkey := e.Commit.RKey
419
420 ddb, ok := i.Db.Execer.(*db.DB)
421 if !ok {
422 return fmt.Errorf("failed to index profile record, invalid db cast")
423 }
424
425 // get record from db first
426 members, err := db.GetSpindleMembers(
427 ddb,
428 orm.FilterEq("did", did),
429 orm.FilterEq("rkey", rkey),
430 )
431 if err != nil || len(members) != 1 {
432 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
433 }
434 member := members[0]
435
436 tx, err := ddb.Begin()
437 if err != nil {
438 return fmt.Errorf("failed to start txn: %w", err)
439 }
440
441 // remove record by rkey && update enforcer
442 if err = db.RemoveSpindleMember(
443 tx,
444 orm.FilterEq("did", did),
445 orm.FilterEq("rkey", rkey),
446 ); err != nil {
447 return fmt.Errorf("failed to remove from db: %w", err)
448 }
449
450 // update enforcer
451 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
452 if err != nil {
453 return fmt.Errorf("failed to update ACLs: %w", err)
454 }
455
456 if err = tx.Commit(); err != nil {
457 return fmt.Errorf("failed to commit txn: %w", err)
458 }
459
460 if err = i.Enforcer.E.SavePolicy(); err != nil {
461 return fmt.Errorf("failed to save ACLs: %w", err)
462 }
463
464 l.Info("removed spindle member")
465 }
466
467 return nil
468}
469
470func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
471 did := e.Did
472 var err error
473
474 l := i.Logger.With("handler", "ingestSpindle")
475 l = l.With("nsid", e.Commit.Collection)
476
477 switch e.Commit.Operation {
478 case jmodels.CommitOperationCreate:
479 raw := json.RawMessage(e.Commit.Record)
480 record := tangled.Spindle{}
481 err = json.Unmarshal(raw, &record)
482 if err != nil {
483 l.Error("invalid record", "err", err)
484 return err
485 }
486
487 instance := e.Commit.RKey
488
489 ddb, ok := i.Db.Execer.(*db.DB)
490 if !ok {
491 return fmt.Errorf("failed to index profile record, invalid db cast")
492 }
493
494 err := db.AddSpindle(ddb, models.Spindle{
495 Owner: syntax.DID(did),
496 Instance: instance,
497 })
498 if err != nil {
499 l.Error("failed to add spindle to db", "err", err, "instance", instance)
500 return err
501 }
502
503 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
504 if err != nil {
505 l.Error("failed to add spindle to db", "err", err, "instance", instance)
506 return err
507 }
508
509 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
510 if err != nil {
511 return fmt.Errorf("failed to mark verified: %w", err)
512 }
513
514 return nil
515
516 case jmodels.CommitOperationDelete:
517 instance := e.Commit.RKey
518
519 ddb, ok := i.Db.Execer.(*db.DB)
520 if !ok {
521 return fmt.Errorf("failed to index profile record, invalid db cast")
522 }
523
524 // get record from db first
525 spindles, err := db.GetSpindles(
526 ddb,
527 orm.FilterEq("owner", did),
528 orm.FilterEq("instance", instance),
529 )
530 if err != nil || len(spindles) != 1 {
531 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
532 }
533 spindle := spindles[0]
534
535 tx, err := ddb.Begin()
536 if err != nil {
537 return err
538 }
539 defer func() {
540 tx.Rollback()
541 i.Enforcer.E.LoadPolicy()
542 }()
543
544 // remove spindle members first
545 err = db.RemoveSpindleMember(
546 tx,
547 orm.FilterEq("owner", did),
548 orm.FilterEq("instance", instance),
549 )
550 if err != nil {
551 return err
552 }
553
554 err = db.DeleteSpindle(
555 tx,
556 orm.FilterEq("owner", did),
557 orm.FilterEq("instance", instance),
558 )
559 if err != nil {
560 return err
561 }
562
563 if spindle.Verified != nil {
564 err = i.Enforcer.RemoveSpindle(instance)
565 if err != nil {
566 return err
567 }
568 }
569
570 err = tx.Commit()
571 if err != nil {
572 return err
573 }
574
575 err = i.Enforcer.E.SavePolicy()
576 if err != nil {
577 return err
578 }
579 }
580
581 return nil
582}
583
584func (i *Ingester) ingestString(e *jmodels.Event) error {
585 did := e.Did
586 rkey := e.Commit.RKey
587
588 var err error
589
590 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
591 l.Info("ingesting record")
592
593 ddb, ok := i.Db.Execer.(*db.DB)
594 if !ok {
595 return fmt.Errorf("failed to index string record, invalid db cast")
596 }
597
598 switch e.Commit.Operation {
599 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
600 raw := json.RawMessage(e.Commit.Record)
601 record := tangled.String{}
602 err = json.Unmarshal(raw, &record)
603 if err != nil {
604 l.Error("invalid record", "err", err)
605 return err
606 }
607
608 string := models.StringFromRecord(did, rkey, record)
609
610 if err = i.Validator.ValidateString(&string); err != nil {
611 l.Error("invalid record", "err", err)
612 return err
613 }
614
615 if err = db.AddString(ddb, string); err != nil {
616 l.Error("failed to add string", "err", err)
617 return err
618 }
619
620 return nil
621
622 case jmodels.CommitOperationDelete:
623 if err := db.DeleteString(
624 ddb,
625 orm.FilterEq("did", did),
626 orm.FilterEq("rkey", rkey),
627 ); err != nil {
628 l.Error("failed to delete", "err", err)
629 return fmt.Errorf("failed to delete string record: %w", err)
630 }
631
632 return nil
633 }
634
635 return nil
636}
637
638func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
639 did := e.Did
640 var err error
641
642 l := i.Logger.With("handler", "ingestKnotMember")
643 l = l.With("nsid", e.Commit.Collection)
644
645 switch e.Commit.Operation {
646 case jmodels.CommitOperationCreate:
647 raw := json.RawMessage(e.Commit.Record)
648 record := tangled.KnotMember{}
649 err = json.Unmarshal(raw, &record)
650 if err != nil {
651 l.Error("invalid record", "err", err)
652 return err
653 }
654
655 // only knot owner can invite to knots
656 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
657 if err != nil || !ok {
658 return fmt.Errorf("failed to enforce permissions: %w", err)
659 }
660
661 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
662 if err != nil {
663 return err
664 }
665
666 if memberId.Handle.IsInvalidHandle() {
667 return err
668 }
669
670 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
671 if err != nil {
672 return fmt.Errorf("failed to update ACLs: %w", err)
673 }
674
675 l.Info("added knot member")
676 case jmodels.CommitOperationDelete:
677 // we don't store knot members in a table (like we do for spindle)
678 // and we can't remove this just yet. possibly fixed if we switch
679 // to either:
680 // 1. a knot_members table like with spindle and store the rkey
681 // 2. use the knot host as the rkey
682 //
683 // TODO: implement member deletion
684 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
685 }
686
687 return nil
688}
689
690func (i *Ingester) ingestKnot(e *jmodels.Event) error {
691 did := e.Did
692 var err error
693
694 l := i.Logger.With("handler", "ingestKnot")
695 l = l.With("nsid", e.Commit.Collection)
696
697 switch e.Commit.Operation {
698 case jmodels.CommitOperationCreate:
699 raw := json.RawMessage(e.Commit.Record)
700 record := tangled.Knot{}
701 err = json.Unmarshal(raw, &record)
702 if err != nil {
703 l.Error("invalid record", "err", err)
704 return err
705 }
706
707 domain := e.Commit.RKey
708
709 ddb, ok := i.Db.Execer.(*db.DB)
710 if !ok {
711 return fmt.Errorf("failed to index profile record, invalid db cast")
712 }
713
714 err := db.AddKnot(ddb, domain, did)
715 if err != nil {
716 l.Error("failed to add knot to db", "err", err, "domain", domain)
717 return err
718 }
719
720 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
721 if err != nil {
722 l.Error("failed to verify knot", "err", err, "domain", domain)
723 return err
724 }
725
726 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
727 if err != nil {
728 return fmt.Errorf("failed to mark verified: %w", err)
729 }
730
731 return nil
732
733 case jmodels.CommitOperationDelete:
734 domain := e.Commit.RKey
735
736 ddb, ok := i.Db.Execer.(*db.DB)
737 if !ok {
738 return fmt.Errorf("failed to index knot record, invalid db cast")
739 }
740
741 // get record from db first
742 registrations, err := db.GetRegistrations(
743 ddb,
744 orm.FilterEq("domain", domain),
745 orm.FilterEq("did", did),
746 )
747 if err != nil {
748 return fmt.Errorf("failed to get registration: %w", err)
749 }
750 if len(registrations) != 1 {
751 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
752 }
753 registration := registrations[0]
754
755 tx, err := ddb.Begin()
756 if err != nil {
757 return err
758 }
759 defer func() {
760 tx.Rollback()
761 i.Enforcer.E.LoadPolicy()
762 }()
763
764 err = db.DeleteKnot(
765 tx,
766 orm.FilterEq("did", did),
767 orm.FilterEq("domain", domain),
768 )
769 if err != nil {
770 return err
771 }
772
773 if registration.Registered != nil {
774 err = i.Enforcer.RemoveKnot(domain)
775 if err != nil {
776 return err
777 }
778 }
779
780 err = tx.Commit()
781 if err != nil {
782 return err
783 }
784
785 err = i.Enforcer.E.SavePolicy()
786 if err != nil {
787 return err
788 }
789 }
790
791 return nil
792}
793func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
794 did := e.Did
795 rkey := e.Commit.RKey
796
797 var err error
798
799 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
800 l.Info("ingesting record")
801
802 ddb, ok := i.Db.Execer.(*db.DB)
803 if !ok {
804 return fmt.Errorf("failed to index issue record, invalid db cast")
805 }
806
807 switch e.Commit.Operation {
808 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
809 raw := json.RawMessage(e.Commit.Record)
810 record := tangled.RepoIssue{}
811 err = json.Unmarshal(raw, &record)
812 if err != nil {
813 l.Error("invalid record", "err", err)
814 return err
815 }
816
817 issue := models.IssueFromRecord(did, rkey, record)
818
819 if err := i.Validator.ValidateIssue(&issue); err != nil {
820 return fmt.Errorf("failed to validate issue: %w", err)
821 }
822
823 tx, err := ddb.BeginTx(ctx, nil)
824 if err != nil {
825 l.Error("failed to begin transaction", "err", err)
826 return err
827 }
828 defer tx.Rollback()
829
830 err = db.PutIssue(tx, &issue)
831 if err != nil {
832 l.Error("failed to create issue", "err", err)
833 return err
834 }
835
836 err = tx.Commit()
837 if err != nil {
838 l.Error("failed to commit txn", "err", err)
839 return err
840 }
841
842 return nil
843
844 case jmodels.CommitOperationDelete:
845 tx, err := ddb.BeginTx(ctx, nil)
846 if err != nil {
847 l.Error("failed to begin transaction", "err", err)
848 return err
849 }
850 defer tx.Rollback()
851
852 if err := db.DeleteIssues(
853 tx,
854 did,
855 rkey,
856 ); err != nil {
857 l.Error("failed to delete", "err", err)
858 return fmt.Errorf("failed to delete issue record: %w", err)
859 }
860 if err := tx.Commit(); err != nil {
861 l.Error("failed to commit txn", "err", err)
862 return err
863 }
864
865 return nil
866 }
867
868 return nil
869}
870
871func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
872 did := e.Did
873 rkey := e.Commit.RKey
874
875 var err error
876
877 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
878 l.Info("ingesting record")
879
880 ddb, ok := i.Db.Execer.(*db.DB)
881 if !ok {
882 return fmt.Errorf("failed to index issue comment record, invalid db cast")
883 }
884
885 switch e.Commit.Operation {
886 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
887 raw := json.RawMessage(e.Commit.Record)
888 record := tangled.RepoIssueComment{}
889 err = json.Unmarshal(raw, &record)
890 if err != nil {
891 return fmt.Errorf("invalid record: %w", err)
892 }
893
894 comment, err := models.IssueCommentFromRecord(did, rkey, record)
895 if err != nil {
896 return fmt.Errorf("failed to parse comment from record: %w", err)
897 }
898
899 if err := i.Validator.ValidateIssueComment(comment); err != nil {
900 return fmt.Errorf("failed to validate comment: %w", err)
901 }
902
903 tx, err := ddb.Begin()
904 if err != nil {
905 return fmt.Errorf("failed to start transaction: %w", err)
906 }
907 defer tx.Rollback()
908
909 _, err = db.AddIssueComment(tx, *comment)
910 if err != nil {
911 return fmt.Errorf("failed to create issue comment: %w", err)
912 }
913
914 return tx.Commit()
915
916 case jmodels.CommitOperationDelete:
917 if err := db.DeleteIssueComments(
918 ddb,
919 orm.FilterEq("did", did),
920 orm.FilterEq("rkey", rkey),
921 ); err != nil {
922 return fmt.Errorf("failed to delete issue comment record: %w", err)
923 }
924
925 return nil
926 }
927
928 return nil
929}
930
931func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
932 did := e.Did
933 rkey := e.Commit.RKey
934
935 var err error
936
937 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
938 l.Info("ingesting record")
939
940 ddb, ok := i.Db.Execer.(*db.DB)
941 if !ok {
942 return fmt.Errorf("failed to index label definition, invalid db cast")
943 }
944
945 switch e.Commit.Operation {
946 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
947 raw := json.RawMessage(e.Commit.Record)
948 record := tangled.LabelDefinition{}
949 err = json.Unmarshal(raw, &record)
950 if err != nil {
951 return fmt.Errorf("invalid record: %w", err)
952 }
953
954 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
955 if err != nil {
956 return fmt.Errorf("failed to parse labeldef from record: %w", err)
957 }
958
959 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
960 return fmt.Errorf("failed to validate labeldef: %w", err)
961 }
962
963 _, err = db.AddLabelDefinition(ddb, def)
964 if err != nil {
965 return fmt.Errorf("failed to create labeldef: %w", err)
966 }
967
968 return nil
969
970 case jmodels.CommitOperationDelete:
971 if err := db.DeleteLabelDefinition(
972 ddb,
973 orm.FilterEq("did", did),
974 orm.FilterEq("rkey", rkey),
975 ); err != nil {
976 return fmt.Errorf("failed to delete labeldef record: %w", err)
977 }
978
979 return nil
980 }
981
982 return nil
983}
984
985func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
986 did := e.Did
987 rkey := e.Commit.RKey
988
989 var err error
990
991 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
992 l.Info("ingesting record")
993
994 ddb, ok := i.Db.Execer.(*db.DB)
995 if !ok {
996 return fmt.Errorf("failed to index label op, invalid db cast")
997 }
998
999 switch e.Commit.Operation {
1000 case jmodels.CommitOperationCreate:
1001 raw := json.RawMessage(e.Commit.Record)
1002 record := tangled.LabelOp{}
1003 err = json.Unmarshal(raw, &record)
1004 if err != nil {
1005 return fmt.Errorf("invalid record: %w", err)
1006 }
1007
1008 subject := syntax.ATURI(record.Subject)
1009 collection := subject.Collection()
1010
1011 var repo *models.Repo
1012 switch collection {
1013 case tangled.RepoIssueNSID:
1014 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1015 if err != nil || len(i) != 1 {
1016 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1017 }
1018 repo = i[0].Repo
1019 default:
1020 return fmt.Errorf("unsupport label subject: %s", collection)
1021 }
1022
1023 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1024 if err != nil {
1025 return fmt.Errorf("failed to build label application ctx: %w", err)
1026 }
1027
1028 ops := models.LabelOpsFromRecord(did, rkey, record)
1029
1030 for _, o := range ops {
1031 def, ok := actx.Defs[o.OperandKey]
1032 if !ok {
1033 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1034 }
1035 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1036 return fmt.Errorf("failed to validate labelop: %w", err)
1037 }
1038 }
1039
1040 tx, err := ddb.Begin()
1041 if err != nil {
1042 return err
1043 }
1044 defer tx.Rollback()
1045
1046 for _, o := range ops {
1047 _, err = db.AddLabelOp(tx, &o)
1048 if err != nil {
1049 return fmt.Errorf("failed to add labelop: %w", err)
1050 }
1051 }
1052
1053 if err = tx.Commit(); err != nil {
1054 return err
1055 }
1056 }
1057
1058 return nil
1059}