1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/rbac"
25)
26
27type Ingester struct {
28 Db db.DbWrapper
29 Enforcer *rbac.Enforcer
30 IdResolver *idresolver.Resolver
31 Config *config.Config
32 Logger *slog.Logger
33 Validator *validator.Validator
34}
35
36type processFunc func(ctx context.Context, e *jmodels.Event) error
37
38func (i *Ingester) Ingest() processFunc {
39 return func(ctx context.Context, e *jmodels.Event) error {
40 var err error
41 defer func() {
42 eventTime := e.TimeUS
43 lastTimeUs := eventTime + 1
44 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
45 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
46 }
47 }()
48
49 l := i.Logger.With("kind", e.Kind)
50 switch e.Kind {
51 case jmodels.EventKindAccount:
52 if !e.Account.Active && *e.Account.Status == "deactivated" {
53 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
54 }
55 case jmodels.EventKindIdentity:
56 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
57 case jmodels.EventKindCommit:
58 switch e.Commit.Collection {
59 case tangled.GraphFollowNSID:
60 err = i.ingestFollow(e)
61 case tangled.FeedStarNSID:
62 err = i.ingestStar(e)
63 case tangled.PublicKeyNSID:
64 err = i.ingestPublicKey(e)
65 case tangled.RepoArtifactNSID:
66 err = i.ingestArtifact(e)
67 case tangled.ActorProfileNSID:
68 err = i.ingestProfile(e)
69 case tangled.SpindleMemberNSID:
70 err = i.ingestSpindleMember(ctx, e)
71 case tangled.SpindleNSID:
72 err = i.ingestSpindle(ctx, e)
73 case tangled.KnotMemberNSID:
74 err = i.ingestKnotMember(e)
75 case tangled.KnotNSID:
76 err = i.ingestKnot(e)
77 case tangled.StringNSID:
78 err = i.ingestString(e)
79 case tangled.RepoIssueNSID:
80 err = i.ingestIssue(ctx, e)
81 case tangled.RepoIssueCommentNSID:
82 err = i.ingestIssueComment(e)
83 case tangled.LabelDefinitionNSID:
84 err = i.ingestLabelDefinition(e)
85 case tangled.LabelOpNSID:
86 err = i.ingestLabelOp(e)
87 }
88 l = i.Logger.With("nsid", e.Commit.Collection)
89 }
90
91 if err != nil {
92 l.Warn("refused to ingest record", "err", err)
93 }
94
95 return nil
96 }
97}
98
99func (i *Ingester) ingestStar(e *jmodels.Event) error {
100 var err error
101 did := e.Did
102
103 l := i.Logger.With("handler", "ingestStar")
104 l = l.With("nsid", e.Commit.Collection)
105
106 switch e.Commit.Operation {
107 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
108 var subjectUri syntax.ATURI
109
110 raw := json.RawMessage(e.Commit.Record)
111 record := tangled.FeedStar{}
112 err := json.Unmarshal(raw, &record)
113 if err != nil {
114 l.Error("invalid record", "err", err)
115 return err
116 }
117
118 subjectUri, err = syntax.ParseATURI(record.Subject)
119 if err != nil {
120 l.Error("invalid record", "err", err)
121 return err
122 }
123 err = db.AddStar(i.Db, &models.Star{
124 Did: did,
125 RepoAt: subjectUri,
126 Rkey: e.Commit.RKey,
127 })
128 case jmodels.CommitOperationDelete:
129 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
130 }
131
132 if err != nil {
133 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
134 }
135
136 return nil
137}
138
139func (i *Ingester) ingestFollow(e *jmodels.Event) error {
140 var err error
141 did := e.Did
142
143 l := i.Logger.With("handler", "ingestFollow")
144 l = l.With("nsid", e.Commit.Collection)
145
146 switch e.Commit.Operation {
147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
148 raw := json.RawMessage(e.Commit.Record)
149 record := tangled.GraphFollow{}
150 err = json.Unmarshal(raw, &record)
151 if err != nil {
152 l.Error("invalid record", "err", err)
153 return err
154 }
155
156 err = db.AddFollow(i.Db, &models.Follow{
157 UserDid: did,
158 SubjectDid: record.Subject,
159 Rkey: e.Commit.RKey,
160 })
161 case jmodels.CommitOperationDelete:
162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
163 }
164
165 if err != nil {
166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
167 }
168
169 return nil
170}
171
172func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
173 did := e.Did
174 var err error
175
176 l := i.Logger.With("handler", "ingestPublicKey")
177 l = l.With("nsid", e.Commit.Collection)
178
179 switch e.Commit.Operation {
180 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
181 l.Debug("processing add of pubkey")
182 raw := json.RawMessage(e.Commit.Record)
183 record := tangled.PublicKey{}
184 err = json.Unmarshal(raw, &record)
185 if err != nil {
186 l.Error("invalid record", "err", err)
187 return err
188 }
189
190 name := record.Name
191 key := record.Key
192 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
193 case jmodels.CommitOperationDelete:
194 l.Debug("processing delete of pubkey")
195 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
196 }
197
198 if err != nil {
199 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
200 }
201
202 return nil
203}
204
205func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
206 did := e.Did
207 var err error
208
209 l := i.Logger.With("handler", "ingestArtifact")
210 l = l.With("nsid", e.Commit.Collection)
211
212 switch e.Commit.Operation {
213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
214 raw := json.RawMessage(e.Commit.Record)
215 record := tangled.RepoArtifact{}
216 err = json.Unmarshal(raw, &record)
217 if err != nil {
218 l.Error("invalid record", "err", err)
219 return err
220 }
221
222 repoAt, err := syntax.ParseATURI(record.Repo)
223 if err != nil {
224 return err
225 }
226
227 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
228 if err != nil {
229 return err
230 }
231
232 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
233 if err != nil || !ok {
234 return err
235 }
236
237 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
238 if err != nil {
239 createdAt = time.Now()
240 }
241
242 artifact := models.Artifact{
243 Did: did,
244 Rkey: e.Commit.RKey,
245 RepoAt: repoAt,
246 Tag: plumbing.Hash(record.Tag),
247 CreatedAt: createdAt,
248 BlobCid: cid.Cid(record.Artifact.Ref),
249 Name: record.Name,
250 Size: uint64(record.Artifact.Size),
251 MimeType: record.Artifact.MimeType,
252 }
253
254 err = db.AddArtifact(i.Db, artifact)
255 case jmodels.CommitOperationDelete:
256 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
257 }
258
259 if err != nil {
260 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
261 }
262
263 return nil
264}
265
266func (i *Ingester) ingestProfile(e *jmodels.Event) error {
267 did := e.Did
268 var err error
269
270 l := i.Logger.With("handler", "ingestProfile")
271 l = l.With("nsid", e.Commit.Collection)
272
273 if e.Commit.RKey != "self" {
274 return fmt.Errorf("ingestProfile only ingests `self` record")
275 }
276
277 switch e.Commit.Operation {
278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
279 raw := json.RawMessage(e.Commit.Record)
280 record := tangled.ActorProfile{}
281 err = json.Unmarshal(raw, &record)
282 if err != nil {
283 l.Error("invalid record", "err", err)
284 return err
285 }
286
287 description := ""
288 if record.Description != nil {
289 description = *record.Description
290 }
291
292 includeBluesky := record.Bluesky
293
294 pronouns := ""
295 if record.Pronouns != nil {
296 pronouns = *record.Pronouns
297 }
298
299 location := ""
300 if record.Location != nil {
301 location = *record.Location
302 }
303
304 var links [5]string
305 for i, l := range record.Links {
306 if i < 5 {
307 links[i] = l
308 }
309 }
310
311 var stats [2]models.VanityStat
312 for i, s := range record.Stats {
313 if i < 2 {
314 stats[i].Kind = models.VanityStatKind(s)
315 }
316 }
317
318 var pinned [6]syntax.ATURI
319 for i, r := range record.PinnedRepositories {
320 if i < 6 {
321 pinned[i] = syntax.ATURI(r)
322 }
323 }
324
325 profile := models.Profile{
326 Did: did,
327 Description: description,
328 IncludeBluesky: includeBluesky,
329 Location: location,
330 Links: links,
331 Stats: stats,
332 PinnedRepos: pinned,
333 Pronouns: pronouns,
334 }
335
336 ddb, ok := i.Db.Execer.(*db.DB)
337 if !ok {
338 return fmt.Errorf("failed to index profile record, invalid db cast")
339 }
340
341 tx, err := ddb.Begin()
342 if err != nil {
343 return fmt.Errorf("failed to start transaction")
344 }
345
346 err = db.ValidateProfile(tx, &profile)
347 if err != nil {
348 return fmt.Errorf("invalid profile record")
349 }
350
351 err = db.UpsertProfile(tx, &profile)
352 case jmodels.CommitOperationDelete:
353 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
354 }
355
356 if err != nil {
357 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
358 }
359
360 return nil
361}
362
363func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
364 did := e.Did
365 var err error
366
367 l := i.Logger.With("handler", "ingestSpindleMember")
368 l = l.With("nsid", e.Commit.Collection)
369
370 switch e.Commit.Operation {
371 case jmodels.CommitOperationCreate:
372 raw := json.RawMessage(e.Commit.Record)
373 record := tangled.SpindleMember{}
374 err = json.Unmarshal(raw, &record)
375 if err != nil {
376 l.Error("invalid record", "err", err)
377 return err
378 }
379
380 // only spindle owner can invite to spindles
381 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
382 if err != nil || !ok {
383 return fmt.Errorf("failed to enforce permissions: %w", err)
384 }
385
386 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
387 if err != nil {
388 return err
389 }
390
391 if memberId.Handle.IsInvalidHandle() {
392 return err
393 }
394
395 ddb, ok := i.Db.Execer.(*db.DB)
396 if !ok {
397 return fmt.Errorf("failed to index profile record, invalid db cast")
398 }
399
400 err = db.AddSpindleMember(ddb, models.SpindleMember{
401 Did: syntax.DID(did),
402 Rkey: e.Commit.RKey,
403 Instance: record.Instance,
404 Subject: memberId.DID,
405 })
406 if !ok {
407 return fmt.Errorf("failed to add to db: %w", err)
408 }
409
410 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
411 if err != nil {
412 return fmt.Errorf("failed to update ACLs: %w", err)
413 }
414
415 l.Info("added spindle member")
416 case jmodels.CommitOperationDelete:
417 rkey := e.Commit.RKey
418
419 ddb, ok := i.Db.Execer.(*db.DB)
420 if !ok {
421 return fmt.Errorf("failed to index profile record, invalid db cast")
422 }
423
424 // get record from db first
425 members, err := db.GetSpindleMembers(
426 ddb,
427 db.FilterEq("did", did),
428 db.FilterEq("rkey", rkey),
429 )
430 if err != nil || len(members) != 1 {
431 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
432 }
433 member := members[0]
434
435 tx, err := ddb.Begin()
436 if err != nil {
437 return fmt.Errorf("failed to start txn: %w", err)
438 }
439
440 // remove record by rkey && update enforcer
441 if err = db.RemoveSpindleMember(
442 tx,
443 db.FilterEq("did", did),
444 db.FilterEq("rkey", rkey),
445 ); err != nil {
446 return fmt.Errorf("failed to remove from db: %w", err)
447 }
448
449 // update enforcer
450 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
451 if err != nil {
452 return fmt.Errorf("failed to update ACLs: %w", err)
453 }
454
455 if err = tx.Commit(); err != nil {
456 return fmt.Errorf("failed to commit txn: %w", err)
457 }
458
459 if err = i.Enforcer.E.SavePolicy(); err != nil {
460 return fmt.Errorf("failed to save ACLs: %w", err)
461 }
462
463 l.Info("removed spindle member")
464 }
465
466 return nil
467}
468
469func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
470 did := e.Did
471 var err error
472
473 l := i.Logger.With("handler", "ingestSpindle")
474 l = l.With("nsid", e.Commit.Collection)
475
476 switch e.Commit.Operation {
477 case jmodels.CommitOperationCreate:
478 raw := json.RawMessage(e.Commit.Record)
479 record := tangled.Spindle{}
480 err = json.Unmarshal(raw, &record)
481 if err != nil {
482 l.Error("invalid record", "err", err)
483 return err
484 }
485
486 instance := e.Commit.RKey
487
488 ddb, ok := i.Db.Execer.(*db.DB)
489 if !ok {
490 return fmt.Errorf("failed to index profile record, invalid db cast")
491 }
492
493 err := db.AddSpindle(ddb, models.Spindle{
494 Owner: syntax.DID(did),
495 Instance: instance,
496 })
497 if err != nil {
498 l.Error("failed to add spindle to db", "err", err, "instance", instance)
499 return err
500 }
501
502 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
503 if err != nil {
504 l.Error("failed to add spindle to db", "err", err, "instance", instance)
505 return err
506 }
507
508 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
509 if err != nil {
510 return fmt.Errorf("failed to mark verified: %w", err)
511 }
512
513 return nil
514
515 case jmodels.CommitOperationDelete:
516 instance := e.Commit.RKey
517
518 ddb, ok := i.Db.Execer.(*db.DB)
519 if !ok {
520 return fmt.Errorf("failed to index profile record, invalid db cast")
521 }
522
523 // get record from db first
524 spindles, err := db.GetSpindles(
525 ddb,
526 db.FilterEq("owner", did),
527 db.FilterEq("instance", instance),
528 )
529 if err != nil || len(spindles) != 1 {
530 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
531 }
532 spindle := spindles[0]
533
534 tx, err := ddb.Begin()
535 if err != nil {
536 return err
537 }
538 defer func() {
539 tx.Rollback()
540 i.Enforcer.E.LoadPolicy()
541 }()
542
543 // remove spindle members first
544 err = db.RemoveSpindleMember(
545 tx,
546 db.FilterEq("owner", did),
547 db.FilterEq("instance", instance),
548 )
549 if err != nil {
550 return err
551 }
552
553 err = db.DeleteSpindle(
554 tx,
555 db.FilterEq("owner", did),
556 db.FilterEq("instance", instance),
557 )
558 if err != nil {
559 return err
560 }
561
562 if spindle.Verified != nil {
563 err = i.Enforcer.RemoveSpindle(instance)
564 if err != nil {
565 return err
566 }
567 }
568
569 err = tx.Commit()
570 if err != nil {
571 return err
572 }
573
574 err = i.Enforcer.E.SavePolicy()
575 if err != nil {
576 return err
577 }
578 }
579
580 return nil
581}
582
583func (i *Ingester) ingestString(e *jmodels.Event) error {
584 did := e.Did
585 rkey := e.Commit.RKey
586
587 var err error
588
589 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
590 l.Info("ingesting record")
591
592 ddb, ok := i.Db.Execer.(*db.DB)
593 if !ok {
594 return fmt.Errorf("failed to index string record, invalid db cast")
595 }
596
597 switch e.Commit.Operation {
598 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
599 raw := json.RawMessage(e.Commit.Record)
600 record := tangled.String{}
601 err = json.Unmarshal(raw, &record)
602 if err != nil {
603 l.Error("invalid record", "err", err)
604 return err
605 }
606
607 string := models.StringFromRecord(did, rkey, record)
608
609 if err = i.Validator.ValidateString(&string); err != nil {
610 l.Error("invalid record", "err", err)
611 return err
612 }
613
614 if err = db.AddString(ddb, string); err != nil {
615 l.Error("failed to add string", "err", err)
616 return err
617 }
618
619 return nil
620
621 case jmodels.CommitOperationDelete:
622 if err := db.DeleteString(
623 ddb,
624 db.FilterEq("did", did),
625 db.FilterEq("rkey", rkey),
626 ); err != nil {
627 l.Error("failed to delete", "err", err)
628 return fmt.Errorf("failed to delete string record: %w", err)
629 }
630
631 return nil
632 }
633
634 return nil
635}
636
637func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
638 did := e.Did
639 var err error
640
641 l := i.Logger.With("handler", "ingestKnotMember")
642 l = l.With("nsid", e.Commit.Collection)
643
644 switch e.Commit.Operation {
645 case jmodels.CommitOperationCreate:
646 raw := json.RawMessage(e.Commit.Record)
647 record := tangled.KnotMember{}
648 err = json.Unmarshal(raw, &record)
649 if err != nil {
650 l.Error("invalid record", "err", err)
651 return err
652 }
653
654 // only knot owner can invite to knots
655 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
656 if err != nil || !ok {
657 return fmt.Errorf("failed to enforce permissions: %w", err)
658 }
659
660 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
661 if err != nil {
662 return err
663 }
664
665 if memberId.Handle.IsInvalidHandle() {
666 return err
667 }
668
669 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
670 if err != nil {
671 return fmt.Errorf("failed to update ACLs: %w", err)
672 }
673
674 l.Info("added knot member")
675 case jmodels.CommitOperationDelete:
676 // we don't store knot members in a table (like we do for spindle)
677 // and we can't remove this just yet. possibly fixed if we switch
678 // to either:
679 // 1. a knot_members table like with spindle and store the rkey
680 // 2. use the knot host as the rkey
681 //
682 // TODO: implement member deletion
683 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
684 }
685
686 return nil
687}
688
689func (i *Ingester) ingestKnot(e *jmodels.Event) error {
690 did := e.Did
691 var err error
692
693 l := i.Logger.With("handler", "ingestKnot")
694 l = l.With("nsid", e.Commit.Collection)
695
696 switch e.Commit.Operation {
697 case jmodels.CommitOperationCreate:
698 raw := json.RawMessage(e.Commit.Record)
699 record := tangled.Knot{}
700 err = json.Unmarshal(raw, &record)
701 if err != nil {
702 l.Error("invalid record", "err", err)
703 return err
704 }
705
706 domain := e.Commit.RKey
707
708 ddb, ok := i.Db.Execer.(*db.DB)
709 if !ok {
710 return fmt.Errorf("failed to index profile record, invalid db cast")
711 }
712
713 err := db.AddKnot(ddb, domain, did)
714 if err != nil {
715 l.Error("failed to add knot to db", "err", err, "domain", domain)
716 return err
717 }
718
719 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
720 if err != nil {
721 l.Error("failed to verify knot", "err", err, "domain", domain)
722 return err
723 }
724
725 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
726 if err != nil {
727 return fmt.Errorf("failed to mark verified: %w", err)
728 }
729
730 return nil
731
732 case jmodels.CommitOperationDelete:
733 domain := e.Commit.RKey
734
735 ddb, ok := i.Db.Execer.(*db.DB)
736 if !ok {
737 return fmt.Errorf("failed to index knot record, invalid db cast")
738 }
739
740 // get record from db first
741 registrations, err := db.GetRegistrations(
742 ddb,
743 db.FilterEq("domain", domain),
744 db.FilterEq("did", did),
745 )
746 if err != nil {
747 return fmt.Errorf("failed to get registration: %w", err)
748 }
749 if len(registrations) != 1 {
750 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
751 }
752 registration := registrations[0]
753
754 tx, err := ddb.Begin()
755 if err != nil {
756 return err
757 }
758 defer func() {
759 tx.Rollback()
760 i.Enforcer.E.LoadPolicy()
761 }()
762
763 err = db.DeleteKnot(
764 tx,
765 db.FilterEq("did", did),
766 db.FilterEq("domain", domain),
767 )
768 if err != nil {
769 return err
770 }
771
772 if registration.Registered != nil {
773 err = i.Enforcer.RemoveKnot(domain)
774 if err != nil {
775 return err
776 }
777 }
778
779 err = tx.Commit()
780 if err != nil {
781 return err
782 }
783
784 err = i.Enforcer.E.SavePolicy()
785 if err != nil {
786 return err
787 }
788 }
789
790 return nil
791}
792func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
793 did := e.Did
794 rkey := e.Commit.RKey
795
796 var err error
797
798 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
799 l.Info("ingesting record")
800
801 ddb, ok := i.Db.Execer.(*db.DB)
802 if !ok {
803 return fmt.Errorf("failed to index issue record, invalid db cast")
804 }
805
806 switch e.Commit.Operation {
807 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
808 raw := json.RawMessage(e.Commit.Record)
809 record := tangled.RepoIssue{}
810 err = json.Unmarshal(raw, &record)
811 if err != nil {
812 l.Error("invalid record", "err", err)
813 return err
814 }
815
816 issue := models.IssueFromRecord(did, rkey, record)
817
818 if err := i.Validator.ValidateIssue(&issue); err != nil {
819 return fmt.Errorf("failed to validate issue: %w", err)
820 }
821
822 tx, err := ddb.BeginTx(ctx, nil)
823 if err != nil {
824 l.Error("failed to begin transaction", "err", err)
825 return err
826 }
827 defer tx.Rollback()
828
829 err = db.PutIssue(tx, &issue)
830 if err != nil {
831 l.Error("failed to create issue", "err", err)
832 return err
833 }
834
835 err = tx.Commit()
836 if err != nil {
837 l.Error("failed to commit txn", "err", err)
838 return err
839 }
840
841 return nil
842
843 case jmodels.CommitOperationDelete:
844 if err := db.DeleteIssues(
845 ddb,
846 db.FilterEq("did", did),
847 db.FilterEq("rkey", rkey),
848 ); err != nil {
849 l.Error("failed to delete", "err", err)
850 return fmt.Errorf("failed to delete issue record: %w", err)
851 }
852
853 return nil
854 }
855
856 return nil
857}
858
859func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
860 did := e.Did
861 rkey := e.Commit.RKey
862
863 var err error
864
865 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
866 l.Info("ingesting record")
867
868 ddb, ok := i.Db.Execer.(*db.DB)
869 if !ok {
870 return fmt.Errorf("failed to index issue comment record, invalid db cast")
871 }
872
873 switch e.Commit.Operation {
874 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
875 raw := json.RawMessage(e.Commit.Record)
876 record := tangled.RepoIssueComment{}
877 err = json.Unmarshal(raw, &record)
878 if err != nil {
879 return fmt.Errorf("invalid record: %w", err)
880 }
881
882 comment, err := models.IssueCommentFromRecord(did, rkey, record)
883 if err != nil {
884 return fmt.Errorf("failed to parse comment from record: %w", err)
885 }
886
887 if err := i.Validator.ValidateIssueComment(comment); err != nil {
888 return fmt.Errorf("failed to validate comment: %w", err)
889 }
890
891 _, err = db.AddIssueComment(ddb, *comment)
892 if err != nil {
893 return fmt.Errorf("failed to create issue comment: %w", err)
894 }
895
896 return nil
897
898 case jmodels.CommitOperationDelete:
899 if err := db.DeleteIssueComments(
900 ddb,
901 db.FilterEq("did", did),
902 db.FilterEq("rkey", rkey),
903 ); err != nil {
904 return fmt.Errorf("failed to delete issue comment record: %w", err)
905 }
906
907 return nil
908 }
909
910 return nil
911}
912
913func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
914 did := e.Did
915 rkey := e.Commit.RKey
916
917 var err error
918
919 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
920 l.Info("ingesting record")
921
922 ddb, ok := i.Db.Execer.(*db.DB)
923 if !ok {
924 return fmt.Errorf("failed to index label definition, invalid db cast")
925 }
926
927 switch e.Commit.Operation {
928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
929 raw := json.RawMessage(e.Commit.Record)
930 record := tangled.LabelDefinition{}
931 err = json.Unmarshal(raw, &record)
932 if err != nil {
933 return fmt.Errorf("invalid record: %w", err)
934 }
935
936 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
937 if err != nil {
938 return fmt.Errorf("failed to parse labeldef from record: %w", err)
939 }
940
941 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
942 return fmt.Errorf("failed to validate labeldef: %w", err)
943 }
944
945 _, err = db.AddLabelDefinition(ddb, def)
946 if err != nil {
947 return fmt.Errorf("failed to create labeldef: %w", err)
948 }
949
950 return nil
951
952 case jmodels.CommitOperationDelete:
953 if err := db.DeleteLabelDefinition(
954 ddb,
955 db.FilterEq("did", did),
956 db.FilterEq("rkey", rkey),
957 ); err != nil {
958 return fmt.Errorf("failed to delete labeldef record: %w", err)
959 }
960
961 return nil
962 }
963
964 return nil
965}
966
967func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
968 did := e.Did
969 rkey := e.Commit.RKey
970
971 var err error
972
973 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
974 l.Info("ingesting record")
975
976 ddb, ok := i.Db.Execer.(*db.DB)
977 if !ok {
978 return fmt.Errorf("failed to index label op, invalid db cast")
979 }
980
981 switch e.Commit.Operation {
982 case jmodels.CommitOperationCreate:
983 raw := json.RawMessage(e.Commit.Record)
984 record := tangled.LabelOp{}
985 err = json.Unmarshal(raw, &record)
986 if err != nil {
987 return fmt.Errorf("invalid record: %w", err)
988 }
989
990 subject := syntax.ATURI(record.Subject)
991 collection := subject.Collection()
992
993 var repo *models.Repo
994 switch collection {
995 case tangled.RepoIssueNSID:
996 i, err := db.GetIssues(ddb, db.FilterEq("at_uri", subject))
997 if err != nil || len(i) != 1 {
998 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
999 }
1000 repo = i[0].Repo
1001 default:
1002 return fmt.Errorf("unsupport label subject: %s", collection)
1003 }
1004
1005 actx, err := db.NewLabelApplicationCtx(ddb, db.FilterIn("at_uri", repo.Labels))
1006 if err != nil {
1007 return fmt.Errorf("failed to build label application ctx: %w", err)
1008 }
1009
1010 ops := models.LabelOpsFromRecord(did, rkey, record)
1011
1012 for _, o := range ops {
1013 def, ok := actx.Defs[o.OperandKey]
1014 if !ok {
1015 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1016 }
1017 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1018 return fmt.Errorf("failed to validate labelop: %w", err)
1019 }
1020 }
1021
1022 tx, err := ddb.Begin()
1023 if err != nil {
1024 return err
1025 }
1026 defer tx.Rollback()
1027
1028 for _, o := range ops {
1029 _, err = db.AddLabelOp(tx, &o)
1030 if err != nil {
1031 return fmt.Errorf("failed to add labelop: %w", err)
1032 }
1033 }
1034
1035 if err = tx.Commit(); err != nil {
1036 return err
1037 }
1038 }
1039
1040 return nil
1041}