forked from tangled.org/core
this repo has no description
at master 21 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "github.com/go-git/go-git/v5/plumbing" 14 "github.com/ipfs/go-cid" 15 "tangled.sh/tangled.sh/core/api/tangled" 16 "tangled.sh/tangled.sh/core/appview/config" 17 "tangled.sh/tangled.sh/core/appview/db" 18 "tangled.sh/tangled.sh/core/appview/serververify" 19 "tangled.sh/tangled.sh/core/appview/validator" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/rbac" 22) 23 24type Ingester struct { 25 Db db.DbWrapper 26 Enforcer *rbac.Enforcer 27 IdResolver *idresolver.Resolver 28 Config *config.Config 29 Logger *slog.Logger 30 Validator *validator.Validator 31} 32 33type processFunc func(ctx context.Context, e *models.Event) error 34 35func (i *Ingester) Ingest() processFunc { 36 return func(ctx context.Context, e *models.Event) error { 37 var err error 38 defer func() { 39 eventTime := e.TimeUS 40 lastTimeUs := eventTime + 1 41 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 42 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 43 } 44 }() 45 46 l := i.Logger.With("kind", e.Kind) 47 switch e.Kind { 48 case models.EventKindAccount: 49 if !e.Account.Active && *e.Account.Status == "deactivated" { 50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 51 } 52 case models.EventKindIdentity: 53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 54 case models.EventKindCommit: 55 switch e.Commit.Collection { 56 case tangled.GraphFollowNSID: 57 err = i.ingestFollow(e) 58 case tangled.FeedStarNSID: 59 err = i.ingestStar(e) 60 case tangled.PublicKeyNSID: 61 err = i.ingestPublicKey(e) 62 case tangled.RepoArtifactNSID: 63 err = i.ingestArtifact(e) 64 case tangled.ActorProfileNSID: 65 err = i.ingestProfile(e) 66 case tangled.SpindleMemberNSID: 67 err = i.ingestSpindleMember(ctx, e) 68 case tangled.SpindleNSID: 69 err = i.ingestSpindle(ctx, e) 70 case tangled.KnotMemberNSID: 71 err = i.ingestKnotMember(e) 72 case tangled.KnotNSID: 73 err = i.ingestKnot(e) 74 case tangled.StringNSID: 75 err = i.ingestString(e) 76 case tangled.RepoIssueNSID: 77 err = i.ingestIssue(ctx, e) 78 case tangled.RepoIssueCommentNSID: 79 err = i.ingestIssueComment(e) 80 } 81 l = i.Logger.With("nsid", e.Commit.Collection) 82 } 83 84 if err != nil { 85 l.Debug("error ingesting record", "err", err) 86 } 87 88 return nil 89 } 90} 91 92func (i *Ingester) ingestStar(e *models.Event) error { 93 var err error 94 did := e.Did 95 96 l := i.Logger.With("handler", "ingestStar") 97 l = l.With("nsid", e.Commit.Collection) 98 99 switch e.Commit.Operation { 100 case models.CommitOperationCreate, models.CommitOperationUpdate: 101 var subjectUri syntax.ATURI 102 103 raw := json.RawMessage(e.Commit.Record) 104 record := tangled.FeedStar{} 105 err := json.Unmarshal(raw, &record) 106 if err != nil { 107 l.Error("invalid record", "err", err) 108 return err 109 } 110 111 subjectUri, err = syntax.ParseATURI(record.Subject) 112 if err != nil { 113 l.Error("invalid record", "err", err) 114 return err 115 } 116 err = db.AddStar(i.Db, &db.Star{ 117 StarredByDid: did, 118 RepoAt: subjectUri, 119 Rkey: e.Commit.RKey, 120 }) 121 case models.CommitOperationDelete: 122 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 123 } 124 125 if err != nil { 126 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 127 } 128 129 return nil 130} 131 132func (i *Ingester) ingestFollow(e *models.Event) error { 133 var err error 134 did := e.Did 135 136 l := i.Logger.With("handler", "ingestFollow") 137 l = l.With("nsid", e.Commit.Collection) 138 139 switch e.Commit.Operation { 140 case models.CommitOperationCreate, models.CommitOperationUpdate: 141 raw := json.RawMessage(e.Commit.Record) 142 record := tangled.GraphFollow{} 143 err = json.Unmarshal(raw, &record) 144 if err != nil { 145 l.Error("invalid record", "err", err) 146 return err 147 } 148 149 err = db.AddFollow(i.Db, &db.Follow{ 150 UserDid: did, 151 SubjectDid: record.Subject, 152 Rkey: e.Commit.RKey, 153 }) 154 case models.CommitOperationDelete: 155 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 156 } 157 158 if err != nil { 159 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 160 } 161 162 return nil 163} 164 165func (i *Ingester) ingestPublicKey(e *models.Event) error { 166 did := e.Did 167 var err error 168 169 l := i.Logger.With("handler", "ingestPublicKey") 170 l = l.With("nsid", e.Commit.Collection) 171 172 switch e.Commit.Operation { 173 case models.CommitOperationCreate, models.CommitOperationUpdate: 174 l.Debug("processing add of pubkey") 175 raw := json.RawMessage(e.Commit.Record) 176 record := tangled.PublicKey{} 177 err = json.Unmarshal(raw, &record) 178 if err != nil { 179 l.Error("invalid record", "err", err) 180 return err 181 } 182 183 name := record.Name 184 key := record.Key 185 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 186 case models.CommitOperationDelete: 187 l.Debug("processing delete of pubkey") 188 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 189 } 190 191 if err != nil { 192 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 193 } 194 195 return nil 196} 197 198func (i *Ingester) ingestArtifact(e *models.Event) error { 199 did := e.Did 200 var err error 201 202 l := i.Logger.With("handler", "ingestArtifact") 203 l = l.With("nsid", e.Commit.Collection) 204 205 switch e.Commit.Operation { 206 case models.CommitOperationCreate, models.CommitOperationUpdate: 207 raw := json.RawMessage(e.Commit.Record) 208 record := tangled.RepoArtifact{} 209 err = json.Unmarshal(raw, &record) 210 if err != nil { 211 l.Error("invalid record", "err", err) 212 return err 213 } 214 215 repoAt, err := syntax.ParseATURI(record.Repo) 216 if err != nil { 217 return err 218 } 219 220 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 221 if err != nil { 222 return err 223 } 224 225 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 226 if err != nil || !ok { 227 return err 228 } 229 230 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 231 if err != nil { 232 createdAt = time.Now() 233 } 234 235 artifact := db.Artifact{ 236 Did: did, 237 Rkey: e.Commit.RKey, 238 RepoAt: repoAt, 239 Tag: plumbing.Hash(record.Tag), 240 CreatedAt: createdAt, 241 BlobCid: cid.Cid(record.Artifact.Ref), 242 Name: record.Name, 243 Size: uint64(record.Artifact.Size), 244 MimeType: record.Artifact.MimeType, 245 } 246 247 err = db.AddArtifact(i.Db, artifact) 248 case models.CommitOperationDelete: 249 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 250 } 251 252 if err != nil { 253 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 254 } 255 256 return nil 257} 258 259func (i *Ingester) ingestProfile(e *models.Event) error { 260 did := e.Did 261 var err error 262 263 l := i.Logger.With("handler", "ingestProfile") 264 l = l.With("nsid", e.Commit.Collection) 265 266 if e.Commit.RKey != "self" { 267 return fmt.Errorf("ingestProfile only ingests `self` record") 268 } 269 270 switch e.Commit.Operation { 271 case models.CommitOperationCreate, models.CommitOperationUpdate: 272 raw := json.RawMessage(e.Commit.Record) 273 record := tangled.ActorProfile{} 274 err = json.Unmarshal(raw, &record) 275 if err != nil { 276 l.Error("invalid record", "err", err) 277 return err 278 } 279 280 description := "" 281 if record.Description != nil { 282 description = *record.Description 283 } 284 285 includeBluesky := record.Bluesky 286 287 location := "" 288 if record.Location != nil { 289 location = *record.Location 290 } 291 292 var links [5]string 293 for i, l := range record.Links { 294 if i < 5 { 295 links[i] = l 296 } 297 } 298 299 var stats [2]db.VanityStat 300 for i, s := range record.Stats { 301 if i < 2 { 302 stats[i].Kind = db.VanityStatKind(s) 303 } 304 } 305 306 var pinned [6]syntax.ATURI 307 for i, r := range record.PinnedRepositories { 308 if i < 6 { 309 pinned[i] = syntax.ATURI(r) 310 } 311 } 312 313 profile := db.Profile{ 314 Did: did, 315 Description: description, 316 IncludeBluesky: includeBluesky, 317 Location: location, 318 Links: links, 319 Stats: stats, 320 PinnedRepos: pinned, 321 } 322 323 ddb, ok := i.Db.Execer.(*db.DB) 324 if !ok { 325 return fmt.Errorf("failed to index profile record, invalid db cast") 326 } 327 328 tx, err := ddb.Begin() 329 if err != nil { 330 return fmt.Errorf("failed to start transaction") 331 } 332 333 err = db.ValidateProfile(tx, &profile) 334 if err != nil { 335 return fmt.Errorf("invalid profile record") 336 } 337 338 err = db.UpsertProfile(tx, &profile) 339 case models.CommitOperationDelete: 340 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 341 } 342 343 if err != nil { 344 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 345 } 346 347 return nil 348} 349 350func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error { 351 did := e.Did 352 var err error 353 354 l := i.Logger.With("handler", "ingestSpindleMember") 355 l = l.With("nsid", e.Commit.Collection) 356 357 switch e.Commit.Operation { 358 case models.CommitOperationCreate: 359 raw := json.RawMessage(e.Commit.Record) 360 record := tangled.SpindleMember{} 361 err = json.Unmarshal(raw, &record) 362 if err != nil { 363 l.Error("invalid record", "err", err) 364 return err 365 } 366 367 // only spindle owner can invite to spindles 368 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 369 if err != nil || !ok { 370 return fmt.Errorf("failed to enforce permissions: %w", err) 371 } 372 373 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 374 if err != nil { 375 return err 376 } 377 378 if memberId.Handle.IsInvalidHandle() { 379 return err 380 } 381 382 ddb, ok := i.Db.Execer.(*db.DB) 383 if !ok { 384 return fmt.Errorf("failed to index profile record, invalid db cast") 385 } 386 387 err = db.AddSpindleMember(ddb, db.SpindleMember{ 388 Did: syntax.DID(did), 389 Rkey: e.Commit.RKey, 390 Instance: record.Instance, 391 Subject: memberId.DID, 392 }) 393 if !ok { 394 return fmt.Errorf("failed to add to db: %w", err) 395 } 396 397 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 398 if err != nil { 399 return fmt.Errorf("failed to update ACLs: %w", err) 400 } 401 402 l.Info("added spindle member") 403 case models.CommitOperationDelete: 404 rkey := e.Commit.RKey 405 406 ddb, ok := i.Db.Execer.(*db.DB) 407 if !ok { 408 return fmt.Errorf("failed to index profile record, invalid db cast") 409 } 410 411 // get record from db first 412 members, err := db.GetSpindleMembers( 413 ddb, 414 db.FilterEq("did", did), 415 db.FilterEq("rkey", rkey), 416 ) 417 if err != nil || len(members) != 1 { 418 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 419 } 420 member := members[0] 421 422 tx, err := ddb.Begin() 423 if err != nil { 424 return fmt.Errorf("failed to start txn: %w", err) 425 } 426 427 // remove record by rkey && update enforcer 428 if err = db.RemoveSpindleMember( 429 tx, 430 db.FilterEq("did", did), 431 db.FilterEq("rkey", rkey), 432 ); err != nil { 433 return fmt.Errorf("failed to remove from db: %w", err) 434 } 435 436 // update enforcer 437 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 438 if err != nil { 439 return fmt.Errorf("failed to update ACLs: %w", err) 440 } 441 442 if err = tx.Commit(); err != nil { 443 return fmt.Errorf("failed to commit txn: %w", err) 444 } 445 446 if err = i.Enforcer.E.SavePolicy(); err != nil { 447 return fmt.Errorf("failed to save ACLs: %w", err) 448 } 449 450 l.Info("removed spindle member") 451 } 452 453 return nil 454} 455 456func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error { 457 did := e.Did 458 var err error 459 460 l := i.Logger.With("handler", "ingestSpindle") 461 l = l.With("nsid", e.Commit.Collection) 462 463 switch e.Commit.Operation { 464 case models.CommitOperationCreate: 465 raw := json.RawMessage(e.Commit.Record) 466 record := tangled.Spindle{} 467 err = json.Unmarshal(raw, &record) 468 if err != nil { 469 l.Error("invalid record", "err", err) 470 return err 471 } 472 473 instance := e.Commit.RKey 474 475 ddb, ok := i.Db.Execer.(*db.DB) 476 if !ok { 477 return fmt.Errorf("failed to index profile record, invalid db cast") 478 } 479 480 err := db.AddSpindle(ddb, db.Spindle{ 481 Owner: syntax.DID(did), 482 Instance: instance, 483 }) 484 if err != nil { 485 l.Error("failed to add spindle to db", "err", err, "instance", instance) 486 return err 487 } 488 489 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 490 if err != nil { 491 l.Error("failed to add spindle to db", "err", err, "instance", instance) 492 return err 493 } 494 495 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 496 if err != nil { 497 return fmt.Errorf("failed to mark verified: %w", err) 498 } 499 500 return nil 501 502 case models.CommitOperationDelete: 503 instance := e.Commit.RKey 504 505 ddb, ok := i.Db.Execer.(*db.DB) 506 if !ok { 507 return fmt.Errorf("failed to index profile record, invalid db cast") 508 } 509 510 // get record from db first 511 spindles, err := db.GetSpindles( 512 ddb, 513 db.FilterEq("owner", did), 514 db.FilterEq("instance", instance), 515 ) 516 if err != nil || len(spindles) != 1 { 517 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 518 } 519 spindle := spindles[0] 520 521 tx, err := ddb.Begin() 522 if err != nil { 523 return err 524 } 525 defer func() { 526 tx.Rollback() 527 i.Enforcer.E.LoadPolicy() 528 }() 529 530 // remove spindle members first 531 err = db.RemoveSpindleMember( 532 tx, 533 db.FilterEq("owner", did), 534 db.FilterEq("instance", instance), 535 ) 536 if err != nil { 537 return err 538 } 539 540 err = db.DeleteSpindle( 541 tx, 542 db.FilterEq("owner", did), 543 db.FilterEq("instance", instance), 544 ) 545 if err != nil { 546 return err 547 } 548 549 if spindle.Verified != nil { 550 err = i.Enforcer.RemoveSpindle(instance) 551 if err != nil { 552 return err 553 } 554 } 555 556 err = tx.Commit() 557 if err != nil { 558 return err 559 } 560 561 err = i.Enforcer.E.SavePolicy() 562 if err != nil { 563 return err 564 } 565 } 566 567 return nil 568} 569 570func (i *Ingester) ingestString(e *models.Event) error { 571 did := e.Did 572 rkey := e.Commit.RKey 573 574 var err error 575 576 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 577 l.Info("ingesting record") 578 579 ddb, ok := i.Db.Execer.(*db.DB) 580 if !ok { 581 return fmt.Errorf("failed to index string record, invalid db cast") 582 } 583 584 switch e.Commit.Operation { 585 case models.CommitOperationCreate, models.CommitOperationUpdate: 586 raw := json.RawMessage(e.Commit.Record) 587 record := tangled.String{} 588 err = json.Unmarshal(raw, &record) 589 if err != nil { 590 l.Error("invalid record", "err", err) 591 return err 592 } 593 594 string := db.StringFromRecord(did, rkey, record) 595 596 if err = string.Validate(); err != nil { 597 l.Error("invalid record", "err", err) 598 return err 599 } 600 601 if err = db.AddString(ddb, string); err != nil { 602 l.Error("failed to add string", "err", err) 603 return err 604 } 605 606 return nil 607 608 case models.CommitOperationDelete: 609 if err := db.DeleteString( 610 ddb, 611 db.FilterEq("did", did), 612 db.FilterEq("rkey", rkey), 613 ); err != nil { 614 l.Error("failed to delete", "err", err) 615 return fmt.Errorf("failed to delete string record: %w", err) 616 } 617 618 return nil 619 } 620 621 return nil 622} 623 624func (i *Ingester) ingestKnotMember(e *models.Event) error { 625 did := e.Did 626 var err error 627 628 l := i.Logger.With("handler", "ingestKnotMember") 629 l = l.With("nsid", e.Commit.Collection) 630 631 switch e.Commit.Operation { 632 case models.CommitOperationCreate: 633 raw := json.RawMessage(e.Commit.Record) 634 record := tangled.KnotMember{} 635 err = json.Unmarshal(raw, &record) 636 if err != nil { 637 l.Error("invalid record", "err", err) 638 return err 639 } 640 641 // only knot owner can invite to knots 642 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 643 if err != nil || !ok { 644 return fmt.Errorf("failed to enforce permissions: %w", err) 645 } 646 647 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 648 if err != nil { 649 return err 650 } 651 652 if memberId.Handle.IsInvalidHandle() { 653 return err 654 } 655 656 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 657 if err != nil { 658 return fmt.Errorf("failed to update ACLs: %w", err) 659 } 660 661 l.Info("added knot member") 662 case models.CommitOperationDelete: 663 // we don't store knot members in a table (like we do for spindle) 664 // and we can't remove this just yet. possibly fixed if we switch 665 // to either: 666 // 1. a knot_members table like with spindle and store the rkey 667 // 2. use the knot host as the rkey 668 // 669 // TODO: implement member deletion 670 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 671 } 672 673 return nil 674} 675 676func (i *Ingester) ingestKnot(e *models.Event) error { 677 did := e.Did 678 var err error 679 680 l := i.Logger.With("handler", "ingestKnot") 681 l = l.With("nsid", e.Commit.Collection) 682 683 switch e.Commit.Operation { 684 case models.CommitOperationCreate: 685 raw := json.RawMessage(e.Commit.Record) 686 record := tangled.Knot{} 687 err = json.Unmarshal(raw, &record) 688 if err != nil { 689 l.Error("invalid record", "err", err) 690 return err 691 } 692 693 domain := e.Commit.RKey 694 695 ddb, ok := i.Db.Execer.(*db.DB) 696 if !ok { 697 return fmt.Errorf("failed to index profile record, invalid db cast") 698 } 699 700 err := db.AddKnot(ddb, domain, did) 701 if err != nil { 702 l.Error("failed to add knot to db", "err", err, "domain", domain) 703 return err 704 } 705 706 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 707 if err != nil { 708 l.Error("failed to verify knot", "err", err, "domain", domain) 709 return err 710 } 711 712 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 713 if err != nil { 714 return fmt.Errorf("failed to mark verified: %w", err) 715 } 716 717 return nil 718 719 case models.CommitOperationDelete: 720 domain := e.Commit.RKey 721 722 ddb, ok := i.Db.Execer.(*db.DB) 723 if !ok { 724 return fmt.Errorf("failed to index knot record, invalid db cast") 725 } 726 727 // get record from db first 728 registrations, err := db.GetRegistrations( 729 ddb, 730 db.FilterEq("domain", domain), 731 db.FilterEq("did", did), 732 ) 733 if err != nil { 734 return fmt.Errorf("failed to get registration: %w", err) 735 } 736 if len(registrations) != 1 { 737 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 738 } 739 registration := registrations[0] 740 741 tx, err := ddb.Begin() 742 if err != nil { 743 return err 744 } 745 defer func() { 746 tx.Rollback() 747 i.Enforcer.E.LoadPolicy() 748 }() 749 750 err = db.DeleteKnot( 751 tx, 752 db.FilterEq("did", did), 753 db.FilterEq("domain", domain), 754 ) 755 if err != nil { 756 return err 757 } 758 759 if registration.Registered != nil { 760 err = i.Enforcer.RemoveKnot(domain) 761 if err != nil { 762 return err 763 } 764 } 765 766 err = tx.Commit() 767 if err != nil { 768 return err 769 } 770 771 err = i.Enforcer.E.SavePolicy() 772 if err != nil { 773 return err 774 } 775 } 776 777 return nil 778} 779func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error { 780 did := e.Did 781 rkey := e.Commit.RKey 782 783 var err error 784 785 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 786 l.Info("ingesting record") 787 788 ddb, ok := i.Db.Execer.(*db.DB) 789 if !ok { 790 return fmt.Errorf("failed to index issue record, invalid db cast") 791 } 792 793 switch e.Commit.Operation { 794 case models.CommitOperationCreate, models.CommitOperationUpdate: 795 raw := json.RawMessage(e.Commit.Record) 796 record := tangled.RepoIssue{} 797 err = json.Unmarshal(raw, &record) 798 if err != nil { 799 l.Error("invalid record", "err", err) 800 return err 801 } 802 803 issue := db.IssueFromRecord(did, rkey, record) 804 805 if err := i.Validator.ValidateIssue(&issue); err != nil { 806 return fmt.Errorf("failed to validate issue: %w", err) 807 } 808 809 tx, err := ddb.BeginTx(ctx, nil) 810 if err != nil { 811 l.Error("failed to begin transaction", "err", err) 812 return err 813 } 814 defer tx.Rollback() 815 816 err = db.PutIssue(tx, &issue) 817 if err != nil { 818 l.Error("failed to create issue", "err", err) 819 return err 820 } 821 822 err = tx.Commit() 823 if err != nil { 824 l.Error("failed to commit txn", "err", err) 825 return err 826 } 827 828 return nil 829 830 case models.CommitOperationDelete: 831 if err := db.DeleteIssues( 832 ddb, 833 db.FilterEq("did", did), 834 db.FilterEq("rkey", rkey), 835 ); err != nil { 836 l.Error("failed to delete", "err", err) 837 return fmt.Errorf("failed to delete issue record: %w", err) 838 } 839 840 return nil 841 } 842 843 return nil 844} 845 846func (i *Ingester) ingestIssueComment(e *models.Event) error { 847 did := e.Did 848 rkey := e.Commit.RKey 849 850 var err error 851 852 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 853 l.Info("ingesting record") 854 855 ddb, ok := i.Db.Execer.(*db.DB) 856 if !ok { 857 return fmt.Errorf("failed to index issue comment record, invalid db cast") 858 } 859 860 switch e.Commit.Operation { 861 case models.CommitOperationCreate, models.CommitOperationUpdate: 862 raw := json.RawMessage(e.Commit.Record) 863 record := tangled.RepoIssueComment{} 864 err = json.Unmarshal(raw, &record) 865 if err != nil { 866 return fmt.Errorf("invalid record: %w", err) 867 } 868 869 comment, err := db.IssueCommentFromRecord(ddb, did, rkey, record) 870 if err != nil { 871 return fmt.Errorf("failed to parse comment from record: %w", err) 872 } 873 874 if err := i.Validator.ValidateIssueComment(comment); err != nil { 875 return fmt.Errorf("failed to validate comment: %w", err) 876 } 877 878 _, err = db.AddIssueComment(ddb, *comment) 879 if err != nil { 880 return fmt.Errorf("failed to create issue comment: %w", err) 881 } 882 883 return nil 884 885 case models.CommitOperationDelete: 886 if err := db.DeleteIssueComments( 887 ddb, 888 db.FilterEq("did", did), 889 db.FilterEq("rkey", rkey), 890 ); err != nil { 891 return fmt.Errorf("failed to delete issue comment record: %w", err) 892 } 893 894 return nil 895 } 896 897 return nil 898}