forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 jmodels "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/go-git/go-git/v5/plumbing"
14 "github.com/ipfs/go-cid"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/serververify"
20 "tangled.org/core/appview/validator"
21 "tangled.org/core/idresolver"
22 "tangled.org/core/rbac"
23)
24
25type Ingester struct {
26 Db db.DbWrapper
27 Enforcer *rbac.Enforcer
28 IdResolver *idresolver.Resolver
29 Config *config.Config
30 Logger *slog.Logger
31 Validator *validator.Validator
32}
33
34type processFunc func(ctx context.Context, e *jmodels.Event) error
35
36func (i *Ingester) Ingest() processFunc {
37 return func(ctx context.Context, e *jmodels.Event) error {
38 var err error
39 defer func() {
40 eventTime := e.TimeUS
41 lastTimeUs := eventTime + 1
42 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
43 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
44 }
45 }()
46
47 l := i.Logger.With("kind", e.Kind)
48 switch e.Kind {
49 case jmodels.EventKindAccount:
50 if !e.Account.Active && *e.Account.Status == "deactivated" {
51 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
52 }
53 case jmodels.EventKindIdentity:
54 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
55 case jmodels.EventKindCommit:
56 switch e.Commit.Collection {
57 case tangled.GraphFollowNSID:
58 err = i.ingestFollow(e)
59 case tangled.FeedStarNSID:
60 err = i.ingestStar(e)
61 case tangled.PublicKeyNSID:
62 err = i.ingestPublicKey(e)
63 case tangled.RepoArtifactNSID:
64 err = i.ingestArtifact(e)
65 case tangled.ActorProfileNSID:
66 err = i.ingestProfile(e)
67 case tangled.SpindleMemberNSID:
68 err = i.ingestSpindleMember(ctx, e)
69 case tangled.SpindleNSID:
70 err = i.ingestSpindle(ctx, e)
71 case tangled.KnotMemberNSID:
72 err = i.ingestKnotMember(e)
73 case tangled.KnotNSID:
74 err = i.ingestKnot(e)
75 case tangled.StringNSID:
76 err = i.ingestString(e)
77 case tangled.RepoIssueNSID:
78 err = i.ingestIssue(ctx, e)
79 case tangled.RepoIssueCommentNSID:
80 err = i.ingestIssueComment(e)
81 case tangled.LabelDefinitionNSID:
82 err = i.ingestLabelDefinition(e)
83 }
84 l = i.Logger.With("nsid", e.Commit.Collection)
85 }
86
87 if err != nil {
88 l.Debug("error ingesting record", "err", err)
89 }
90
91 return nil
92 }
93}
94
95func (i *Ingester) ingestStar(e *jmodels.Event) error {
96 var err error
97 did := e.Did
98
99 l := i.Logger.With("handler", "ingestStar")
100 l = l.With("nsid", e.Commit.Collection)
101
102 switch e.Commit.Operation {
103 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
104 var subjectUri syntax.ATURI
105
106 raw := json.RawMessage(e.Commit.Record)
107 record := tangled.FeedStar{}
108 err := json.Unmarshal(raw, &record)
109 if err != nil {
110 l.Error("invalid record", "err", err)
111 return err
112 }
113
114 subjectUri, err = syntax.ParseATURI(record.Subject)
115 if err != nil {
116 l.Error("invalid record", "err", err)
117 return err
118 }
119 err = db.AddStar(i.Db, &db.Star{
120 StarredByDid: did,
121 RepoAt: subjectUri,
122 Rkey: e.Commit.RKey,
123 })
124 case jmodels.CommitOperationDelete:
125 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
126 }
127
128 if err != nil {
129 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
130 }
131
132 return nil
133}
134
135func (i *Ingester) ingestFollow(e *jmodels.Event) error {
136 var err error
137 did := e.Did
138
139 l := i.Logger.With("handler", "ingestFollow")
140 l = l.With("nsid", e.Commit.Collection)
141
142 switch e.Commit.Operation {
143 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
144 raw := json.RawMessage(e.Commit.Record)
145 record := tangled.GraphFollow{}
146 err = json.Unmarshal(raw, &record)
147 if err != nil {
148 l.Error("invalid record", "err", err)
149 return err
150 }
151
152 err = db.AddFollow(i.Db, &models.Follow{
153 UserDid: did,
154 SubjectDid: record.Subject,
155 Rkey: e.Commit.RKey,
156 })
157 case jmodels.CommitOperationDelete:
158 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
159 }
160
161 if err != nil {
162 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
163 }
164
165 return nil
166}
167
168func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
169 did := e.Did
170 var err error
171
172 l := i.Logger.With("handler", "ingestPublicKey")
173 l = l.With("nsid", e.Commit.Collection)
174
175 switch e.Commit.Operation {
176 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
177 l.Debug("processing add of pubkey")
178 raw := json.RawMessage(e.Commit.Record)
179 record := tangled.PublicKey{}
180 err = json.Unmarshal(raw, &record)
181 if err != nil {
182 l.Error("invalid record", "err", err)
183 return err
184 }
185
186 name := record.Name
187 key := record.Key
188 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
189 case jmodels.CommitOperationDelete:
190 l.Debug("processing delete of pubkey")
191 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
192 }
193
194 if err != nil {
195 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
196 }
197
198 return nil
199}
200
201func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
202 did := e.Did
203 var err error
204
205 l := i.Logger.With("handler", "ingestArtifact")
206 l = l.With("nsid", e.Commit.Collection)
207
208 switch e.Commit.Operation {
209 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
210 raw := json.RawMessage(e.Commit.Record)
211 record := tangled.RepoArtifact{}
212 err = json.Unmarshal(raw, &record)
213 if err != nil {
214 l.Error("invalid record", "err", err)
215 return err
216 }
217
218 repoAt, err := syntax.ParseATURI(record.Repo)
219 if err != nil {
220 return err
221 }
222
223 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
224 if err != nil {
225 return err
226 }
227
228 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
229 if err != nil || !ok {
230 return err
231 }
232
233 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
234 if err != nil {
235 createdAt = time.Now()
236 }
237
238 artifact := models.Artifact{
239 Did: did,
240 Rkey: e.Commit.RKey,
241 RepoAt: repoAt,
242 Tag: plumbing.Hash(record.Tag),
243 CreatedAt: createdAt,
244 BlobCid: cid.Cid(record.Artifact.Ref),
245 Name: record.Name,
246 Size: uint64(record.Artifact.Size),
247 MimeType: record.Artifact.MimeType,
248 }
249
250 err = db.AddArtifact(i.Db, artifact)
251 case jmodels.CommitOperationDelete:
252 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
253 }
254
255 if err != nil {
256 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
257 }
258
259 return nil
260}
261
262func (i *Ingester) ingestProfile(e *jmodels.Event) error {
263 did := e.Did
264 var err error
265
266 l := i.Logger.With("handler", "ingestProfile")
267 l = l.With("nsid", e.Commit.Collection)
268
269 if e.Commit.RKey != "self" {
270 return fmt.Errorf("ingestProfile only ingests `self` record")
271 }
272
273 switch e.Commit.Operation {
274 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
275 raw := json.RawMessage(e.Commit.Record)
276 record := tangled.ActorProfile{}
277 err = json.Unmarshal(raw, &record)
278 if err != nil {
279 l.Error("invalid record", "err", err)
280 return err
281 }
282
283 description := ""
284 if record.Description != nil {
285 description = *record.Description
286 }
287
288 includeBluesky := record.Bluesky
289
290 location := ""
291 if record.Location != nil {
292 location = *record.Location
293 }
294
295 var links [5]string
296 for i, l := range record.Links {
297 if i < 5 {
298 links[i] = l
299 }
300 }
301
302 var stats [2]models.VanityStat
303 for i, s := range record.Stats {
304 if i < 2 {
305 stats[i].Kind = models.VanityStatKind(s)
306 }
307 }
308
309 var pinned [6]syntax.ATURI
310 for i, r := range record.PinnedRepositories {
311 if i < 6 {
312 pinned[i] = syntax.ATURI(r)
313 }
314 }
315
316 profile := models.Profile{
317 Did: did,
318 Description: description,
319 IncludeBluesky: includeBluesky,
320 Location: location,
321 Links: links,
322 Stats: stats,
323 PinnedRepos: pinned,
324 }
325
326 ddb, ok := i.Db.Execer.(*db.DB)
327 if !ok {
328 return fmt.Errorf("failed to index profile record, invalid db cast")
329 }
330
331 tx, err := ddb.Begin()
332 if err != nil {
333 return fmt.Errorf("failed to start transaction")
334 }
335
336 err = db.ValidateProfile(tx, &profile)
337 if err != nil {
338 return fmt.Errorf("invalid profile record")
339 }
340
341 err = db.UpsertProfile(tx, &profile)
342 case jmodels.CommitOperationDelete:
343 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
344 }
345
346 if err != nil {
347 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
348 }
349
350 return nil
351}
352
353func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
354 did := e.Did
355 var err error
356
357 l := i.Logger.With("handler", "ingestSpindleMember")
358 l = l.With("nsid", e.Commit.Collection)
359
360 switch e.Commit.Operation {
361 case jmodels.CommitOperationCreate:
362 raw := json.RawMessage(e.Commit.Record)
363 record := tangled.SpindleMember{}
364 err = json.Unmarshal(raw, &record)
365 if err != nil {
366 l.Error("invalid record", "err", err)
367 return err
368 }
369
370 // only spindle owner can invite to spindles
371 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
372 if err != nil || !ok {
373 return fmt.Errorf("failed to enforce permissions: %w", err)
374 }
375
376 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
377 if err != nil {
378 return err
379 }
380
381 if memberId.Handle.IsInvalidHandle() {
382 return err
383 }
384
385 ddb, ok := i.Db.Execer.(*db.DB)
386 if !ok {
387 return fmt.Errorf("failed to index profile record, invalid db cast")
388 }
389
390 err = db.AddSpindleMember(ddb, db.SpindleMember{
391 Did: syntax.DID(did),
392 Rkey: e.Commit.RKey,
393 Instance: record.Instance,
394 Subject: memberId.DID,
395 })
396 if !ok {
397 return fmt.Errorf("failed to add to db: %w", err)
398 }
399
400 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
401 if err != nil {
402 return fmt.Errorf("failed to update ACLs: %w", err)
403 }
404
405 l.Info("added spindle member")
406 case jmodels.CommitOperationDelete:
407 rkey := e.Commit.RKey
408
409 ddb, ok := i.Db.Execer.(*db.DB)
410 if !ok {
411 return fmt.Errorf("failed to index profile record, invalid db cast")
412 }
413
414 // get record from db first
415 members, err := db.GetSpindleMembers(
416 ddb,
417 db.FilterEq("did", did),
418 db.FilterEq("rkey", rkey),
419 )
420 if err != nil || len(members) != 1 {
421 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
422 }
423 member := members[0]
424
425 tx, err := ddb.Begin()
426 if err != nil {
427 return fmt.Errorf("failed to start txn: %w", err)
428 }
429
430 // remove record by rkey && update enforcer
431 if err = db.RemoveSpindleMember(
432 tx,
433 db.FilterEq("did", did),
434 db.FilterEq("rkey", rkey),
435 ); err != nil {
436 return fmt.Errorf("failed to remove from db: %w", err)
437 }
438
439 // update enforcer
440 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
441 if err != nil {
442 return fmt.Errorf("failed to update ACLs: %w", err)
443 }
444
445 if err = tx.Commit(); err != nil {
446 return fmt.Errorf("failed to commit txn: %w", err)
447 }
448
449 if err = i.Enforcer.E.SavePolicy(); err != nil {
450 return fmt.Errorf("failed to save ACLs: %w", err)
451 }
452
453 l.Info("removed spindle member")
454 }
455
456 return nil
457}
458
459func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
460 did := e.Did
461 var err error
462
463 l := i.Logger.With("handler", "ingestSpindle")
464 l = l.With("nsid", e.Commit.Collection)
465
466 switch e.Commit.Operation {
467 case jmodels.CommitOperationCreate:
468 raw := json.RawMessage(e.Commit.Record)
469 record := tangled.Spindle{}
470 err = json.Unmarshal(raw, &record)
471 if err != nil {
472 l.Error("invalid record", "err", err)
473 return err
474 }
475
476 instance := e.Commit.RKey
477
478 ddb, ok := i.Db.Execer.(*db.DB)
479 if !ok {
480 return fmt.Errorf("failed to index profile record, invalid db cast")
481 }
482
483 err := db.AddSpindle(ddb, db.Spindle{
484 Owner: syntax.DID(did),
485 Instance: instance,
486 })
487 if err != nil {
488 l.Error("failed to add spindle to db", "err", err, "instance", instance)
489 return err
490 }
491
492 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
493 if err != nil {
494 l.Error("failed to add spindle to db", "err", err, "instance", instance)
495 return err
496 }
497
498 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
499 if err != nil {
500 return fmt.Errorf("failed to mark verified: %w", err)
501 }
502
503 return nil
504
505 case jmodels.CommitOperationDelete:
506 instance := e.Commit.RKey
507
508 ddb, ok := i.Db.Execer.(*db.DB)
509 if !ok {
510 return fmt.Errorf("failed to index profile record, invalid db cast")
511 }
512
513 // get record from db first
514 spindles, err := db.GetSpindles(
515 ddb,
516 db.FilterEq("owner", did),
517 db.FilterEq("instance", instance),
518 )
519 if err != nil || len(spindles) != 1 {
520 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
521 }
522 spindle := spindles[0]
523
524 tx, err := ddb.Begin()
525 if err != nil {
526 return err
527 }
528 defer func() {
529 tx.Rollback()
530 i.Enforcer.E.LoadPolicy()
531 }()
532
533 // remove spindle members first
534 err = db.RemoveSpindleMember(
535 tx,
536 db.FilterEq("owner", did),
537 db.FilterEq("instance", instance),
538 )
539 if err != nil {
540 return err
541 }
542
543 err = db.DeleteSpindle(
544 tx,
545 db.FilterEq("owner", did),
546 db.FilterEq("instance", instance),
547 )
548 if err != nil {
549 return err
550 }
551
552 if spindle.Verified != nil {
553 err = i.Enforcer.RemoveSpindle(instance)
554 if err != nil {
555 return err
556 }
557 }
558
559 err = tx.Commit()
560 if err != nil {
561 return err
562 }
563
564 err = i.Enforcer.E.SavePolicy()
565 if err != nil {
566 return err
567 }
568 }
569
570 return nil
571}
572
573func (i *Ingester) ingestString(e *jmodels.Event) error {
574 did := e.Did
575 rkey := e.Commit.RKey
576
577 var err error
578
579 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
580 l.Info("ingesting record")
581
582 ddb, ok := i.Db.Execer.(*db.DB)
583 if !ok {
584 return fmt.Errorf("failed to index string record, invalid db cast")
585 }
586
587 switch e.Commit.Operation {
588 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
589 raw := json.RawMessage(e.Commit.Record)
590 record := tangled.String{}
591 err = json.Unmarshal(raw, &record)
592 if err != nil {
593 l.Error("invalid record", "err", err)
594 return err
595 }
596
597 string := db.StringFromRecord(did, rkey, record)
598
599 if err = string.Validate(); err != nil {
600 l.Error("invalid record", "err", err)
601 return err
602 }
603
604 if err = db.AddString(ddb, string); err != nil {
605 l.Error("failed to add string", "err", err)
606 return err
607 }
608
609 return nil
610
611 case jmodels.CommitOperationDelete:
612 if err := db.DeleteString(
613 ddb,
614 db.FilterEq("did", did),
615 db.FilterEq("rkey", rkey),
616 ); err != nil {
617 l.Error("failed to delete", "err", err)
618 return fmt.Errorf("failed to delete string record: %w", err)
619 }
620
621 return nil
622 }
623
624 return nil
625}
626
627func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
628 did := e.Did
629 var err error
630
631 l := i.Logger.With("handler", "ingestKnotMember")
632 l = l.With("nsid", e.Commit.Collection)
633
634 switch e.Commit.Operation {
635 case jmodels.CommitOperationCreate:
636 raw := json.RawMessage(e.Commit.Record)
637 record := tangled.KnotMember{}
638 err = json.Unmarshal(raw, &record)
639 if err != nil {
640 l.Error("invalid record", "err", err)
641 return err
642 }
643
644 // only knot owner can invite to knots
645 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
646 if err != nil || !ok {
647 return fmt.Errorf("failed to enforce permissions: %w", err)
648 }
649
650 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
651 if err != nil {
652 return err
653 }
654
655 if memberId.Handle.IsInvalidHandle() {
656 return err
657 }
658
659 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
660 if err != nil {
661 return fmt.Errorf("failed to update ACLs: %w", err)
662 }
663
664 l.Info("added knot member")
665 case jmodels.CommitOperationDelete:
666 // we don't store knot members in a table (like we do for spindle)
667 // and we can't remove this just yet. possibly fixed if we switch
668 // to either:
669 // 1. a knot_members table like with spindle and store the rkey
670 // 2. use the knot host as the rkey
671 //
672 // TODO: implement member deletion
673 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
674 }
675
676 return nil
677}
678
679func (i *Ingester) ingestKnot(e *jmodels.Event) error {
680 did := e.Did
681 var err error
682
683 l := i.Logger.With("handler", "ingestKnot")
684 l = l.With("nsid", e.Commit.Collection)
685
686 switch e.Commit.Operation {
687 case jmodels.CommitOperationCreate:
688 raw := json.RawMessage(e.Commit.Record)
689 record := tangled.Knot{}
690 err = json.Unmarshal(raw, &record)
691 if err != nil {
692 l.Error("invalid record", "err", err)
693 return err
694 }
695
696 domain := e.Commit.RKey
697
698 ddb, ok := i.Db.Execer.(*db.DB)
699 if !ok {
700 return fmt.Errorf("failed to index profile record, invalid db cast")
701 }
702
703 err := db.AddKnot(ddb, domain, did)
704 if err != nil {
705 l.Error("failed to add knot to db", "err", err, "domain", domain)
706 return err
707 }
708
709 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
710 if err != nil {
711 l.Error("failed to verify knot", "err", err, "domain", domain)
712 return err
713 }
714
715 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
716 if err != nil {
717 return fmt.Errorf("failed to mark verified: %w", err)
718 }
719
720 return nil
721
722 case jmodels.CommitOperationDelete:
723 domain := e.Commit.RKey
724
725 ddb, ok := i.Db.Execer.(*db.DB)
726 if !ok {
727 return fmt.Errorf("failed to index knot record, invalid db cast")
728 }
729
730 // get record from db first
731 registrations, err := db.GetRegistrations(
732 ddb,
733 db.FilterEq("domain", domain),
734 db.FilterEq("did", did),
735 )
736 if err != nil {
737 return fmt.Errorf("failed to get registration: %w", err)
738 }
739 if len(registrations) != 1 {
740 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
741 }
742 registration := registrations[0]
743
744 tx, err := ddb.Begin()
745 if err != nil {
746 return err
747 }
748 defer func() {
749 tx.Rollback()
750 i.Enforcer.E.LoadPolicy()
751 }()
752
753 err = db.DeleteKnot(
754 tx,
755 db.FilterEq("did", did),
756 db.FilterEq("domain", domain),
757 )
758 if err != nil {
759 return err
760 }
761
762 if registration.Registered != nil {
763 err = i.Enforcer.RemoveKnot(domain)
764 if err != nil {
765 return err
766 }
767 }
768
769 err = tx.Commit()
770 if err != nil {
771 return err
772 }
773
774 err = i.Enforcer.E.SavePolicy()
775 if err != nil {
776 return err
777 }
778 }
779
780 return nil
781}
782func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
783 did := e.Did
784 rkey := e.Commit.RKey
785
786 var err error
787
788 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
789 l.Info("ingesting record")
790
791 ddb, ok := i.Db.Execer.(*db.DB)
792 if !ok {
793 return fmt.Errorf("failed to index issue record, invalid db cast")
794 }
795
796 switch e.Commit.Operation {
797 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
798 raw := json.RawMessage(e.Commit.Record)
799 record := tangled.RepoIssue{}
800 err = json.Unmarshal(raw, &record)
801 if err != nil {
802 l.Error("invalid record", "err", err)
803 return err
804 }
805
806 issue := models.IssueFromRecord(did, rkey, record)
807
808 if err := i.Validator.ValidateIssue(&issue); err != nil {
809 return fmt.Errorf("failed to validate issue: %w", err)
810 }
811
812 tx, err := ddb.BeginTx(ctx, nil)
813 if err != nil {
814 l.Error("failed to begin transaction", "err", err)
815 return err
816 }
817 defer tx.Rollback()
818
819 err = db.PutIssue(tx, &issue)
820 if err != nil {
821 l.Error("failed to create issue", "err", err)
822 return err
823 }
824
825 err = tx.Commit()
826 if err != nil {
827 l.Error("failed to commit txn", "err", err)
828 return err
829 }
830
831 return nil
832
833 case jmodels.CommitOperationDelete:
834 if err := db.DeleteIssues(
835 ddb,
836 db.FilterEq("did", did),
837 db.FilterEq("rkey", rkey),
838 ); err != nil {
839 l.Error("failed to delete", "err", err)
840 return fmt.Errorf("failed to delete issue record: %w", err)
841 }
842
843 return nil
844 }
845
846 return nil
847}
848
849func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
850 did := e.Did
851 rkey := e.Commit.RKey
852
853 var err error
854
855 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
856 l.Info("ingesting record")
857
858 ddb, ok := i.Db.Execer.(*db.DB)
859 if !ok {
860 return fmt.Errorf("failed to index issue comment record, invalid db cast")
861 }
862
863 switch e.Commit.Operation {
864 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
865 raw := json.RawMessage(e.Commit.Record)
866 record := tangled.RepoIssueComment{}
867 err = json.Unmarshal(raw, &record)
868 if err != nil {
869 return fmt.Errorf("invalid record: %w", err)
870 }
871
872 comment, err := models.IssueCommentFromRecord(did, rkey, record)
873 if err != nil {
874 return fmt.Errorf("failed to parse comment from record: %w", err)
875 }
876
877 if err := i.Validator.ValidateIssueComment(comment); err != nil {
878 return fmt.Errorf("failed to validate comment: %w", err)
879 }
880
881 _, err = db.AddIssueComment(ddb, *comment)
882 if err != nil {
883 return fmt.Errorf("failed to create issue comment: %w", err)
884 }
885
886 return nil
887
888 case jmodels.CommitOperationDelete:
889 if err := db.DeleteIssueComments(
890 ddb,
891 db.FilterEq("did", did),
892 db.FilterEq("rkey", rkey),
893 ); err != nil {
894 return fmt.Errorf("failed to delete issue comment record: %w", err)
895 }
896
897 return nil
898 }
899
900 return nil
901}
902
903func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
904 did := e.Did
905 rkey := e.Commit.RKey
906
907 var err error
908
909 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
910 l.Info("ingesting record")
911
912 ddb, ok := i.Db.Execer.(*db.DB)
913 if !ok {
914 return fmt.Errorf("failed to index label definition, invalid db cast")
915 }
916
917 switch e.Commit.Operation {
918 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
919 raw := json.RawMessage(e.Commit.Record)
920 record := tangled.LabelDefinition{}
921 err = json.Unmarshal(raw, &record)
922 if err != nil {
923 return fmt.Errorf("invalid record: %w", err)
924 }
925
926 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
927 if err != nil {
928 return fmt.Errorf("failed to parse labeldef from record: %w", err)
929 }
930
931 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
932 return fmt.Errorf("failed to validate labeldef: %w", err)
933 }
934
935 _, err = db.AddLabelDefinition(ddb, def)
936 if err != nil {
937 return fmt.Errorf("failed to create labeldef: %w", err)
938 }
939
940 return nil
941
942 case jmodels.CommitOperationDelete:
943 if err := db.DeleteLabelDefinition(
944 ddb,
945 db.FilterEq("did", did),
946 db.FilterEq("rkey", rkey),
947 ); err != nil {
948 return fmt.Errorf("failed to delete labeldef record: %w", err)
949 }
950
951 return nil
952 }
953
954 return nil
955}