forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/go-git/go-git/v5/plumbing"
14 "github.com/ipfs/go-cid"
15 "tangled.sh/tangled.sh/core/api/tangled"
16 "tangled.sh/tangled.sh/core/appview/config"
17 "tangled.sh/tangled.sh/core/appview/db"
18 "tangled.sh/tangled.sh/core/appview/serververify"
19 "tangled.sh/tangled.sh/core/appview/validator"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/rbac"
22)
23
24type Ingester struct {
25 Db db.DbWrapper
26 Enforcer *rbac.Enforcer
27 IdResolver *idresolver.Resolver
28 Config *config.Config
29 Logger *slog.Logger
30 Validator *validator.Validator
31}
32
33type processFunc func(ctx context.Context, e *models.Event) error
34
35func (i *Ingester) Ingest() processFunc {
36 return func(ctx context.Context, e *models.Event) error {
37 var err error
38 defer func() {
39 eventTime := e.TimeUS
40 lastTimeUs := eventTime + 1
41 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
42 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
43 }
44 }()
45
46 l := i.Logger.With("kind", e.Kind)
47 switch e.Kind {
48 case models.EventKindAccount:
49 if !e.Account.Active && *e.Account.Status == "deactivated" {
50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
51 }
52 case models.EventKindIdentity:
53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
54 case models.EventKindCommit:
55 switch e.Commit.Collection {
56 case tangled.GraphFollowNSID:
57 err = i.ingestFollow(e)
58 case tangled.FeedStarNSID:
59 err = i.ingestStar(e)
60 case tangled.PublicKeyNSID:
61 err = i.ingestPublicKey(e)
62 case tangled.RepoArtifactNSID:
63 err = i.ingestArtifact(e)
64 case tangled.ActorProfileNSID:
65 err = i.ingestProfile(e)
66 case tangled.SpindleMemberNSID:
67 err = i.ingestSpindleMember(ctx, e)
68 case tangled.SpindleNSID:
69 err = i.ingestSpindle(ctx, e)
70 case tangled.KnotMemberNSID:
71 err = i.ingestKnotMember(e)
72 case tangled.KnotNSID:
73 err = i.ingestKnot(e)
74 case tangled.StringNSID:
75 err = i.ingestString(e)
76 case tangled.RepoIssueNSID:
77 err = i.ingestIssue(ctx, e)
78 case tangled.RepoIssueCommentNSID:
79 err = i.ingestIssueComment(e)
80 case tangled.LabelDefinitionNSID:
81 err = i.ingestLabelDefinition(e)
82 }
83 l = i.Logger.With("nsid", e.Commit.Collection)
84 }
85
86 if err != nil {
87 l.Debug("error ingesting record", "err", err)
88 }
89
90 return nil
91 }
92}
93
94func (i *Ingester) ingestStar(e *models.Event) error {
95 var err error
96 did := e.Did
97
98 l := i.Logger.With("handler", "ingestStar")
99 l = l.With("nsid", e.Commit.Collection)
100
101 switch e.Commit.Operation {
102 case models.CommitOperationCreate, models.CommitOperationUpdate:
103 var subjectUri syntax.ATURI
104
105 raw := json.RawMessage(e.Commit.Record)
106 record := tangled.FeedStar{}
107 err := json.Unmarshal(raw, &record)
108 if err != nil {
109 l.Error("invalid record", "err", err)
110 return err
111 }
112
113 subjectUri, err = syntax.ParseATURI(record.Subject)
114 if err != nil {
115 l.Error("invalid record", "err", err)
116 return err
117 }
118 err = db.AddStar(i.Db, &db.Star{
119 StarredByDid: did,
120 RepoAt: subjectUri,
121 Rkey: e.Commit.RKey,
122 })
123 case models.CommitOperationDelete:
124 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
125 }
126
127 if err != nil {
128 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
129 }
130
131 return nil
132}
133
134func (i *Ingester) ingestFollow(e *models.Event) error {
135 var err error
136 did := e.Did
137
138 l := i.Logger.With("handler", "ingestFollow")
139 l = l.With("nsid", e.Commit.Collection)
140
141 switch e.Commit.Operation {
142 case models.CommitOperationCreate, models.CommitOperationUpdate:
143 raw := json.RawMessage(e.Commit.Record)
144 record := tangled.GraphFollow{}
145 err = json.Unmarshal(raw, &record)
146 if err != nil {
147 l.Error("invalid record", "err", err)
148 return err
149 }
150
151 err = db.AddFollow(i.Db, &db.Follow{
152 UserDid: did,
153 SubjectDid: record.Subject,
154 Rkey: e.Commit.RKey,
155 })
156 case models.CommitOperationDelete:
157 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
158 }
159
160 if err != nil {
161 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
162 }
163
164 return nil
165}
166
167func (i *Ingester) ingestPublicKey(e *models.Event) error {
168 did := e.Did
169 var err error
170
171 l := i.Logger.With("handler", "ingestPublicKey")
172 l = l.With("nsid", e.Commit.Collection)
173
174 switch e.Commit.Operation {
175 case models.CommitOperationCreate, models.CommitOperationUpdate:
176 l.Debug("processing add of pubkey")
177 raw := json.RawMessage(e.Commit.Record)
178 record := tangled.PublicKey{}
179 err = json.Unmarshal(raw, &record)
180 if err != nil {
181 l.Error("invalid record", "err", err)
182 return err
183 }
184
185 name := record.Name
186 key := record.Key
187 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
188 case models.CommitOperationDelete:
189 l.Debug("processing delete of pubkey")
190 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
191 }
192
193 if err != nil {
194 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
195 }
196
197 return nil
198}
199
200func (i *Ingester) ingestArtifact(e *models.Event) error {
201 did := e.Did
202 var err error
203
204 l := i.Logger.With("handler", "ingestArtifact")
205 l = l.With("nsid", e.Commit.Collection)
206
207 switch e.Commit.Operation {
208 case models.CommitOperationCreate, models.CommitOperationUpdate:
209 raw := json.RawMessage(e.Commit.Record)
210 record := tangled.RepoArtifact{}
211 err = json.Unmarshal(raw, &record)
212 if err != nil {
213 l.Error("invalid record", "err", err)
214 return err
215 }
216
217 repoAt, err := syntax.ParseATURI(record.Repo)
218 if err != nil {
219 return err
220 }
221
222 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
223 if err != nil {
224 return err
225 }
226
227 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
228 if err != nil || !ok {
229 return err
230 }
231
232 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
233 if err != nil {
234 createdAt = time.Now()
235 }
236
237 artifact := db.Artifact{
238 Did: did,
239 Rkey: e.Commit.RKey,
240 RepoAt: repoAt,
241 Tag: plumbing.Hash(record.Tag),
242 CreatedAt: createdAt,
243 BlobCid: cid.Cid(record.Artifact.Ref),
244 Name: record.Name,
245 Size: uint64(record.Artifact.Size),
246 MimeType: record.Artifact.MimeType,
247 }
248
249 err = db.AddArtifact(i.Db, artifact)
250 case models.CommitOperationDelete:
251 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
252 }
253
254 if err != nil {
255 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
256 }
257
258 return nil
259}
260
261func (i *Ingester) ingestProfile(e *models.Event) error {
262 did := e.Did
263 var err error
264
265 l := i.Logger.With("handler", "ingestProfile")
266 l = l.With("nsid", e.Commit.Collection)
267
268 if e.Commit.RKey != "self" {
269 return fmt.Errorf("ingestProfile only ingests `self` record")
270 }
271
272 switch e.Commit.Operation {
273 case models.CommitOperationCreate, models.CommitOperationUpdate:
274 raw := json.RawMessage(e.Commit.Record)
275 record := tangled.ActorProfile{}
276 err = json.Unmarshal(raw, &record)
277 if err != nil {
278 l.Error("invalid record", "err", err)
279 return err
280 }
281
282 description := ""
283 if record.Description != nil {
284 description = *record.Description
285 }
286
287 includeBluesky := record.Bluesky
288
289 location := ""
290 if record.Location != nil {
291 location = *record.Location
292 }
293
294 var links [5]string
295 for i, l := range record.Links {
296 if i < 5 {
297 links[i] = l
298 }
299 }
300
301 var stats [2]db.VanityStat
302 for i, s := range record.Stats {
303 if i < 2 {
304 stats[i].Kind = db.VanityStatKind(s)
305 }
306 }
307
308 var pinned [6]syntax.ATURI
309 for i, r := range record.PinnedRepositories {
310 if i < 6 {
311 pinned[i] = syntax.ATURI(r)
312 }
313 }
314
315 profile := db.Profile{
316 Did: did,
317 Description: description,
318 IncludeBluesky: includeBluesky,
319 Location: location,
320 Links: links,
321 Stats: stats,
322 PinnedRepos: pinned,
323 }
324
325 ddb, ok := i.Db.Execer.(*db.DB)
326 if !ok {
327 return fmt.Errorf("failed to index profile record, invalid db cast")
328 }
329
330 tx, err := ddb.Begin()
331 if err != nil {
332 return fmt.Errorf("failed to start transaction")
333 }
334
335 err = db.ValidateProfile(tx, &profile)
336 if err != nil {
337 return fmt.Errorf("invalid profile record")
338 }
339
340 err = db.UpsertProfile(tx, &profile)
341 case models.CommitOperationDelete:
342 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
343 }
344
345 if err != nil {
346 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
347 }
348
349 return nil
350}
351
352func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error {
353 did := e.Did
354 var err error
355
356 l := i.Logger.With("handler", "ingestSpindleMember")
357 l = l.With("nsid", e.Commit.Collection)
358
359 switch e.Commit.Operation {
360 case models.CommitOperationCreate:
361 raw := json.RawMessage(e.Commit.Record)
362 record := tangled.SpindleMember{}
363 err = json.Unmarshal(raw, &record)
364 if err != nil {
365 l.Error("invalid record", "err", err)
366 return err
367 }
368
369 // only spindle owner can invite to spindles
370 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
371 if err != nil || !ok {
372 return fmt.Errorf("failed to enforce permissions: %w", err)
373 }
374
375 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
376 if err != nil {
377 return err
378 }
379
380 if memberId.Handle.IsInvalidHandle() {
381 return err
382 }
383
384 ddb, ok := i.Db.Execer.(*db.DB)
385 if !ok {
386 return fmt.Errorf("failed to index profile record, invalid db cast")
387 }
388
389 err = db.AddSpindleMember(ddb, db.SpindleMember{
390 Did: syntax.DID(did),
391 Rkey: e.Commit.RKey,
392 Instance: record.Instance,
393 Subject: memberId.DID,
394 })
395 if !ok {
396 return fmt.Errorf("failed to add to db: %w", err)
397 }
398
399 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
400 if err != nil {
401 return fmt.Errorf("failed to update ACLs: %w", err)
402 }
403
404 l.Info("added spindle member")
405 case models.CommitOperationDelete:
406 rkey := e.Commit.RKey
407
408 ddb, ok := i.Db.Execer.(*db.DB)
409 if !ok {
410 return fmt.Errorf("failed to index profile record, invalid db cast")
411 }
412
413 // get record from db first
414 members, err := db.GetSpindleMembers(
415 ddb,
416 db.FilterEq("did", did),
417 db.FilterEq("rkey", rkey),
418 )
419 if err != nil || len(members) != 1 {
420 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
421 }
422 member := members[0]
423
424 tx, err := ddb.Begin()
425 if err != nil {
426 return fmt.Errorf("failed to start txn: %w", err)
427 }
428
429 // remove record by rkey && update enforcer
430 if err = db.RemoveSpindleMember(
431 tx,
432 db.FilterEq("did", did),
433 db.FilterEq("rkey", rkey),
434 ); err != nil {
435 return fmt.Errorf("failed to remove from db: %w", err)
436 }
437
438 // update enforcer
439 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
440 if err != nil {
441 return fmt.Errorf("failed to update ACLs: %w", err)
442 }
443
444 if err = tx.Commit(); err != nil {
445 return fmt.Errorf("failed to commit txn: %w", err)
446 }
447
448 if err = i.Enforcer.E.SavePolicy(); err != nil {
449 return fmt.Errorf("failed to save ACLs: %w", err)
450 }
451
452 l.Info("removed spindle member")
453 }
454
455 return nil
456}
457
458func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error {
459 did := e.Did
460 var err error
461
462 l := i.Logger.With("handler", "ingestSpindle")
463 l = l.With("nsid", e.Commit.Collection)
464
465 switch e.Commit.Operation {
466 case models.CommitOperationCreate:
467 raw := json.RawMessage(e.Commit.Record)
468 record := tangled.Spindle{}
469 err = json.Unmarshal(raw, &record)
470 if err != nil {
471 l.Error("invalid record", "err", err)
472 return err
473 }
474
475 instance := e.Commit.RKey
476
477 ddb, ok := i.Db.Execer.(*db.DB)
478 if !ok {
479 return fmt.Errorf("failed to index profile record, invalid db cast")
480 }
481
482 err := db.AddSpindle(ddb, db.Spindle{
483 Owner: syntax.DID(did),
484 Instance: instance,
485 })
486 if err != nil {
487 l.Error("failed to add spindle to db", "err", err, "instance", instance)
488 return err
489 }
490
491 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
492 if err != nil {
493 l.Error("failed to add spindle to db", "err", err, "instance", instance)
494 return err
495 }
496
497 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
498 if err != nil {
499 return fmt.Errorf("failed to mark verified: %w", err)
500 }
501
502 return nil
503
504 case models.CommitOperationDelete:
505 instance := e.Commit.RKey
506
507 ddb, ok := i.Db.Execer.(*db.DB)
508 if !ok {
509 return fmt.Errorf("failed to index profile record, invalid db cast")
510 }
511
512 // get record from db first
513 spindles, err := db.GetSpindles(
514 ddb,
515 db.FilterEq("owner", did),
516 db.FilterEq("instance", instance),
517 )
518 if err != nil || len(spindles) != 1 {
519 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
520 }
521 spindle := spindles[0]
522
523 tx, err := ddb.Begin()
524 if err != nil {
525 return err
526 }
527 defer func() {
528 tx.Rollback()
529 i.Enforcer.E.LoadPolicy()
530 }()
531
532 // remove spindle members first
533 err = db.RemoveSpindleMember(
534 tx,
535 db.FilterEq("owner", did),
536 db.FilterEq("instance", instance),
537 )
538 if err != nil {
539 return err
540 }
541
542 err = db.DeleteSpindle(
543 tx,
544 db.FilterEq("owner", did),
545 db.FilterEq("instance", instance),
546 )
547 if err != nil {
548 return err
549 }
550
551 if spindle.Verified != nil {
552 err = i.Enforcer.RemoveSpindle(instance)
553 if err != nil {
554 return err
555 }
556 }
557
558 err = tx.Commit()
559 if err != nil {
560 return err
561 }
562
563 err = i.Enforcer.E.SavePolicy()
564 if err != nil {
565 return err
566 }
567 }
568
569 return nil
570}
571
572func (i *Ingester) ingestString(e *models.Event) error {
573 did := e.Did
574 rkey := e.Commit.RKey
575
576 var err error
577
578 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
579 l.Info("ingesting record")
580
581 ddb, ok := i.Db.Execer.(*db.DB)
582 if !ok {
583 return fmt.Errorf("failed to index string record, invalid db cast")
584 }
585
586 switch e.Commit.Operation {
587 case models.CommitOperationCreate, models.CommitOperationUpdate:
588 raw := json.RawMessage(e.Commit.Record)
589 record := tangled.String{}
590 err = json.Unmarshal(raw, &record)
591 if err != nil {
592 l.Error("invalid record", "err", err)
593 return err
594 }
595
596 string := db.StringFromRecord(did, rkey, record)
597
598 if err = string.Validate(); err != nil {
599 l.Error("invalid record", "err", err)
600 return err
601 }
602
603 if err = db.AddString(ddb, string); err != nil {
604 l.Error("failed to add string", "err", err)
605 return err
606 }
607
608 return nil
609
610 case models.CommitOperationDelete:
611 if err := db.DeleteString(
612 ddb,
613 db.FilterEq("did", did),
614 db.FilterEq("rkey", rkey),
615 ); err != nil {
616 l.Error("failed to delete", "err", err)
617 return fmt.Errorf("failed to delete string record: %w", err)
618 }
619
620 return nil
621 }
622
623 return nil
624}
625
626func (i *Ingester) ingestKnotMember(e *models.Event) error {
627 did := e.Did
628 var err error
629
630 l := i.Logger.With("handler", "ingestKnotMember")
631 l = l.With("nsid", e.Commit.Collection)
632
633 switch e.Commit.Operation {
634 case models.CommitOperationCreate:
635 raw := json.RawMessage(e.Commit.Record)
636 record := tangled.KnotMember{}
637 err = json.Unmarshal(raw, &record)
638 if err != nil {
639 l.Error("invalid record", "err", err)
640 return err
641 }
642
643 // only knot owner can invite to knots
644 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
645 if err != nil || !ok {
646 return fmt.Errorf("failed to enforce permissions: %w", err)
647 }
648
649 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
650 if err != nil {
651 return err
652 }
653
654 if memberId.Handle.IsInvalidHandle() {
655 return err
656 }
657
658 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
659 if err != nil {
660 return fmt.Errorf("failed to update ACLs: %w", err)
661 }
662
663 l.Info("added knot member")
664 case models.CommitOperationDelete:
665 // we don't store knot members in a table (like we do for spindle)
666 // and we can't remove this just yet. possibly fixed if we switch
667 // to either:
668 // 1. a knot_members table like with spindle and store the rkey
669 // 2. use the knot host as the rkey
670 //
671 // TODO: implement member deletion
672 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
673 }
674
675 return nil
676}
677
678func (i *Ingester) ingestKnot(e *models.Event) error {
679 did := e.Did
680 var err error
681
682 l := i.Logger.With("handler", "ingestKnot")
683 l = l.With("nsid", e.Commit.Collection)
684
685 switch e.Commit.Operation {
686 case models.CommitOperationCreate:
687 raw := json.RawMessage(e.Commit.Record)
688 record := tangled.Knot{}
689 err = json.Unmarshal(raw, &record)
690 if err != nil {
691 l.Error("invalid record", "err", err)
692 return err
693 }
694
695 domain := e.Commit.RKey
696
697 ddb, ok := i.Db.Execer.(*db.DB)
698 if !ok {
699 return fmt.Errorf("failed to index profile record, invalid db cast")
700 }
701
702 err := db.AddKnot(ddb, domain, did)
703 if err != nil {
704 l.Error("failed to add knot to db", "err", err, "domain", domain)
705 return err
706 }
707
708 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
709 if err != nil {
710 l.Error("failed to verify knot", "err", err, "domain", domain)
711 return err
712 }
713
714 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
715 if err != nil {
716 return fmt.Errorf("failed to mark verified: %w", err)
717 }
718
719 return nil
720
721 case models.CommitOperationDelete:
722 domain := e.Commit.RKey
723
724 ddb, ok := i.Db.Execer.(*db.DB)
725 if !ok {
726 return fmt.Errorf("failed to index knot record, invalid db cast")
727 }
728
729 // get record from db first
730 registrations, err := db.GetRegistrations(
731 ddb,
732 db.FilterEq("domain", domain),
733 db.FilterEq("did", did),
734 )
735 if err != nil {
736 return fmt.Errorf("failed to get registration: %w", err)
737 }
738 if len(registrations) != 1 {
739 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
740 }
741 registration := registrations[0]
742
743 tx, err := ddb.Begin()
744 if err != nil {
745 return err
746 }
747 defer func() {
748 tx.Rollback()
749 i.Enforcer.E.LoadPolicy()
750 }()
751
752 err = db.DeleteKnot(
753 tx,
754 db.FilterEq("did", did),
755 db.FilterEq("domain", domain),
756 )
757 if err != nil {
758 return err
759 }
760
761 if registration.Registered != nil {
762 err = i.Enforcer.RemoveKnot(domain)
763 if err != nil {
764 return err
765 }
766 }
767
768 err = tx.Commit()
769 if err != nil {
770 return err
771 }
772
773 err = i.Enforcer.E.SavePolicy()
774 if err != nil {
775 return err
776 }
777 }
778
779 return nil
780}
781func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error {
782 did := e.Did
783 rkey := e.Commit.RKey
784
785 var err error
786
787 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
788 l.Info("ingesting record")
789
790 ddb, ok := i.Db.Execer.(*db.DB)
791 if !ok {
792 return fmt.Errorf("failed to index issue record, invalid db cast")
793 }
794
795 switch e.Commit.Operation {
796 case models.CommitOperationCreate, models.CommitOperationUpdate:
797 raw := json.RawMessage(e.Commit.Record)
798 record := tangled.RepoIssue{}
799 err = json.Unmarshal(raw, &record)
800 if err != nil {
801 l.Error("invalid record", "err", err)
802 return err
803 }
804
805 issue := db.IssueFromRecord(did, rkey, record)
806
807 if err := i.Validator.ValidateIssue(&issue); err != nil {
808 return fmt.Errorf("failed to validate issue: %w", err)
809 }
810
811 tx, err := ddb.BeginTx(ctx, nil)
812 if err != nil {
813 l.Error("failed to begin transaction", "err", err)
814 return err
815 }
816 defer tx.Rollback()
817
818 err = db.PutIssue(tx, &issue)
819 if err != nil {
820 l.Error("failed to create issue", "err", err)
821 return err
822 }
823
824 err = tx.Commit()
825 if err != nil {
826 l.Error("failed to commit txn", "err", err)
827 return err
828 }
829
830 return nil
831
832 case models.CommitOperationDelete:
833 if err := db.DeleteIssues(
834 ddb,
835 db.FilterEq("did", did),
836 db.FilterEq("rkey", rkey),
837 ); err != nil {
838 l.Error("failed to delete", "err", err)
839 return fmt.Errorf("failed to delete issue record: %w", err)
840 }
841
842 return nil
843 }
844
845 return nil
846}
847
848func (i *Ingester) ingestIssueComment(e *models.Event) error {
849 did := e.Did
850 rkey := e.Commit.RKey
851
852 var err error
853
854 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
855 l.Info("ingesting record")
856
857 ddb, ok := i.Db.Execer.(*db.DB)
858 if !ok {
859 return fmt.Errorf("failed to index issue comment record, invalid db cast")
860 }
861
862 switch e.Commit.Operation {
863 case models.CommitOperationCreate, models.CommitOperationUpdate:
864 raw := json.RawMessage(e.Commit.Record)
865 record := tangled.RepoIssueComment{}
866 err = json.Unmarshal(raw, &record)
867 if err != nil {
868 return fmt.Errorf("invalid record: %w", err)
869 }
870
871 comment, err := db.IssueCommentFromRecord(did, rkey, record)
872 if err != nil {
873 return fmt.Errorf("failed to parse comment from record: %w", err)
874 }
875
876 if err := i.Validator.ValidateIssueComment(comment); err != nil {
877 return fmt.Errorf("failed to validate comment: %w", err)
878 }
879
880 _, err = db.AddIssueComment(ddb, *comment)
881 if err != nil {
882 return fmt.Errorf("failed to create issue comment: %w", err)
883 }
884
885 return nil
886
887 case models.CommitOperationDelete:
888 if err := db.DeleteIssueComments(
889 ddb,
890 db.FilterEq("did", did),
891 db.FilterEq("rkey", rkey),
892 ); err != nil {
893 return fmt.Errorf("failed to delete issue comment record: %w", err)
894 }
895
896 return nil
897 }
898
899 return nil
900}
901
902func (i *Ingester) ingestLabelDefinition(e *models.Event) error {
903 did := e.Did
904 rkey := e.Commit.RKey
905
906 var err error
907
908 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
909 l.Info("ingesting record")
910
911 ddb, ok := i.Db.Execer.(*db.DB)
912 if !ok {
913 return fmt.Errorf("failed to index label definition, invalid db cast")
914 }
915
916 switch e.Commit.Operation {
917 case models.CommitOperationCreate, models.CommitOperationUpdate:
918 raw := json.RawMessage(e.Commit.Record)
919 record := tangled.LabelDefinition{}
920 err = json.Unmarshal(raw, &record)
921 if err != nil {
922 return fmt.Errorf("invalid record: %w", err)
923 }
924
925 def, err := db.LabelDefinitionFromRecord(did, rkey, record)
926 if err != nil {
927 return fmt.Errorf("failed to parse labeldef from record: %w", err)
928 }
929
930 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
931 return fmt.Errorf("failed to validate labeldef: %w", err)
932 }
933
934 _, err = db.AddLabelDefinition(ddb, def)
935 if err != nil {
936 return fmt.Errorf("failed to create labeldef: %w", err)
937 }
938
939 return nil
940
941 case models.CommitOperationDelete:
942 if err := db.DeleteLabelDefinition(
943 ddb,
944 db.FilterEq("did", did),
945 db.FilterEq("rkey", rkey),
946 ); err != nil {
947 return fmt.Errorf("failed to delete labeldef record: %w", err)
948 }
949
950 return nil
951 }
952
953 return nil
954}