1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/go-git/go-git/v5/plumbing"
14 "github.com/ipfs/go-cid"
15 "tangled.sh/tangled.sh/core/api/tangled"
16 "tangled.sh/tangled.sh/core/appview/config"
17 "tangled.sh/tangled.sh/core/appview/db"
18 "tangled.sh/tangled.sh/core/appview/serververify"
19 "tangled.sh/tangled.sh/core/appview/validator"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/rbac"
22)
23
// Ingester bundles the dependencies used by the jetstream event handlers in
// this file.
type Ingester struct {
	// Db stores the jetstream cursor (SaveLastTimeUs) and is handed to the db
	// package helpers; several handlers type-assert Db.Execer to *db.DB.
	Db db.DbWrapper
	// Enforcer answers permission checks and holds spindle/knot ACLs.
	Enforcer *rbac.Enforcer
	// IdResolver resolves identities and has cached entries invalidated on
	// account and identity events.
	IdResolver *idresolver.Resolver
	// Config supplies Core.Dev, which is passed to server verification.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
	// Validator validates issues and issue comments before they are stored.
	Validator *validator.Validator
}
32
// processFunc is the signature of the per-event handler returned by
// (*Ingester).Ingest.
type processFunc func(ctx context.Context, e *models.Event) error
34
35func (i *Ingester) Ingest() processFunc {
36 return func(ctx context.Context, e *models.Event) error {
37 var err error
38 defer func() {
39 eventTime := e.TimeUS
40 lastTimeUs := eventTime + 1
41 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
42 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
43 }
44 }()
45
46 l := i.Logger.With("kind", e.Kind)
47 switch e.Kind {
48 case models.EventKindAccount:
49 if !e.Account.Active && *e.Account.Status == "deactivated" {
50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
51 }
52 case models.EventKindIdentity:
53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
54 case models.EventKindCommit:
55 switch e.Commit.Collection {
56 case tangled.GraphFollowNSID:
57 err = i.ingestFollow(e)
58 case tangled.FeedStarNSID:
59 err = i.ingestStar(e)
60 case tangled.PublicKeyNSID:
61 err = i.ingestPublicKey(e)
62 case tangled.RepoArtifactNSID:
63 err = i.ingestArtifact(e)
64 case tangled.ActorProfileNSID:
65 err = i.ingestProfile(e)
66 case tangled.SpindleMemberNSID:
67 err = i.ingestSpindleMember(ctx, e)
68 case tangled.SpindleNSID:
69 err = i.ingestSpindle(ctx, e)
70 case tangled.KnotMemberNSID:
71 err = i.ingestKnotMember(e)
72 case tangled.KnotNSID:
73 err = i.ingestKnot(e)
74 case tangled.StringNSID:
75 err = i.ingestString(e)
76 case tangled.RepoIssueNSID:
77 err = i.ingestIssue(ctx, e)
78 case tangled.RepoIssueCommentNSID:
79 err = i.ingestIssueComment(e)
80 }
81 l = i.Logger.With("nsid", e.Commit.Collection)
82 }
83
84 if err != nil {
85 l.Debug("error ingesting record", "err", err)
86 }
87
88 return nil
89 }
90}
91
92func (i *Ingester) ingestStar(e *models.Event) error {
93 var err error
94 did := e.Did
95
96 l := i.Logger.With("handler", "ingestStar")
97 l = l.With("nsid", e.Commit.Collection)
98
99 switch e.Commit.Operation {
100 case models.CommitOperationCreate, models.CommitOperationUpdate:
101 var subjectUri syntax.ATURI
102
103 raw := json.RawMessage(e.Commit.Record)
104 record := tangled.FeedStar{}
105 err := json.Unmarshal(raw, &record)
106 if err != nil {
107 l.Error("invalid record", "err", err)
108 return err
109 }
110
111 subjectUri, err = syntax.ParseATURI(record.Subject)
112 if err != nil {
113 l.Error("invalid record", "err", err)
114 return err
115 }
116 err = db.AddStar(i.Db, &db.Star{
117 StarredByDid: did,
118 RepoAt: subjectUri,
119 Rkey: e.Commit.RKey,
120 })
121 case models.CommitOperationDelete:
122 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
123 }
124
125 if err != nil {
126 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
127 }
128
129 return nil
130}
131
132func (i *Ingester) ingestFollow(e *models.Event) error {
133 var err error
134 did := e.Did
135
136 l := i.Logger.With("handler", "ingestFollow")
137 l = l.With("nsid", e.Commit.Collection)
138
139 switch e.Commit.Operation {
140 case models.CommitOperationCreate, models.CommitOperationUpdate:
141 raw := json.RawMessage(e.Commit.Record)
142 record := tangled.GraphFollow{}
143 err = json.Unmarshal(raw, &record)
144 if err != nil {
145 l.Error("invalid record", "err", err)
146 return err
147 }
148
149 err = db.AddFollow(i.Db, &db.Follow{
150 UserDid: did,
151 SubjectDid: record.Subject,
152 Rkey: e.Commit.RKey,
153 })
154 case models.CommitOperationDelete:
155 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
156 }
157
158 if err != nil {
159 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
160 }
161
162 return nil
163}
164
165func (i *Ingester) ingestPublicKey(e *models.Event) error {
166 did := e.Did
167 var err error
168
169 l := i.Logger.With("handler", "ingestPublicKey")
170 l = l.With("nsid", e.Commit.Collection)
171
172 switch e.Commit.Operation {
173 case models.CommitOperationCreate, models.CommitOperationUpdate:
174 l.Debug("processing add of pubkey")
175 raw := json.RawMessage(e.Commit.Record)
176 record := tangled.PublicKey{}
177 err = json.Unmarshal(raw, &record)
178 if err != nil {
179 l.Error("invalid record", "err", err)
180 return err
181 }
182
183 name := record.Name
184 key := record.Key
185 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
186 case models.CommitOperationDelete:
187 l.Debug("processing delete of pubkey")
188 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
189 }
190
191 if err != nil {
192 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
193 }
194
195 return nil
196}
197
198func (i *Ingester) ingestArtifact(e *models.Event) error {
199 did := e.Did
200 var err error
201
202 l := i.Logger.With("handler", "ingestArtifact")
203 l = l.With("nsid", e.Commit.Collection)
204
205 switch e.Commit.Operation {
206 case models.CommitOperationCreate, models.CommitOperationUpdate:
207 raw := json.RawMessage(e.Commit.Record)
208 record := tangled.RepoArtifact{}
209 err = json.Unmarshal(raw, &record)
210 if err != nil {
211 l.Error("invalid record", "err", err)
212 return err
213 }
214
215 repoAt, err := syntax.ParseATURI(record.Repo)
216 if err != nil {
217 return err
218 }
219
220 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
221 if err != nil {
222 return err
223 }
224
225 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
226 if err != nil || !ok {
227 return err
228 }
229
230 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
231 if err != nil {
232 createdAt = time.Now()
233 }
234
235 artifact := db.Artifact{
236 Did: did,
237 Rkey: e.Commit.RKey,
238 RepoAt: repoAt,
239 Tag: plumbing.Hash(record.Tag),
240 CreatedAt: createdAt,
241 BlobCid: cid.Cid(record.Artifact.Ref),
242 Name: record.Name,
243 Size: uint64(record.Artifact.Size),
244 MimeType: record.Artifact.MimeType,
245 }
246
247 err = db.AddArtifact(i.Db, artifact)
248 case models.CommitOperationDelete:
249 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
250 }
251
252 if err != nil {
253 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
254 }
255
256 return nil
257}
258
259func (i *Ingester) ingestProfile(e *models.Event) error {
260 did := e.Did
261 var err error
262
263 l := i.Logger.With("handler", "ingestProfile")
264 l = l.With("nsid", e.Commit.Collection)
265
266 if e.Commit.RKey != "self" {
267 return fmt.Errorf("ingestProfile only ingests `self` record")
268 }
269
270 switch e.Commit.Operation {
271 case models.CommitOperationCreate, models.CommitOperationUpdate:
272 raw := json.RawMessage(e.Commit.Record)
273 record := tangled.ActorProfile{}
274 err = json.Unmarshal(raw, &record)
275 if err != nil {
276 l.Error("invalid record", "err", err)
277 return err
278 }
279
280 description := ""
281 if record.Description != nil {
282 description = *record.Description
283 }
284
285 includeBluesky := record.Bluesky
286
287 location := ""
288 if record.Location != nil {
289 location = *record.Location
290 }
291
292 var links [5]string
293 for i, l := range record.Links {
294 if i < 5 {
295 links[i] = l
296 }
297 }
298
299 var stats [2]db.VanityStat
300 for i, s := range record.Stats {
301 if i < 2 {
302 stats[i].Kind = db.VanityStatKind(s)
303 }
304 }
305
306 var pinned [6]syntax.ATURI
307 for i, r := range record.PinnedRepositories {
308 if i < 6 {
309 pinned[i] = syntax.ATURI(r)
310 }
311 }
312
313 profile := db.Profile{
314 Did: did,
315 Description: description,
316 IncludeBluesky: includeBluesky,
317 Location: location,
318 Links: links,
319 Stats: stats,
320 PinnedRepos: pinned,
321 }
322
323 ddb, ok := i.Db.Execer.(*db.DB)
324 if !ok {
325 return fmt.Errorf("failed to index profile record, invalid db cast")
326 }
327
328 tx, err := ddb.Begin()
329 if err != nil {
330 return fmt.Errorf("failed to start transaction")
331 }
332
333 err = db.ValidateProfile(tx, &profile)
334 if err != nil {
335 return fmt.Errorf("invalid profile record")
336 }
337
338 err = db.UpsertProfile(tx, &profile)
339 case models.CommitOperationDelete:
340 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
341 }
342
343 if err != nil {
344 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
345 }
346
347 return nil
348}
349
350func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error {
351 did := e.Did
352 var err error
353
354 l := i.Logger.With("handler", "ingestSpindleMember")
355 l = l.With("nsid", e.Commit.Collection)
356
357 switch e.Commit.Operation {
358 case models.CommitOperationCreate:
359 raw := json.RawMessage(e.Commit.Record)
360 record := tangled.SpindleMember{}
361 err = json.Unmarshal(raw, &record)
362 if err != nil {
363 l.Error("invalid record", "err", err)
364 return err
365 }
366
367 // only spindle owner can invite to spindles
368 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
369 if err != nil || !ok {
370 return fmt.Errorf("failed to enforce permissions: %w", err)
371 }
372
373 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
374 if err != nil {
375 return err
376 }
377
378 if memberId.Handle.IsInvalidHandle() {
379 return err
380 }
381
382 ddb, ok := i.Db.Execer.(*db.DB)
383 if !ok {
384 return fmt.Errorf("failed to index profile record, invalid db cast")
385 }
386
387 err = db.AddSpindleMember(ddb, db.SpindleMember{
388 Did: syntax.DID(did),
389 Rkey: e.Commit.RKey,
390 Instance: record.Instance,
391 Subject: memberId.DID,
392 })
393 if !ok {
394 return fmt.Errorf("failed to add to db: %w", err)
395 }
396
397 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
398 if err != nil {
399 return fmt.Errorf("failed to update ACLs: %w", err)
400 }
401
402 l.Info("added spindle member")
403 case models.CommitOperationDelete:
404 rkey := e.Commit.RKey
405
406 ddb, ok := i.Db.Execer.(*db.DB)
407 if !ok {
408 return fmt.Errorf("failed to index profile record, invalid db cast")
409 }
410
411 // get record from db first
412 members, err := db.GetSpindleMembers(
413 ddb,
414 db.FilterEq("did", did),
415 db.FilterEq("rkey", rkey),
416 )
417 if err != nil || len(members) != 1 {
418 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
419 }
420 member := members[0]
421
422 tx, err := ddb.Begin()
423 if err != nil {
424 return fmt.Errorf("failed to start txn: %w", err)
425 }
426
427 // remove record by rkey && update enforcer
428 if err = db.RemoveSpindleMember(
429 tx,
430 db.FilterEq("did", did),
431 db.FilterEq("rkey", rkey),
432 ); err != nil {
433 return fmt.Errorf("failed to remove from db: %w", err)
434 }
435
436 // update enforcer
437 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
438 if err != nil {
439 return fmt.Errorf("failed to update ACLs: %w", err)
440 }
441
442 if err = tx.Commit(); err != nil {
443 return fmt.Errorf("failed to commit txn: %w", err)
444 }
445
446 if err = i.Enforcer.E.SavePolicy(); err != nil {
447 return fmt.Errorf("failed to save ACLs: %w", err)
448 }
449
450 l.Info("removed spindle member")
451 }
452
453 return nil
454}
455
456func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error {
457 did := e.Did
458 var err error
459
460 l := i.Logger.With("handler", "ingestSpindle")
461 l = l.With("nsid", e.Commit.Collection)
462
463 switch e.Commit.Operation {
464 case models.CommitOperationCreate:
465 raw := json.RawMessage(e.Commit.Record)
466 record := tangled.Spindle{}
467 err = json.Unmarshal(raw, &record)
468 if err != nil {
469 l.Error("invalid record", "err", err)
470 return err
471 }
472
473 instance := e.Commit.RKey
474
475 ddb, ok := i.Db.Execer.(*db.DB)
476 if !ok {
477 return fmt.Errorf("failed to index profile record, invalid db cast")
478 }
479
480 err := db.AddSpindle(ddb, db.Spindle{
481 Owner: syntax.DID(did),
482 Instance: instance,
483 })
484 if err != nil {
485 l.Error("failed to add spindle to db", "err", err, "instance", instance)
486 return err
487 }
488
489 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
490 if err != nil {
491 l.Error("failed to add spindle to db", "err", err, "instance", instance)
492 return err
493 }
494
495 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
496 if err != nil {
497 return fmt.Errorf("failed to mark verified: %w", err)
498 }
499
500 return nil
501
502 case models.CommitOperationDelete:
503 instance := e.Commit.RKey
504
505 ddb, ok := i.Db.Execer.(*db.DB)
506 if !ok {
507 return fmt.Errorf("failed to index profile record, invalid db cast")
508 }
509
510 // get record from db first
511 spindles, err := db.GetSpindles(
512 ddb,
513 db.FilterEq("owner", did),
514 db.FilterEq("instance", instance),
515 )
516 if err != nil || len(spindles) != 1 {
517 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
518 }
519 spindle := spindles[0]
520
521 tx, err := ddb.Begin()
522 if err != nil {
523 return err
524 }
525 defer func() {
526 tx.Rollback()
527 i.Enforcer.E.LoadPolicy()
528 }()
529
530 // remove spindle members first
531 err = db.RemoveSpindleMember(
532 tx,
533 db.FilterEq("owner", did),
534 db.FilterEq("instance", instance),
535 )
536 if err != nil {
537 return err
538 }
539
540 err = db.DeleteSpindle(
541 tx,
542 db.FilterEq("owner", did),
543 db.FilterEq("instance", instance),
544 )
545 if err != nil {
546 return err
547 }
548
549 if spindle.Verified != nil {
550 err = i.Enforcer.RemoveSpindle(instance)
551 if err != nil {
552 return err
553 }
554 }
555
556 err = tx.Commit()
557 if err != nil {
558 return err
559 }
560
561 err = i.Enforcer.E.SavePolicy()
562 if err != nil {
563 return err
564 }
565 }
566
567 return nil
568}
569
570func (i *Ingester) ingestString(e *models.Event) error {
571 did := e.Did
572 rkey := e.Commit.RKey
573
574 var err error
575
576 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
577 l.Info("ingesting record")
578
579 ddb, ok := i.Db.Execer.(*db.DB)
580 if !ok {
581 return fmt.Errorf("failed to index string record, invalid db cast")
582 }
583
584 switch e.Commit.Operation {
585 case models.CommitOperationCreate, models.CommitOperationUpdate:
586 raw := json.RawMessage(e.Commit.Record)
587 record := tangled.String{}
588 err = json.Unmarshal(raw, &record)
589 if err != nil {
590 l.Error("invalid record", "err", err)
591 return err
592 }
593
594 string := db.StringFromRecord(did, rkey, record)
595
596 if err = string.Validate(); err != nil {
597 l.Error("invalid record", "err", err)
598 return err
599 }
600
601 if err = db.AddString(ddb, string); err != nil {
602 l.Error("failed to add string", "err", err)
603 return err
604 }
605
606 return nil
607
608 case models.CommitOperationDelete:
609 if err := db.DeleteString(
610 ddb,
611 db.FilterEq("did", did),
612 db.FilterEq("rkey", rkey),
613 ); err != nil {
614 l.Error("failed to delete", "err", err)
615 return fmt.Errorf("failed to delete string record: %w", err)
616 }
617
618 return nil
619 }
620
621 return nil
622}
623
624func (i *Ingester) ingestKnotMember(e *models.Event) error {
625 did := e.Did
626 var err error
627
628 l := i.Logger.With("handler", "ingestKnotMember")
629 l = l.With("nsid", e.Commit.Collection)
630
631 switch e.Commit.Operation {
632 case models.CommitOperationCreate:
633 raw := json.RawMessage(e.Commit.Record)
634 record := tangled.KnotMember{}
635 err = json.Unmarshal(raw, &record)
636 if err != nil {
637 l.Error("invalid record", "err", err)
638 return err
639 }
640
641 // only knot owner can invite to knots
642 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
643 if err != nil || !ok {
644 return fmt.Errorf("failed to enforce permissions: %w", err)
645 }
646
647 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
648 if err != nil {
649 return err
650 }
651
652 if memberId.Handle.IsInvalidHandle() {
653 return err
654 }
655
656 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
657 if err != nil {
658 return fmt.Errorf("failed to update ACLs: %w", err)
659 }
660
661 l.Info("added knot member")
662 case models.CommitOperationDelete:
663 // we don't store knot members in a table (like we do for spindle)
664 // and we can't remove this just yet. possibly fixed if we switch
665 // to either:
666 // 1. a knot_members table like with spindle and store the rkey
667 // 2. use the knot host as the rkey
668 //
669 // TODO: implement member deletion
670 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
671 }
672
673 return nil
674}
675
676func (i *Ingester) ingestKnot(e *models.Event) error {
677 did := e.Did
678 var err error
679
680 l := i.Logger.With("handler", "ingestKnot")
681 l = l.With("nsid", e.Commit.Collection)
682
683 switch e.Commit.Operation {
684 case models.CommitOperationCreate:
685 raw := json.RawMessage(e.Commit.Record)
686 record := tangled.Knot{}
687 err = json.Unmarshal(raw, &record)
688 if err != nil {
689 l.Error("invalid record", "err", err)
690 return err
691 }
692
693 domain := e.Commit.RKey
694
695 ddb, ok := i.Db.Execer.(*db.DB)
696 if !ok {
697 return fmt.Errorf("failed to index profile record, invalid db cast")
698 }
699
700 err := db.AddKnot(ddb, domain, did)
701 if err != nil {
702 l.Error("failed to add knot to db", "err", err, "domain", domain)
703 return err
704 }
705
706 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
707 if err != nil {
708 l.Error("failed to verify knot", "err", err, "domain", domain)
709 return err
710 }
711
712 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
713 if err != nil {
714 return fmt.Errorf("failed to mark verified: %w", err)
715 }
716
717 return nil
718
719 case models.CommitOperationDelete:
720 domain := e.Commit.RKey
721
722 ddb, ok := i.Db.Execer.(*db.DB)
723 if !ok {
724 return fmt.Errorf("failed to index knot record, invalid db cast")
725 }
726
727 // get record from db first
728 registrations, err := db.GetRegistrations(
729 ddb,
730 db.FilterEq("domain", domain),
731 db.FilterEq("did", did),
732 )
733 if err != nil {
734 return fmt.Errorf("failed to get registration: %w", err)
735 }
736 if len(registrations) != 1 {
737 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
738 }
739 registration := registrations[0]
740
741 tx, err := ddb.Begin()
742 if err != nil {
743 return err
744 }
745 defer func() {
746 tx.Rollback()
747 i.Enforcer.E.LoadPolicy()
748 }()
749
750 err = db.DeleteKnot(
751 tx,
752 db.FilterEq("did", did),
753 db.FilterEq("domain", domain),
754 )
755 if err != nil {
756 return err
757 }
758
759 if registration.Registered != nil {
760 err = i.Enforcer.RemoveKnot(domain)
761 if err != nil {
762 return err
763 }
764 }
765
766 err = tx.Commit()
767 if err != nil {
768 return err
769 }
770
771 err = i.Enforcer.E.SavePolicy()
772 if err != nil {
773 return err
774 }
775 }
776
777 return nil
778}
779func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error {
780 did := e.Did
781 rkey := e.Commit.RKey
782
783 var err error
784
785 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
786 l.Info("ingesting record")
787
788 ddb, ok := i.Db.Execer.(*db.DB)
789 if !ok {
790 return fmt.Errorf("failed to index issue record, invalid db cast")
791 }
792
793 switch e.Commit.Operation {
794 case models.CommitOperationCreate, models.CommitOperationUpdate:
795 raw := json.RawMessage(e.Commit.Record)
796 record := tangled.RepoIssue{}
797 err = json.Unmarshal(raw, &record)
798 if err != nil {
799 l.Error("invalid record", "err", err)
800 return err
801 }
802
803 issue := db.IssueFromRecord(did, rkey, record)
804
805 if err := i.Validator.ValidateIssue(&issue); err != nil {
806 return fmt.Errorf("failed to validate issue: %w", err)
807 }
808
809 tx, err := ddb.BeginTx(ctx, nil)
810 if err != nil {
811 l.Error("failed to begin transaction", "err", err)
812 return err
813 }
814 defer tx.Rollback()
815
816 err = db.PutIssue(tx, &issue)
817 if err != nil {
818 l.Error("failed to create issue", "err", err)
819 return err
820 }
821
822 err = tx.Commit()
823 if err != nil {
824 l.Error("failed to commit txn", "err", err)
825 return err
826 }
827
828 return nil
829
830 case models.CommitOperationDelete:
831 if err := db.DeleteIssues(
832 ddb,
833 db.FilterEq("did", did),
834 db.FilterEq("rkey", rkey),
835 ); err != nil {
836 l.Error("failed to delete", "err", err)
837 return fmt.Errorf("failed to delete issue record: %w", err)
838 }
839
840 return nil
841 }
842
843 return nil
844}
845
846func (i *Ingester) ingestIssueComment(e *models.Event) error {
847 did := e.Did
848 rkey := e.Commit.RKey
849
850 var err error
851
852 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
853 l.Info("ingesting record")
854
855 ddb, ok := i.Db.Execer.(*db.DB)
856 if !ok {
857 return fmt.Errorf("failed to index issue comment record, invalid db cast")
858 }
859
860 switch e.Commit.Operation {
861 case models.CommitOperationCreate, models.CommitOperationUpdate:
862 raw := json.RawMessage(e.Commit.Record)
863 record := tangled.RepoIssueComment{}
864 err = json.Unmarshal(raw, &record)
865 if err != nil {
866 return fmt.Errorf("invalid record: %w", err)
867 }
868
869 comment, err := db.IssueCommentFromRecord(ddb, did, rkey, record)
870 if err != nil {
871 return fmt.Errorf("failed to parse comment from record: %w", err)
872 }
873
874 if err := i.Validator.ValidateIssueComment(comment); err != nil {
875 return fmt.Errorf("failed to validate comment: %w", err)
876 }
877
878 _, err = db.AddIssueComment(ddb, *comment)
879 if err != nil {
880 return fmt.Errorf("failed to create issue comment: %w", err)
881 }
882
883 return nil
884
885 case models.CommitOperationDelete:
886 if err := db.DeleteIssueComments(
887 ddb,
888 db.FilterEq("did", did),
889 db.FilterEq("rkey", rkey),
890 ); err != nil {
891 return fmt.Errorf("failed to delete issue comment record: %w", err)
892 }
893
894 return nil
895 }
896
897 return nil
898}