1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/go-git/go-git/v5/plumbing"
14 "github.com/ipfs/go-cid"
15 "tangled.sh/tangled.sh/core/api/tangled"
16 "tangled.sh/tangled.sh/core/appview/config"
17 "tangled.sh/tangled.sh/core/appview/db"
18 "tangled.sh/tangled.sh/core/appview/pages/markup"
19 "tangled.sh/tangled.sh/core/appview/serververify"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/rbac"
22)
23
24type Ingester struct {
25 Db db.DbWrapper
26 Enforcer *rbac.Enforcer
27 IdResolver *idresolver.Resolver
28 Config *config.Config
29 Logger *slog.Logger
30}
31
32type processFunc func(ctx context.Context, e *models.Event) error
33
34func (i *Ingester) Ingest() processFunc {
35 return func(ctx context.Context, e *models.Event) error {
36 var err error
37 defer func() {
38 eventTime := e.TimeUS
39 lastTimeUs := eventTime + 1
40 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
41 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
42 }
43 }()
44
45 l := i.Logger.With("kind", e.Kind)
46 switch e.Kind {
47 case models.EventKindAccount:
48 if !e.Account.Active && *e.Account.Status == "deactivated" {
49 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
50 }
51 case models.EventKindIdentity:
52 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
53 case models.EventKindCommit:
54 switch e.Commit.Collection {
55 case tangled.GraphFollowNSID:
56 err = i.ingestFollow(e)
57 case tangled.FeedStarNSID:
58 err = i.ingestStar(e)
59 case tangled.PublicKeyNSID:
60 err = i.ingestPublicKey(e)
61 case tangled.RepoArtifactNSID:
62 err = i.ingestArtifact(e)
63 case tangled.ActorProfileNSID:
64 err = i.ingestProfile(e)
65 case tangled.SpindleMemberNSID:
66 err = i.ingestSpindleMember(ctx, e)
67 case tangled.SpindleNSID:
68 err = i.ingestSpindle(ctx, e)
69 case tangled.KnotMemberNSID:
70 err = i.ingestKnotMember(e)
71 case tangled.KnotNSID:
72 err = i.ingestKnot(e)
73 case tangled.StringNSID:
74 err = i.ingestString(e)
75 case tangled.RepoIssueNSID:
76 err = i.ingestIssue(ctx, e)
77 case tangled.RepoIssueCommentNSID:
78 err = i.ingestIssueComment(e)
79 }
80 l = i.Logger.With("nsid", e.Commit.Collection)
81 }
82
83 if err != nil {
84 l.Debug("error ingesting record", "err", err)
85 }
86
87 return nil
88 }
89}
90
91func (i *Ingester) ingestStar(e *models.Event) error {
92 var err error
93 did := e.Did
94
95 l := i.Logger.With("handler", "ingestStar")
96 l = l.With("nsid", e.Commit.Collection)
97
98 switch e.Commit.Operation {
99 case models.CommitOperationCreate, models.CommitOperationUpdate:
100 var subjectUri syntax.ATURI
101
102 raw := json.RawMessage(e.Commit.Record)
103 record := tangled.FeedStar{}
104 err := json.Unmarshal(raw, &record)
105 if err != nil {
106 l.Error("invalid record", "err", err)
107 return err
108 }
109
110 subjectUri, err = syntax.ParseATURI(record.Subject)
111 if err != nil {
112 l.Error("invalid record", "err", err)
113 return err
114 }
115 err = db.AddStar(i.Db, &db.Star{
116 StarredByDid: did,
117 RepoAt: subjectUri,
118 Rkey: e.Commit.RKey,
119 })
120 case models.CommitOperationDelete:
121 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
122 }
123
124 if err != nil {
125 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
126 }
127
128 return nil
129}
130
131func (i *Ingester) ingestFollow(e *models.Event) error {
132 var err error
133 did := e.Did
134
135 l := i.Logger.With("handler", "ingestFollow")
136 l = l.With("nsid", e.Commit.Collection)
137
138 switch e.Commit.Operation {
139 case models.CommitOperationCreate, models.CommitOperationUpdate:
140 raw := json.RawMessage(e.Commit.Record)
141 record := tangled.GraphFollow{}
142 err = json.Unmarshal(raw, &record)
143 if err != nil {
144 l.Error("invalid record", "err", err)
145 return err
146 }
147
148 err = db.AddFollow(i.Db, &db.Follow{
149 UserDid: did,
150 SubjectDid: record.Subject,
151 Rkey: e.Commit.RKey,
152 })
153 case models.CommitOperationDelete:
154 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
155 }
156
157 if err != nil {
158 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
159 }
160
161 return nil
162}
163
164func (i *Ingester) ingestPublicKey(e *models.Event) error {
165 did := e.Did
166 var err error
167
168 l := i.Logger.With("handler", "ingestPublicKey")
169 l = l.With("nsid", e.Commit.Collection)
170
171 switch e.Commit.Operation {
172 case models.CommitOperationCreate, models.CommitOperationUpdate:
173 l.Debug("processing add of pubkey")
174 raw := json.RawMessage(e.Commit.Record)
175 record := tangled.PublicKey{}
176 err = json.Unmarshal(raw, &record)
177 if err != nil {
178 l.Error("invalid record", "err", err)
179 return err
180 }
181
182 name := record.Name
183 key := record.Key
184 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
185 case models.CommitOperationDelete:
186 l.Debug("processing delete of pubkey")
187 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
188 }
189
190 if err != nil {
191 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
192 }
193
194 return nil
195}
196
197func (i *Ingester) ingestArtifact(e *models.Event) error {
198 did := e.Did
199 var err error
200
201 l := i.Logger.With("handler", "ingestArtifact")
202 l = l.With("nsid", e.Commit.Collection)
203
204 switch e.Commit.Operation {
205 case models.CommitOperationCreate, models.CommitOperationUpdate:
206 raw := json.RawMessage(e.Commit.Record)
207 record := tangled.RepoArtifact{}
208 err = json.Unmarshal(raw, &record)
209 if err != nil {
210 l.Error("invalid record", "err", err)
211 return err
212 }
213
214 repoAt, err := syntax.ParseATURI(record.Repo)
215 if err != nil {
216 return err
217 }
218
219 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
220 if err != nil {
221 return err
222 }
223
224 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
225 if err != nil || !ok {
226 return err
227 }
228
229 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
230 if err != nil {
231 createdAt = time.Now()
232 }
233
234 artifact := db.Artifact{
235 Did: did,
236 Rkey: e.Commit.RKey,
237 RepoAt: repoAt,
238 Tag: plumbing.Hash(record.Tag),
239 CreatedAt: createdAt,
240 BlobCid: cid.Cid(record.Artifact.Ref),
241 Name: record.Name,
242 Size: uint64(record.Artifact.Size),
243 MimeType: record.Artifact.MimeType,
244 }
245
246 err = db.AddArtifact(i.Db, artifact)
247 case models.CommitOperationDelete:
248 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
249 }
250
251 if err != nil {
252 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
253 }
254
255 return nil
256}
257
258func (i *Ingester) ingestProfile(e *models.Event) error {
259 did := e.Did
260 var err error
261
262 l := i.Logger.With("handler", "ingestProfile")
263 l = l.With("nsid", e.Commit.Collection)
264
265 if e.Commit.RKey != "self" {
266 return fmt.Errorf("ingestProfile only ingests `self` record")
267 }
268
269 switch e.Commit.Operation {
270 case models.CommitOperationCreate, models.CommitOperationUpdate:
271 raw := json.RawMessage(e.Commit.Record)
272 record := tangled.ActorProfile{}
273 err = json.Unmarshal(raw, &record)
274 if err != nil {
275 l.Error("invalid record", "err", err)
276 return err
277 }
278
279 description := ""
280 if record.Description != nil {
281 description = *record.Description
282 }
283
284 includeBluesky := record.Bluesky
285
286 location := ""
287 if record.Location != nil {
288 location = *record.Location
289 }
290
291 var links [5]string
292 for i, l := range record.Links {
293 if i < 5 {
294 links[i] = l
295 }
296 }
297
298 var stats [2]db.VanityStat
299 for i, s := range record.Stats {
300 if i < 2 {
301 stats[i].Kind = db.VanityStatKind(s)
302 }
303 }
304
305 var pinned [6]syntax.ATURI
306 for i, r := range record.PinnedRepositories {
307 if i < 6 {
308 pinned[i] = syntax.ATURI(r)
309 }
310 }
311
312 profile := db.Profile{
313 Did: did,
314 Description: description,
315 IncludeBluesky: includeBluesky,
316 Location: location,
317 Links: links,
318 Stats: stats,
319 PinnedRepos: pinned,
320 }
321
322 ddb, ok := i.Db.Execer.(*db.DB)
323 if !ok {
324 return fmt.Errorf("failed to index profile record, invalid db cast")
325 }
326
327 tx, err := ddb.Begin()
328 if err != nil {
329 return fmt.Errorf("failed to start transaction")
330 }
331
332 err = db.ValidateProfile(tx, &profile)
333 if err != nil {
334 return fmt.Errorf("invalid profile record")
335 }
336
337 err = db.UpsertProfile(tx, &profile)
338 case models.CommitOperationDelete:
339 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
340 }
341
342 if err != nil {
343 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
344 }
345
346 return nil
347}
348
349func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error {
350 did := e.Did
351 var err error
352
353 l := i.Logger.With("handler", "ingestSpindleMember")
354 l = l.With("nsid", e.Commit.Collection)
355
356 switch e.Commit.Operation {
357 case models.CommitOperationCreate:
358 raw := json.RawMessage(e.Commit.Record)
359 record := tangled.SpindleMember{}
360 err = json.Unmarshal(raw, &record)
361 if err != nil {
362 l.Error("invalid record", "err", err)
363 return err
364 }
365
366 // only spindle owner can invite to spindles
367 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
368 if err != nil || !ok {
369 return fmt.Errorf("failed to enforce permissions: %w", err)
370 }
371
372 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
373 if err != nil {
374 return err
375 }
376
377 if memberId.Handle.IsInvalidHandle() {
378 return err
379 }
380
381 ddb, ok := i.Db.Execer.(*db.DB)
382 if !ok {
383 return fmt.Errorf("failed to index profile record, invalid db cast")
384 }
385
386 err = db.AddSpindleMember(ddb, db.SpindleMember{
387 Did: syntax.DID(did),
388 Rkey: e.Commit.RKey,
389 Instance: record.Instance,
390 Subject: memberId.DID,
391 })
392 if !ok {
393 return fmt.Errorf("failed to add to db: %w", err)
394 }
395
396 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
397 if err != nil {
398 return fmt.Errorf("failed to update ACLs: %w", err)
399 }
400
401 l.Info("added spindle member")
402 case models.CommitOperationDelete:
403 rkey := e.Commit.RKey
404
405 ddb, ok := i.Db.Execer.(*db.DB)
406 if !ok {
407 return fmt.Errorf("failed to index profile record, invalid db cast")
408 }
409
410 // get record from db first
411 members, err := db.GetSpindleMembers(
412 ddb,
413 db.FilterEq("did", did),
414 db.FilterEq("rkey", rkey),
415 )
416 if err != nil || len(members) != 1 {
417 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
418 }
419 member := members[0]
420
421 tx, err := ddb.Begin()
422 if err != nil {
423 return fmt.Errorf("failed to start txn: %w", err)
424 }
425
426 // remove record by rkey && update enforcer
427 if err = db.RemoveSpindleMember(
428 tx,
429 db.FilterEq("did", did),
430 db.FilterEq("rkey", rkey),
431 ); err != nil {
432 return fmt.Errorf("failed to remove from db: %w", err)
433 }
434
435 // update enforcer
436 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
437 if err != nil {
438 return fmt.Errorf("failed to update ACLs: %w", err)
439 }
440
441 if err = tx.Commit(); err != nil {
442 return fmt.Errorf("failed to commit txn: %w", err)
443 }
444
445 if err = i.Enforcer.E.SavePolicy(); err != nil {
446 return fmt.Errorf("failed to save ACLs: %w", err)
447 }
448
449 l.Info("removed spindle member")
450 }
451
452 return nil
453}
454
455func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error {
456 did := e.Did
457 var err error
458
459 l := i.Logger.With("handler", "ingestSpindle")
460 l = l.With("nsid", e.Commit.Collection)
461
462 switch e.Commit.Operation {
463 case models.CommitOperationCreate:
464 raw := json.RawMessage(e.Commit.Record)
465 record := tangled.Spindle{}
466 err = json.Unmarshal(raw, &record)
467 if err != nil {
468 l.Error("invalid record", "err", err)
469 return err
470 }
471
472 instance := e.Commit.RKey
473
474 ddb, ok := i.Db.Execer.(*db.DB)
475 if !ok {
476 return fmt.Errorf("failed to index profile record, invalid db cast")
477 }
478
479 err := db.AddSpindle(ddb, db.Spindle{
480 Owner: syntax.DID(did),
481 Instance: instance,
482 })
483 if err != nil {
484 l.Error("failed to add spindle to db", "err", err, "instance", instance)
485 return err
486 }
487
488 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
489 if err != nil {
490 l.Error("failed to add spindle to db", "err", err, "instance", instance)
491 return err
492 }
493
494 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
495 if err != nil {
496 return fmt.Errorf("failed to mark verified: %w", err)
497 }
498
499 return nil
500
501 case models.CommitOperationDelete:
502 instance := e.Commit.RKey
503
504 ddb, ok := i.Db.Execer.(*db.DB)
505 if !ok {
506 return fmt.Errorf("failed to index profile record, invalid db cast")
507 }
508
509 // get record from db first
510 spindles, err := db.GetSpindles(
511 ddb,
512 db.FilterEq("owner", did),
513 db.FilterEq("instance", instance),
514 )
515 if err != nil || len(spindles) != 1 {
516 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
517 }
518 spindle := spindles[0]
519
520 tx, err := ddb.Begin()
521 if err != nil {
522 return err
523 }
524 defer func() {
525 tx.Rollback()
526 i.Enforcer.E.LoadPolicy()
527 }()
528
529 // remove spindle members first
530 err = db.RemoveSpindleMember(
531 tx,
532 db.FilterEq("owner", did),
533 db.FilterEq("instance", instance),
534 )
535 if err != nil {
536 return err
537 }
538
539 err = db.DeleteSpindle(
540 tx,
541 db.FilterEq("owner", did),
542 db.FilterEq("instance", instance),
543 )
544 if err != nil {
545 return err
546 }
547
548 if spindle.Verified != nil {
549 err = i.Enforcer.RemoveSpindle(instance)
550 if err != nil {
551 return err
552 }
553 }
554
555 err = tx.Commit()
556 if err != nil {
557 return err
558 }
559
560 err = i.Enforcer.E.SavePolicy()
561 if err != nil {
562 return err
563 }
564 }
565
566 return nil
567}
568
569func (i *Ingester) ingestString(e *models.Event) error {
570 did := e.Did
571 rkey := e.Commit.RKey
572
573 var err error
574
575 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
576 l.Info("ingesting record")
577
578 ddb, ok := i.Db.Execer.(*db.DB)
579 if !ok {
580 return fmt.Errorf("failed to index string record, invalid db cast")
581 }
582
583 switch e.Commit.Operation {
584 case models.CommitOperationCreate, models.CommitOperationUpdate:
585 raw := json.RawMessage(e.Commit.Record)
586 record := tangled.String{}
587 err = json.Unmarshal(raw, &record)
588 if err != nil {
589 l.Error("invalid record", "err", err)
590 return err
591 }
592
593 string := db.StringFromRecord(did, rkey, record)
594
595 if err = string.Validate(); err != nil {
596 l.Error("invalid record", "err", err)
597 return err
598 }
599
600 if err = db.AddString(ddb, string); err != nil {
601 l.Error("failed to add string", "err", err)
602 return err
603 }
604
605 return nil
606
607 case models.CommitOperationDelete:
608 if err := db.DeleteString(
609 ddb,
610 db.FilterEq("did", did),
611 db.FilterEq("rkey", rkey),
612 ); err != nil {
613 l.Error("failed to delete", "err", err)
614 return fmt.Errorf("failed to delete string record: %w", err)
615 }
616
617 return nil
618 }
619
620 return nil
621}
622
623func (i *Ingester) ingestKnotMember(e *models.Event) error {
624 did := e.Did
625 var err error
626
627 l := i.Logger.With("handler", "ingestKnotMember")
628 l = l.With("nsid", e.Commit.Collection)
629
630 switch e.Commit.Operation {
631 case models.CommitOperationCreate:
632 raw := json.RawMessage(e.Commit.Record)
633 record := tangled.KnotMember{}
634 err = json.Unmarshal(raw, &record)
635 if err != nil {
636 l.Error("invalid record", "err", err)
637 return err
638 }
639
640 // only knot owner can invite to knots
641 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
642 if err != nil || !ok {
643 return fmt.Errorf("failed to enforce permissions: %w", err)
644 }
645
646 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
647 if err != nil {
648 return err
649 }
650
651 if memberId.Handle.IsInvalidHandle() {
652 return err
653 }
654
655 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
656 if err != nil {
657 return fmt.Errorf("failed to update ACLs: %w", err)
658 }
659
660 l.Info("added knot member")
661 case models.CommitOperationDelete:
662 // we don't store knot members in a table (like we do for spindle)
663 // and we can't remove this just yet. possibly fixed if we switch
664 // to either:
665 // 1. a knot_members table like with spindle and store the rkey
666 // 2. use the knot host as the rkey
667 //
668 // TODO: implement member deletion
669 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
670 }
671
672 return nil
673}
674
675func (i *Ingester) ingestKnot(e *models.Event) error {
676 did := e.Did
677 var err error
678
679 l := i.Logger.With("handler", "ingestKnot")
680 l = l.With("nsid", e.Commit.Collection)
681
682 switch e.Commit.Operation {
683 case models.CommitOperationCreate:
684 raw := json.RawMessage(e.Commit.Record)
685 record := tangled.Knot{}
686 err = json.Unmarshal(raw, &record)
687 if err != nil {
688 l.Error("invalid record", "err", err)
689 return err
690 }
691
692 domain := e.Commit.RKey
693
694 ddb, ok := i.Db.Execer.(*db.DB)
695 if !ok {
696 return fmt.Errorf("failed to index profile record, invalid db cast")
697 }
698
699 err := db.AddKnot(ddb, domain, did)
700 if err != nil {
701 l.Error("failed to add knot to db", "err", err, "domain", domain)
702 return err
703 }
704
705 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
706 if err != nil {
707 l.Error("failed to verify knot", "err", err, "domain", domain)
708 return err
709 }
710
711 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
712 if err != nil {
713 return fmt.Errorf("failed to mark verified: %w", err)
714 }
715
716 return nil
717
718 case models.CommitOperationDelete:
719 domain := e.Commit.RKey
720
721 ddb, ok := i.Db.Execer.(*db.DB)
722 if !ok {
723 return fmt.Errorf("failed to index knot record, invalid db cast")
724 }
725
726 // get record from db first
727 registrations, err := db.GetRegistrations(
728 ddb,
729 db.FilterEq("domain", domain),
730 db.FilterEq("did", did),
731 )
732 if err != nil {
733 return fmt.Errorf("failed to get registration: %w", err)
734 }
735 if len(registrations) != 1 {
736 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
737 }
738 registration := registrations[0]
739
740 tx, err := ddb.Begin()
741 if err != nil {
742 return err
743 }
744 defer func() {
745 tx.Rollback()
746 i.Enforcer.E.LoadPolicy()
747 }()
748
749 err = db.DeleteKnot(
750 tx,
751 db.FilterEq("did", did),
752 db.FilterEq("domain", domain),
753 )
754 if err != nil {
755 return err
756 }
757
758 if registration.Registered != nil {
759 err = i.Enforcer.RemoveKnot(domain)
760 if err != nil {
761 return err
762 }
763 }
764
765 err = tx.Commit()
766 if err != nil {
767 return err
768 }
769
770 err = i.Enforcer.E.SavePolicy()
771 if err != nil {
772 return err
773 }
774 }
775
776 return nil
777}
778func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error {
779 did := e.Did
780 rkey := e.Commit.RKey
781
782 var err error
783
784 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
785 l.Info("ingesting record")
786
787 ddb, ok := i.Db.Execer.(*db.DB)
788 if !ok {
789 return fmt.Errorf("failed to index issue record, invalid db cast")
790 }
791
792 switch e.Commit.Operation {
793 case models.CommitOperationCreate:
794 raw := json.RawMessage(e.Commit.Record)
795 record := tangled.RepoIssue{}
796 err = json.Unmarshal(raw, &record)
797 if err != nil {
798 l.Error("invalid record", "err", err)
799 return err
800 }
801
802 issue := db.IssueFromRecord(did, rkey, record)
803
804 sanitizer := markup.NewSanitizer()
805 if st := strings.TrimSpace(sanitizer.SanitizeDescription(issue.Title)); st == "" {
806 return fmt.Errorf("title is empty after HTML sanitization")
807 }
808 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(issue.Body)); sb == "" {
809 return fmt.Errorf("body is empty after HTML sanitization")
810 }
811
812 tx, err := ddb.BeginTx(ctx, nil)
813 if err != nil {
814 l.Error("failed to begin transaction", "err", err)
815 return err
816 }
817
818 err = db.NewIssue(tx, &issue)
819 if err != nil {
820 l.Error("failed to create issue", "err", err)
821 return err
822 }
823
824 return nil
825
826 case models.CommitOperationUpdate:
827 raw := json.RawMessage(e.Commit.Record)
828 record := tangled.RepoIssue{}
829 err = json.Unmarshal(raw, &record)
830 if err != nil {
831 l.Error("invalid record", "err", err)
832 return err
833 }
834
835 body := ""
836 if record.Body != nil {
837 body = *record.Body
838 }
839
840 sanitizer := markup.NewSanitizer()
841 if st := strings.TrimSpace(sanitizer.SanitizeDescription(record.Title)); st == "" {
842 return fmt.Errorf("title is empty after HTML sanitization")
843 }
844 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(body)); sb == "" {
845 return fmt.Errorf("body is empty after HTML sanitization")
846 }
847
848 err = db.UpdateIssueByRkey(ddb, did, rkey, record.Title, body)
849 if err != nil {
850 l.Error("failed to update issue", "err", err)
851 return err
852 }
853
854 return nil
855
856 case models.CommitOperationDelete:
857 if err := db.DeleteIssueByRkey(ddb, did, rkey); err != nil {
858 l.Error("failed to delete", "err", err)
859 return fmt.Errorf("failed to delete issue record: %w", err)
860 }
861
862 return nil
863 }
864
865 return fmt.Errorf("unknown operation: %s", e.Commit.Operation)
866}
867
868func (i *Ingester) ingestIssueComment(e *models.Event) error {
869 did := e.Did
870 rkey := e.Commit.RKey
871
872 var err error
873
874 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
875 l.Info("ingesting record")
876
877 ddb, ok := i.Db.Execer.(*db.DB)
878 if !ok {
879 return fmt.Errorf("failed to index issue comment record, invalid db cast")
880 }
881
882 switch e.Commit.Operation {
883 case models.CommitOperationCreate:
884 raw := json.RawMessage(e.Commit.Record)
885 record := tangled.RepoIssueComment{}
886 err = json.Unmarshal(raw, &record)
887 if err != nil {
888 l.Error("invalid record", "err", err)
889 return err
890 }
891
892 comment, err := db.IssueCommentFromRecord(ddb, did, rkey, record)
893 if err != nil {
894 l.Error("failed to parse comment from record", "err", err)
895 return err
896 }
897
898 sanitizer := markup.NewSanitizer()
899 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(comment.Body)); sb == "" {
900 return fmt.Errorf("body is empty after HTML sanitization")
901 }
902
903 err = db.NewIssueComment(ddb, &comment)
904 if err != nil {
905 l.Error("failed to create issue comment", "err", err)
906 return err
907 }
908
909 return nil
910
911 case models.CommitOperationUpdate:
912 raw := json.RawMessage(e.Commit.Record)
913 record := tangled.RepoIssueComment{}
914 err = json.Unmarshal(raw, &record)
915 if err != nil {
916 l.Error("invalid record", "err", err)
917 return err
918 }
919
920 sanitizer := markup.NewSanitizer()
921 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(record.Body)); sb == "" {
922 return fmt.Errorf("body is empty after HTML sanitization")
923 }
924
925 err = db.UpdateCommentByRkey(ddb, did, rkey, record.Body)
926 if err != nil {
927 l.Error("failed to update issue comment", "err", err)
928 return err
929 }
930
931 return nil
932
933 case models.CommitOperationDelete:
934 if err := db.DeleteCommentByRkey(ddb, did, rkey); err != nil {
935 l.Error("failed to delete", "err", err)
936 return fmt.Errorf("failed to delete issue comment record: %w", err)
937 }
938
939 return nil
940 }
941
942 return fmt.Errorf("unknown operation: %s", e.Commit.Operation)
943}