1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/rbac"
25)
26
// Ingester consumes jetstream events and applies them to the appview's
// database, access-control enforcer and identity cache.
type Ingester struct {
	// Db stores the ingest position and is handed to the db package helpers;
	// several handlers type-assert its Execer to *db.DB.
	Db db.DbWrapper
	// Enforcer answers permission checks and holds the ACLs updated for knots
	// and spindles.
	Enforcer *rbac.Enforcer
	// IdResolver resolves record subjects to identities and invalidates
	// cached identities on account/identity events.
	IdResolver *idresolver.Resolver
	// Config supplies Core.Dev, which is passed to server verification.
	Config *config.Config
	// Logger is the base logger every handler derives its own logger from.
	Logger *slog.Logger
	// Validator checks strings, issues, issue comments, label definitions
	// and label operations before they are stored.
	Validator *validator.Validator
}
35
// processFunc handles a single jetstream event; it is the type returned by
// Ingester.Ingest.
type processFunc func(ctx context.Context, e *jmodels.Event) error
37
38func (i *Ingester) Ingest() processFunc {
39 return func(ctx context.Context, e *jmodels.Event) error {
40 var err error
41 defer func() {
42 eventTime := e.TimeUS
43 lastTimeUs := eventTime + 1
44 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
45 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
46 }
47 }()
48
49 l := i.Logger.With("kind", e.Kind)
50 switch e.Kind {
51 case jmodels.EventKindAccount:
52 if !e.Account.Active && *e.Account.Status == "deactivated" {
53 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
54 }
55 case jmodels.EventKindIdentity:
56 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
57 case jmodels.EventKindCommit:
58 switch e.Commit.Collection {
59 case tangled.GraphFollowNSID:
60 err = i.ingestFollow(e)
61 case tangled.FeedStarNSID:
62 err = i.ingestStar(e)
63 case tangled.PublicKeyNSID:
64 err = i.ingestPublicKey(e)
65 case tangled.RepoArtifactNSID:
66 err = i.ingestArtifact(e)
67 case tangled.ActorProfileNSID:
68 err = i.ingestProfile(e)
69 case tangled.SpindleMemberNSID:
70 err = i.ingestSpindleMember(ctx, e)
71 case tangled.SpindleNSID:
72 err = i.ingestSpindle(ctx, e)
73 case tangled.KnotMemberNSID:
74 err = i.ingestKnotMember(e)
75 case tangled.KnotNSID:
76 err = i.ingestKnot(e)
77 case tangled.StringNSID:
78 err = i.ingestString(e)
79 case tangled.RepoIssueNSID:
80 err = i.ingestIssue(ctx, e)
81 case tangled.RepoIssueCommentNSID:
82 err = i.ingestIssueComment(e)
83 case tangled.LabelDefinitionNSID:
84 err = i.ingestLabelDefinition(e)
85 case tangled.LabelOpNSID:
86 err = i.ingestLabelOp(e)
87 }
88 l = i.Logger.With("nsid", e.Commit.Collection)
89 }
90
91 if err != nil {
92 l.Debug("error ingesting record", "err", err)
93 }
94
95 return nil
96 }
97}
98
99func (i *Ingester) ingestStar(e *jmodels.Event) error {
100 var err error
101 did := e.Did
102
103 l := i.Logger.With("handler", "ingestStar")
104 l = l.With("nsid", e.Commit.Collection)
105
106 switch e.Commit.Operation {
107 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
108 var subjectUri syntax.ATURI
109
110 raw := json.RawMessage(e.Commit.Record)
111 record := tangled.FeedStar{}
112 err := json.Unmarshal(raw, &record)
113 if err != nil {
114 l.Error("invalid record", "err", err)
115 return err
116 }
117
118 subjectUri, err = syntax.ParseATURI(record.Subject)
119 if err != nil {
120 l.Error("invalid record", "err", err)
121 return err
122 }
123 err = db.AddStar(i.Db, &models.Star{
124 StarredByDid: did,
125 RepoAt: subjectUri,
126 Rkey: e.Commit.RKey,
127 })
128 case jmodels.CommitOperationDelete:
129 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
130 }
131
132 if err != nil {
133 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
134 }
135
136 return nil
137}
138
139func (i *Ingester) ingestFollow(e *jmodels.Event) error {
140 var err error
141 did := e.Did
142
143 l := i.Logger.With("handler", "ingestFollow")
144 l = l.With("nsid", e.Commit.Collection)
145
146 switch e.Commit.Operation {
147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
148 raw := json.RawMessage(e.Commit.Record)
149 record := tangled.GraphFollow{}
150 err = json.Unmarshal(raw, &record)
151 if err != nil {
152 l.Error("invalid record", "err", err)
153 return err
154 }
155
156 err = db.AddFollow(i.Db, &models.Follow{
157 UserDid: did,
158 SubjectDid: record.Subject,
159 Rkey: e.Commit.RKey,
160 })
161 case jmodels.CommitOperationDelete:
162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
163 }
164
165 if err != nil {
166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
167 }
168
169 return nil
170}
171
172func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
173 did := e.Did
174 var err error
175
176 l := i.Logger.With("handler", "ingestPublicKey")
177 l = l.With("nsid", e.Commit.Collection)
178
179 switch e.Commit.Operation {
180 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
181 l.Debug("processing add of pubkey")
182 raw := json.RawMessage(e.Commit.Record)
183 record := tangled.PublicKey{}
184 err = json.Unmarshal(raw, &record)
185 if err != nil {
186 l.Error("invalid record", "err", err)
187 return err
188 }
189
190 name := record.Name
191 key := record.Key
192 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
193 case jmodels.CommitOperationDelete:
194 l.Debug("processing delete of pubkey")
195 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
196 }
197
198 if err != nil {
199 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
200 }
201
202 return nil
203}
204
205func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
206 did := e.Did
207 var err error
208
209 l := i.Logger.With("handler", "ingestArtifact")
210 l = l.With("nsid", e.Commit.Collection)
211
212 switch e.Commit.Operation {
213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
214 raw := json.RawMessage(e.Commit.Record)
215 record := tangled.RepoArtifact{}
216 err = json.Unmarshal(raw, &record)
217 if err != nil {
218 l.Error("invalid record", "err", err)
219 return err
220 }
221
222 repoAt, err := syntax.ParseATURI(record.Repo)
223 if err != nil {
224 return err
225 }
226
227 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
228 if err != nil {
229 return err
230 }
231
232 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
233 if err != nil || !ok {
234 return err
235 }
236
237 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
238 if err != nil {
239 createdAt = time.Now()
240 }
241
242 artifact := models.Artifact{
243 Did: did,
244 Rkey: e.Commit.RKey,
245 RepoAt: repoAt,
246 Tag: plumbing.Hash(record.Tag),
247 CreatedAt: createdAt,
248 BlobCid: cid.Cid(record.Artifact.Ref),
249 Name: record.Name,
250 Size: uint64(record.Artifact.Size),
251 MimeType: record.Artifact.MimeType,
252 }
253
254 err = db.AddArtifact(i.Db, artifact)
255 case jmodels.CommitOperationDelete:
256 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
257 }
258
259 if err != nil {
260 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
261 }
262
263 return nil
264}
265
266func (i *Ingester) ingestProfile(e *jmodels.Event) error {
267 did := e.Did
268 var err error
269
270 l := i.Logger.With("handler", "ingestProfile")
271 l = l.With("nsid", e.Commit.Collection)
272
273 if e.Commit.RKey != "self" {
274 return fmt.Errorf("ingestProfile only ingests `self` record")
275 }
276
277 switch e.Commit.Operation {
278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
279 raw := json.RawMessage(e.Commit.Record)
280 record := tangled.ActorProfile{}
281 err = json.Unmarshal(raw, &record)
282 if err != nil {
283 l.Error("invalid record", "err", err)
284 return err
285 }
286
287 description := ""
288 if record.Description != nil {
289 description = *record.Description
290 }
291
292 includeBluesky := record.Bluesky
293
294 location := ""
295 if record.Location != nil {
296 location = *record.Location
297 }
298
299 var links [5]string
300 for i, l := range record.Links {
301 if i < 5 {
302 links[i] = l
303 }
304 }
305
306 var stats [2]models.VanityStat
307 for i, s := range record.Stats {
308 if i < 2 {
309 stats[i].Kind = models.VanityStatKind(s)
310 }
311 }
312
313 var pinned [6]syntax.ATURI
314 for i, r := range record.PinnedRepositories {
315 if i < 6 {
316 pinned[i] = syntax.ATURI(r)
317 }
318 }
319
320 profile := models.Profile{
321 Did: did,
322 Description: description,
323 IncludeBluesky: includeBluesky,
324 Location: location,
325 Links: links,
326 Stats: stats,
327 PinnedRepos: pinned,
328 }
329
330 ddb, ok := i.Db.Execer.(*db.DB)
331 if !ok {
332 return fmt.Errorf("failed to index profile record, invalid db cast")
333 }
334
335 tx, err := ddb.Begin()
336 if err != nil {
337 return fmt.Errorf("failed to start transaction")
338 }
339
340 err = db.ValidateProfile(tx, &profile)
341 if err != nil {
342 return fmt.Errorf("invalid profile record")
343 }
344
345 err = db.UpsertProfile(tx, &profile)
346 case jmodels.CommitOperationDelete:
347 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
348 }
349
350 if err != nil {
351 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
352 }
353
354 return nil
355}
356
357func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
358 did := e.Did
359 var err error
360
361 l := i.Logger.With("handler", "ingestSpindleMember")
362 l = l.With("nsid", e.Commit.Collection)
363
364 switch e.Commit.Operation {
365 case jmodels.CommitOperationCreate:
366 raw := json.RawMessage(e.Commit.Record)
367 record := tangled.SpindleMember{}
368 err = json.Unmarshal(raw, &record)
369 if err != nil {
370 l.Error("invalid record", "err", err)
371 return err
372 }
373
374 // only spindle owner can invite to spindles
375 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
376 if err != nil || !ok {
377 return fmt.Errorf("failed to enforce permissions: %w", err)
378 }
379
380 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
381 if err != nil {
382 return err
383 }
384
385 if memberId.Handle.IsInvalidHandle() {
386 return err
387 }
388
389 ddb, ok := i.Db.Execer.(*db.DB)
390 if !ok {
391 return fmt.Errorf("failed to index profile record, invalid db cast")
392 }
393
394 err = db.AddSpindleMember(ddb, models.SpindleMember{
395 Did: syntax.DID(did),
396 Rkey: e.Commit.RKey,
397 Instance: record.Instance,
398 Subject: memberId.DID,
399 })
400 if !ok {
401 return fmt.Errorf("failed to add to db: %w", err)
402 }
403
404 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
405 if err != nil {
406 return fmt.Errorf("failed to update ACLs: %w", err)
407 }
408
409 l.Info("added spindle member")
410 case jmodels.CommitOperationDelete:
411 rkey := e.Commit.RKey
412
413 ddb, ok := i.Db.Execer.(*db.DB)
414 if !ok {
415 return fmt.Errorf("failed to index profile record, invalid db cast")
416 }
417
418 // get record from db first
419 members, err := db.GetSpindleMembers(
420 ddb,
421 db.FilterEq("did", did),
422 db.FilterEq("rkey", rkey),
423 )
424 if err != nil || len(members) != 1 {
425 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
426 }
427 member := members[0]
428
429 tx, err := ddb.Begin()
430 if err != nil {
431 return fmt.Errorf("failed to start txn: %w", err)
432 }
433
434 // remove record by rkey && update enforcer
435 if err = db.RemoveSpindleMember(
436 tx,
437 db.FilterEq("did", did),
438 db.FilterEq("rkey", rkey),
439 ); err != nil {
440 return fmt.Errorf("failed to remove from db: %w", err)
441 }
442
443 // update enforcer
444 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
445 if err != nil {
446 return fmt.Errorf("failed to update ACLs: %w", err)
447 }
448
449 if err = tx.Commit(); err != nil {
450 return fmt.Errorf("failed to commit txn: %w", err)
451 }
452
453 if err = i.Enforcer.E.SavePolicy(); err != nil {
454 return fmt.Errorf("failed to save ACLs: %w", err)
455 }
456
457 l.Info("removed spindle member")
458 }
459
460 return nil
461}
462
463func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
464 did := e.Did
465 var err error
466
467 l := i.Logger.With("handler", "ingestSpindle")
468 l = l.With("nsid", e.Commit.Collection)
469
470 switch e.Commit.Operation {
471 case jmodels.CommitOperationCreate:
472 raw := json.RawMessage(e.Commit.Record)
473 record := tangled.Spindle{}
474 err = json.Unmarshal(raw, &record)
475 if err != nil {
476 l.Error("invalid record", "err", err)
477 return err
478 }
479
480 instance := e.Commit.RKey
481
482 ddb, ok := i.Db.Execer.(*db.DB)
483 if !ok {
484 return fmt.Errorf("failed to index profile record, invalid db cast")
485 }
486
487 err := db.AddSpindle(ddb, models.Spindle{
488 Owner: syntax.DID(did),
489 Instance: instance,
490 })
491 if err != nil {
492 l.Error("failed to add spindle to db", "err", err, "instance", instance)
493 return err
494 }
495
496 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
497 if err != nil {
498 l.Error("failed to add spindle to db", "err", err, "instance", instance)
499 return err
500 }
501
502 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
503 if err != nil {
504 return fmt.Errorf("failed to mark verified: %w", err)
505 }
506
507 return nil
508
509 case jmodels.CommitOperationDelete:
510 instance := e.Commit.RKey
511
512 ddb, ok := i.Db.Execer.(*db.DB)
513 if !ok {
514 return fmt.Errorf("failed to index profile record, invalid db cast")
515 }
516
517 // get record from db first
518 spindles, err := db.GetSpindles(
519 ddb,
520 db.FilterEq("owner", did),
521 db.FilterEq("instance", instance),
522 )
523 if err != nil || len(spindles) != 1 {
524 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
525 }
526 spindle := spindles[0]
527
528 tx, err := ddb.Begin()
529 if err != nil {
530 return err
531 }
532 defer func() {
533 tx.Rollback()
534 i.Enforcer.E.LoadPolicy()
535 }()
536
537 // remove spindle members first
538 err = db.RemoveSpindleMember(
539 tx,
540 db.FilterEq("owner", did),
541 db.FilterEq("instance", instance),
542 )
543 if err != nil {
544 return err
545 }
546
547 err = db.DeleteSpindle(
548 tx,
549 db.FilterEq("owner", did),
550 db.FilterEq("instance", instance),
551 )
552 if err != nil {
553 return err
554 }
555
556 if spindle.Verified != nil {
557 err = i.Enforcer.RemoveSpindle(instance)
558 if err != nil {
559 return err
560 }
561 }
562
563 err = tx.Commit()
564 if err != nil {
565 return err
566 }
567
568 err = i.Enforcer.E.SavePolicy()
569 if err != nil {
570 return err
571 }
572 }
573
574 return nil
575}
576
577func (i *Ingester) ingestString(e *jmodels.Event) error {
578 did := e.Did
579 rkey := e.Commit.RKey
580
581 var err error
582
583 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
584 l.Info("ingesting record")
585
586 ddb, ok := i.Db.Execer.(*db.DB)
587 if !ok {
588 return fmt.Errorf("failed to index string record, invalid db cast")
589 }
590
591 switch e.Commit.Operation {
592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
593 raw := json.RawMessage(e.Commit.Record)
594 record := tangled.String{}
595 err = json.Unmarshal(raw, &record)
596 if err != nil {
597 l.Error("invalid record", "err", err)
598 return err
599 }
600
601 string := models.StringFromRecord(did, rkey, record)
602
603 if err = i.Validator.ValidateString(&string); err != nil {
604 l.Error("invalid record", "err", err)
605 return err
606 }
607
608 if err = db.AddString(ddb, string); err != nil {
609 l.Error("failed to add string", "err", err)
610 return err
611 }
612
613 return nil
614
615 case jmodels.CommitOperationDelete:
616 if err := db.DeleteString(
617 ddb,
618 db.FilterEq("did", did),
619 db.FilterEq("rkey", rkey),
620 ); err != nil {
621 l.Error("failed to delete", "err", err)
622 return fmt.Errorf("failed to delete string record: %w", err)
623 }
624
625 return nil
626 }
627
628 return nil
629}
630
631func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
632 did := e.Did
633 var err error
634
635 l := i.Logger.With("handler", "ingestKnotMember")
636 l = l.With("nsid", e.Commit.Collection)
637
638 switch e.Commit.Operation {
639 case jmodels.CommitOperationCreate:
640 raw := json.RawMessage(e.Commit.Record)
641 record := tangled.KnotMember{}
642 err = json.Unmarshal(raw, &record)
643 if err != nil {
644 l.Error("invalid record", "err", err)
645 return err
646 }
647
648 // only knot owner can invite to knots
649 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
650 if err != nil || !ok {
651 return fmt.Errorf("failed to enforce permissions: %w", err)
652 }
653
654 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
655 if err != nil {
656 return err
657 }
658
659 if memberId.Handle.IsInvalidHandle() {
660 return err
661 }
662
663 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
664 if err != nil {
665 return fmt.Errorf("failed to update ACLs: %w", err)
666 }
667
668 l.Info("added knot member")
669 case jmodels.CommitOperationDelete:
670 // we don't store knot members in a table (like we do for spindle)
671 // and we can't remove this just yet. possibly fixed if we switch
672 // to either:
673 // 1. a knot_members table like with spindle and store the rkey
674 // 2. use the knot host as the rkey
675 //
676 // TODO: implement member deletion
677 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
678 }
679
680 return nil
681}
682
683func (i *Ingester) ingestKnot(e *jmodels.Event) error {
684 did := e.Did
685 var err error
686
687 l := i.Logger.With("handler", "ingestKnot")
688 l = l.With("nsid", e.Commit.Collection)
689
690 switch e.Commit.Operation {
691 case jmodels.CommitOperationCreate:
692 raw := json.RawMessage(e.Commit.Record)
693 record := tangled.Knot{}
694 err = json.Unmarshal(raw, &record)
695 if err != nil {
696 l.Error("invalid record", "err", err)
697 return err
698 }
699
700 domain := e.Commit.RKey
701
702 ddb, ok := i.Db.Execer.(*db.DB)
703 if !ok {
704 return fmt.Errorf("failed to index profile record, invalid db cast")
705 }
706
707 err := db.AddKnot(ddb, domain, did)
708 if err != nil {
709 l.Error("failed to add knot to db", "err", err, "domain", domain)
710 return err
711 }
712
713 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
714 if err != nil {
715 l.Error("failed to verify knot", "err", err, "domain", domain)
716 return err
717 }
718
719 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
720 if err != nil {
721 return fmt.Errorf("failed to mark verified: %w", err)
722 }
723
724 return nil
725
726 case jmodels.CommitOperationDelete:
727 domain := e.Commit.RKey
728
729 ddb, ok := i.Db.Execer.(*db.DB)
730 if !ok {
731 return fmt.Errorf("failed to index knot record, invalid db cast")
732 }
733
734 // get record from db first
735 registrations, err := db.GetRegistrations(
736 ddb,
737 db.FilterEq("domain", domain),
738 db.FilterEq("did", did),
739 )
740 if err != nil {
741 return fmt.Errorf("failed to get registration: %w", err)
742 }
743 if len(registrations) != 1 {
744 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
745 }
746 registration := registrations[0]
747
748 tx, err := ddb.Begin()
749 if err != nil {
750 return err
751 }
752 defer func() {
753 tx.Rollback()
754 i.Enforcer.E.LoadPolicy()
755 }()
756
757 err = db.DeleteKnot(
758 tx,
759 db.FilterEq("did", did),
760 db.FilterEq("domain", domain),
761 )
762 if err != nil {
763 return err
764 }
765
766 if registration.Registered != nil {
767 err = i.Enforcer.RemoveKnot(domain)
768 if err != nil {
769 return err
770 }
771 }
772
773 err = tx.Commit()
774 if err != nil {
775 return err
776 }
777
778 err = i.Enforcer.E.SavePolicy()
779 if err != nil {
780 return err
781 }
782 }
783
784 return nil
785}
786func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
787 did := e.Did
788 rkey := e.Commit.RKey
789
790 var err error
791
792 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
793 l.Info("ingesting record")
794
795 ddb, ok := i.Db.Execer.(*db.DB)
796 if !ok {
797 return fmt.Errorf("failed to index issue record, invalid db cast")
798 }
799
800 switch e.Commit.Operation {
801 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
802 raw := json.RawMessage(e.Commit.Record)
803 record := tangled.RepoIssue{}
804 err = json.Unmarshal(raw, &record)
805 if err != nil {
806 l.Error("invalid record", "err", err)
807 return err
808 }
809
810 issue := models.IssueFromRecord(did, rkey, record)
811
812 if err := i.Validator.ValidateIssue(&issue); err != nil {
813 return fmt.Errorf("failed to validate issue: %w", err)
814 }
815
816 tx, err := ddb.BeginTx(ctx, nil)
817 if err != nil {
818 l.Error("failed to begin transaction", "err", err)
819 return err
820 }
821 defer tx.Rollback()
822
823 err = db.PutIssue(tx, &issue)
824 if err != nil {
825 l.Error("failed to create issue", "err", err)
826 return err
827 }
828
829 err = tx.Commit()
830 if err != nil {
831 l.Error("failed to commit txn", "err", err)
832 return err
833 }
834
835 return nil
836
837 case jmodels.CommitOperationDelete:
838 if err := db.DeleteIssues(
839 ddb,
840 db.FilterEq("did", did),
841 db.FilterEq("rkey", rkey),
842 ); err != nil {
843 l.Error("failed to delete", "err", err)
844 return fmt.Errorf("failed to delete issue record: %w", err)
845 }
846
847 return nil
848 }
849
850 return nil
851}
852
853func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
854 did := e.Did
855 rkey := e.Commit.RKey
856
857 var err error
858
859 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
860 l.Info("ingesting record")
861
862 ddb, ok := i.Db.Execer.(*db.DB)
863 if !ok {
864 return fmt.Errorf("failed to index issue comment record, invalid db cast")
865 }
866
867 switch e.Commit.Operation {
868 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
869 raw := json.RawMessage(e.Commit.Record)
870 record := tangled.RepoIssueComment{}
871 err = json.Unmarshal(raw, &record)
872 if err != nil {
873 return fmt.Errorf("invalid record: %w", err)
874 }
875
876 comment, err := models.IssueCommentFromRecord(did, rkey, record)
877 if err != nil {
878 return fmt.Errorf("failed to parse comment from record: %w", err)
879 }
880
881 if err := i.Validator.ValidateIssueComment(comment); err != nil {
882 return fmt.Errorf("failed to validate comment: %w", err)
883 }
884
885 _, err = db.AddIssueComment(ddb, *comment)
886 if err != nil {
887 return fmt.Errorf("failed to create issue comment: %w", err)
888 }
889
890 return nil
891
892 case jmodels.CommitOperationDelete:
893 if err := db.DeleteIssueComments(
894 ddb,
895 db.FilterEq("did", did),
896 db.FilterEq("rkey", rkey),
897 ); err != nil {
898 return fmt.Errorf("failed to delete issue comment record: %w", err)
899 }
900
901 return nil
902 }
903
904 return nil
905}
906
907func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
908 did := e.Did
909 rkey := e.Commit.RKey
910
911 var err error
912
913 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
914 l.Info("ingesting record")
915
916 ddb, ok := i.Db.Execer.(*db.DB)
917 if !ok {
918 return fmt.Errorf("failed to index label definition, invalid db cast")
919 }
920
921 switch e.Commit.Operation {
922 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
923 raw := json.RawMessage(e.Commit.Record)
924 record := tangled.LabelDefinition{}
925 err = json.Unmarshal(raw, &record)
926 if err != nil {
927 return fmt.Errorf("invalid record: %w", err)
928 }
929
930 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
931 if err != nil {
932 return fmt.Errorf("failed to parse labeldef from record: %w", err)
933 }
934
935 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
936 return fmt.Errorf("failed to validate labeldef: %w", err)
937 }
938
939 _, err = db.AddLabelDefinition(ddb, def)
940 if err != nil {
941 return fmt.Errorf("failed to create labeldef: %w", err)
942 }
943
944 return nil
945
946 case jmodels.CommitOperationDelete:
947 if err := db.DeleteLabelDefinition(
948 ddb,
949 db.FilterEq("did", did),
950 db.FilterEq("rkey", rkey),
951 ); err != nil {
952 return fmt.Errorf("failed to delete labeldef record: %w", err)
953 }
954
955 return nil
956 }
957
958 return nil
959}
960
961func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
962 did := e.Did
963 rkey := e.Commit.RKey
964
965 var err error
966
967 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
968 l.Info("ingesting record")
969
970 ddb, ok := i.Db.Execer.(*db.DB)
971 if !ok {
972 return fmt.Errorf("failed to index label op, invalid db cast")
973 }
974
975 switch e.Commit.Operation {
976 case jmodels.CommitOperationCreate:
977 raw := json.RawMessage(e.Commit.Record)
978 record := tangled.LabelOp{}
979 err = json.Unmarshal(raw, &record)
980 if err != nil {
981 return fmt.Errorf("invalid record: %w", err)
982 }
983
984 subject := syntax.ATURI(record.Subject)
985 collection := subject.Collection()
986
987 var repo *models.Repo
988 switch collection {
989 case tangled.RepoIssueNSID:
990 i, err := db.GetIssues(ddb, db.FilterEq("at_uri", subject))
991 if err != nil || len(i) != 1 {
992 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
993 }
994 repo = i[0].Repo
995 default:
996 return fmt.Errorf("unsupport label subject: %s", collection)
997 }
998
999 actx, err := db.NewLabelApplicationCtx(ddb, db.FilterIn("at_uri", repo.Labels))
1000 if err != nil {
1001 return fmt.Errorf("failed to build label application ctx: %w", err)
1002 }
1003
1004 ops := models.LabelOpsFromRecord(did, rkey, record)
1005
1006 for _, o := range ops {
1007 def, ok := actx.Defs[o.OperandKey]
1008 if !ok {
1009 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1010 }
1011 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1012 return fmt.Errorf("failed to validate labelop: %w", err)
1013 }
1014 }
1015
1016 tx, err := ddb.Begin()
1017 if err != nil {
1018 return err
1019 }
1020 defer tx.Rollback()
1021
1022 for _, o := range ops {
1023 _, err = db.AddLabelOp(tx, &o)
1024 if err != nil {
1025 return fmt.Errorf("failed to add labelop: %w", err)
1026 }
1027 }
1028
1029 if err = tx.Commit(); err != nil {
1030 return err
1031 }
1032 }
1033
1034 return nil
1035}