forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "github.com/go-git/go-git/v5/plumbing" 14 "github.com/ipfs/go-cid" 15 "tangled.sh/tangled.sh/core/api/tangled" 16 "tangled.sh/tangled.sh/core/appview/config" 17 "tangled.sh/tangled.sh/core/appview/db" 18 "tangled.sh/tangled.sh/core/appview/serververify" 19 "tangled.sh/tangled.sh/core/appview/validator" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/rbac" 22) 23 24type Ingester struct { 25 Db db.DbWrapper 26 Enforcer *rbac.Enforcer 27 IdResolver *idresolver.Resolver 28 Config *config.Config 29 Logger *slog.Logger 30 Validator *validator.Validator 31} 32 33type processFunc func(ctx context.Context, e *models.Event) error 34 35func (i *Ingester) Ingest() processFunc { 36 return func(ctx context.Context, e *models.Event) error { 37 var err error 38 defer func() { 39 eventTime := e.TimeUS 40 lastTimeUs := eventTime + 1 41 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 42 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 43 } 44 }() 45 46 l := i.Logger.With("kind", e.Kind) 47 switch e.Kind { 48 case models.EventKindAccount: 49 if !e.Account.Active && *e.Account.Status == "deactivated" { 50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 51 } 52 case models.EventKindIdentity: 53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 54 case models.EventKindCommit: 55 switch e.Commit.Collection { 56 case tangled.GraphFollowNSID: 57 err = i.ingestFollow(e) 58 case tangled.FeedStarNSID: 59 err = i.ingestStar(e) 60 case tangled.PublicKeyNSID: 61 err = i.ingestPublicKey(e) 62 case tangled.RepoArtifactNSID: 63 err = i.ingestArtifact(e) 64 case tangled.ActorProfileNSID: 65 err = i.ingestProfile(e) 66 case tangled.SpindleMemberNSID: 67 err = i.ingestSpindleMember(ctx, e) 68 case tangled.SpindleNSID: 69 err = i.ingestSpindle(ctx, e) 70 case tangled.KnotMemberNSID: 71 err = i.ingestKnotMember(e) 72 case tangled.KnotNSID: 73 err = i.ingestKnot(e) 74 case tangled.StringNSID: 75 err = i.ingestString(e) 76 case tangled.RepoIssueNSID: 77 err = i.ingestIssue(ctx, e) 78 case tangled.RepoIssueCommentNSID: 79 err = i.ingestIssueComment(e) 80 case tangled.LabelDefinitionNSID: 81 err = i.ingestLabelDefinition(e) 82 } 83 l = i.Logger.With("nsid", e.Commit.Collection) 84 } 85 86 if err != nil { 87 l.Debug("error ingesting record", "err", err) 88 } 89 90 return nil 91 } 92} 93 94func (i *Ingester) ingestStar(e *models.Event) error { 95 var err error 96 did := e.Did 97 98 l := i.Logger.With("handler", "ingestStar") 99 l = l.With("nsid", e.Commit.Collection) 100 101 switch e.Commit.Operation { 102 case models.CommitOperationCreate, models.CommitOperationUpdate: 103 var subjectUri syntax.ATURI 104 105 raw := json.RawMessage(e.Commit.Record) 106 record := tangled.FeedStar{} 107 err := json.Unmarshal(raw, &record) 108 if err != nil { 109 l.Error("invalid record", "err", err) 110 return err 111 } 112 113 subjectUri, err = syntax.ParseATURI(record.Subject) 114 if err != nil { 115 l.Error("invalid record", "err", err) 116 return err 117 } 118 err = db.AddStar(i.Db, &db.Star{ 119 StarredByDid: did, 120 RepoAt: subjectUri, 121 Rkey: e.Commit.RKey, 122 }) 123 case models.CommitOperationDelete: 124 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 125 } 126 127 if err != nil { 128 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 129 } 130 131 return nil 132} 133 134func (i *Ingester) ingestFollow(e *models.Event) error { 135 var err error 136 did := e.Did 137 138 l := i.Logger.With("handler", "ingestFollow") 139 l = l.With("nsid", e.Commit.Collection) 140 141 switch e.Commit.Operation { 142 case models.CommitOperationCreate, models.CommitOperationUpdate: 143 raw := json.RawMessage(e.Commit.Record) 144 record := tangled.GraphFollow{} 145 err = json.Unmarshal(raw, &record) 146 if err != nil { 147 l.Error("invalid record", "err", err) 148 return err 149 } 150 151 err = db.AddFollow(i.Db, &db.Follow{ 152 UserDid: did, 153 SubjectDid: record.Subject, 154 Rkey: e.Commit.RKey, 155 }) 156 case models.CommitOperationDelete: 157 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 158 } 159 160 if err != nil { 161 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 162 } 163 164 return nil 165} 166 167func (i *Ingester) ingestPublicKey(e *models.Event) error { 168 did := e.Did 169 var err error 170 171 l := i.Logger.With("handler", "ingestPublicKey") 172 l = l.With("nsid", e.Commit.Collection) 173 174 switch e.Commit.Operation { 175 case models.CommitOperationCreate, models.CommitOperationUpdate: 176 l.Debug("processing add of pubkey") 177 raw := json.RawMessage(e.Commit.Record) 178 record := tangled.PublicKey{} 179 err = json.Unmarshal(raw, &record) 180 if err != nil { 181 l.Error("invalid record", "err", err) 182 return err 183 } 184 185 name := record.Name 186 key := record.Key 187 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 188 case models.CommitOperationDelete: 189 l.Debug("processing delete of pubkey") 190 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 191 } 192 193 if err != nil { 194 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 195 } 196 197 return nil 198} 199 200func (i *Ingester) ingestArtifact(e *models.Event) error { 201 did := e.Did 202 var err error 203 204 l := i.Logger.With("handler", "ingestArtifact") 205 l = l.With("nsid", e.Commit.Collection) 206 207 switch e.Commit.Operation { 208 case models.CommitOperationCreate, models.CommitOperationUpdate: 209 raw := json.RawMessage(e.Commit.Record) 210 record := tangled.RepoArtifact{} 211 err = json.Unmarshal(raw, &record) 212 if err != nil { 213 l.Error("invalid record", "err", err) 214 return err 215 } 216 217 repoAt, err := syntax.ParseATURI(record.Repo) 218 if err != nil { 219 return err 220 } 221 222 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 223 if err != nil { 224 return err 225 } 226 227 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 228 if err != nil || !ok { 229 return err 230 } 231 232 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 233 if err != nil { 234 createdAt = time.Now() 235 } 236 237 artifact := db.Artifact{ 238 Did: did, 239 Rkey: e.Commit.RKey, 240 RepoAt: repoAt, 241 Tag: plumbing.Hash(record.Tag), 242 CreatedAt: createdAt, 243 BlobCid: cid.Cid(record.Artifact.Ref), 244 Name: record.Name, 245 Size: uint64(record.Artifact.Size), 246 MimeType: record.Artifact.MimeType, 247 } 248 249 err = db.AddArtifact(i.Db, artifact) 250 case models.CommitOperationDelete: 251 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 252 } 253 254 if err != nil { 255 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 256 } 257 258 return nil 259} 260 261func (i *Ingester) ingestProfile(e *models.Event) error { 262 did := e.Did 263 var err error 264 265 l := i.Logger.With("handler", "ingestProfile") 266 l = l.With("nsid", e.Commit.Collection) 267 268 if e.Commit.RKey != "self" { 269 return fmt.Errorf("ingestProfile only ingests `self` record") 270 } 271 272 switch e.Commit.Operation { 273 case models.CommitOperationCreate, models.CommitOperationUpdate: 274 raw := json.RawMessage(e.Commit.Record) 275 record := tangled.ActorProfile{} 276 err = json.Unmarshal(raw, &record) 277 if err != nil { 278 l.Error("invalid record", "err", err) 279 return err 280 } 281 282 description := "" 283 if record.Description != nil { 284 description = *record.Description 285 } 286 287 includeBluesky := record.Bluesky 288 289 location := "" 290 if record.Location != nil { 291 location = *record.Location 292 } 293 294 var links [5]string 295 for i, l := range record.Links { 296 if i < 5 { 297 links[i] = l 298 } 299 } 300 301 var stats [2]db.VanityStat 302 for i, s := range record.Stats { 303 if i < 2 { 304 stats[i].Kind = db.VanityStatKind(s) 305 } 306 } 307 308 var pinned [6]syntax.ATURI 309 for i, r := range record.PinnedRepositories { 310 if i < 6 { 311 pinned[i] = syntax.ATURI(r) 312 } 313 } 314 315 profile := db.Profile{ 316 Did: did, 317 Description: description, 318 IncludeBluesky: includeBluesky, 319 Location: location, 320 Links: links, 321 Stats: stats, 322 PinnedRepos: pinned, 323 } 324 325 ddb, ok := i.Db.Execer.(*db.DB) 326 if !ok { 327 return fmt.Errorf("failed to index profile record, invalid db cast") 328 } 329 330 tx, err := ddb.Begin() 331 if err != nil { 332 return fmt.Errorf("failed to start transaction") 333 } 334 335 err = db.ValidateProfile(tx, &profile) 336 if err != nil { 337 return fmt.Errorf("invalid profile record") 338 } 339 340 err = db.UpsertProfile(tx, &profile) 341 case models.CommitOperationDelete: 342 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 343 } 344 345 if err != nil { 346 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 347 } 348 349 return nil 350} 351 352func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error { 353 did := e.Did 354 var err error 355 356 l := i.Logger.With("handler", "ingestSpindleMember") 357 l = l.With("nsid", e.Commit.Collection) 358 359 switch e.Commit.Operation { 360 case models.CommitOperationCreate: 361 raw := json.RawMessage(e.Commit.Record) 362 record := tangled.SpindleMember{} 363 err = json.Unmarshal(raw, &record) 364 if err != nil { 365 l.Error("invalid record", "err", err) 366 return err 367 } 368 369 // only spindle owner can invite to spindles 370 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 371 if err != nil || !ok { 372 return fmt.Errorf("failed to enforce permissions: %w", err) 373 } 374 375 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 376 if err != nil { 377 return err 378 } 379 380 if memberId.Handle.IsInvalidHandle() { 381 return err 382 } 383 384 ddb, ok := i.Db.Execer.(*db.DB) 385 if !ok { 386 return fmt.Errorf("failed to index profile record, invalid db cast") 387 } 388 389 err = db.AddSpindleMember(ddb, db.SpindleMember{ 390 Did: syntax.DID(did), 391 Rkey: e.Commit.RKey, 392 Instance: record.Instance, 393 Subject: memberId.DID, 394 }) 395 if !ok { 396 return fmt.Errorf("failed to add to db: %w", err) 397 } 398 399 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 400 if err != nil { 401 return fmt.Errorf("failed to update ACLs: %w", err) 402 } 403 404 l.Info("added spindle member") 405 case models.CommitOperationDelete: 406 rkey := e.Commit.RKey 407 408 ddb, ok := i.Db.Execer.(*db.DB) 409 if !ok { 410 return fmt.Errorf("failed to index profile record, invalid db cast") 411 } 412 413 // get record from db first 414 members, err := db.GetSpindleMembers( 415 ddb, 416 db.FilterEq("did", did), 417 db.FilterEq("rkey", rkey), 418 ) 419 if err != nil || len(members) != 1 { 420 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 421 } 422 member := members[0] 423 424 tx, err := ddb.Begin() 425 if err != nil { 426 return fmt.Errorf("failed to start txn: %w", err) 427 } 428 429 // remove record by rkey && update enforcer 430 if err = db.RemoveSpindleMember( 431 tx, 432 db.FilterEq("did", did), 433 db.FilterEq("rkey", rkey), 434 ); err != nil { 435 return fmt.Errorf("failed to remove from db: %w", err) 436 } 437 438 // update enforcer 439 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 440 if err != nil { 441 return fmt.Errorf("failed to update ACLs: %w", err) 442 } 443 444 if err = tx.Commit(); err != nil { 445 return fmt.Errorf("failed to commit txn: %w", err) 446 } 447 448 if err = i.Enforcer.E.SavePolicy(); err != nil { 449 return fmt.Errorf("failed to save ACLs: %w", err) 450 } 451 452 l.Info("removed spindle member") 453 } 454 455 return nil 456} 457 458func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error { 459 did := e.Did 460 var err error 461 462 l := i.Logger.With("handler", "ingestSpindle") 463 l = l.With("nsid", e.Commit.Collection) 464 465 switch e.Commit.Operation { 466 case models.CommitOperationCreate: 467 raw := json.RawMessage(e.Commit.Record) 468 record := tangled.Spindle{} 469 err = json.Unmarshal(raw, &record) 470 if err != nil { 471 l.Error("invalid record", "err", err) 472 return err 473 } 474 475 instance := e.Commit.RKey 476 477 ddb, ok := i.Db.Execer.(*db.DB) 478 if !ok { 479 return fmt.Errorf("failed to index profile record, invalid db cast") 480 } 481 482 err := db.AddSpindle(ddb, db.Spindle{ 483 Owner: syntax.DID(did), 484 Instance: instance, 485 }) 486 if err != nil { 487 l.Error("failed to add spindle to db", "err", err, "instance", instance) 488 return err 489 } 490 491 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 492 if err != nil { 493 l.Error("failed to add spindle to db", "err", err, "instance", instance) 494 return err 495 } 496 497 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 498 if err != nil { 499 return fmt.Errorf("failed to mark verified: %w", err) 500 } 501 502 return nil 503 504 case models.CommitOperationDelete: 505 instance := e.Commit.RKey 506 507 ddb, ok := i.Db.Execer.(*db.DB) 508 if !ok { 509 return fmt.Errorf("failed to index profile record, invalid db cast") 510 } 511 512 // get record from db first 513 spindles, err := db.GetSpindles( 514 ddb, 515 db.FilterEq("owner", did), 516 db.FilterEq("instance", instance), 517 ) 518 if err != nil || len(spindles) != 1 { 519 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 520 } 521 spindle := spindles[0] 522 523 tx, err := ddb.Begin() 524 if err != nil { 525 return err 526 } 527 defer func() { 528 tx.Rollback() 529 i.Enforcer.E.LoadPolicy() 530 }() 531 532 // remove spindle members first 533 err = db.RemoveSpindleMember( 534 tx, 535 db.FilterEq("owner", did), 536 db.FilterEq("instance", instance), 537 ) 538 if err != nil { 539 return err 540 } 541 542 err = db.DeleteSpindle( 543 tx, 544 db.FilterEq("owner", did), 545 db.FilterEq("instance", instance), 546 ) 547 if err != nil { 548 return err 549 } 550 551 if spindle.Verified != nil { 552 err = i.Enforcer.RemoveSpindle(instance) 553 if err != nil { 554 return err 555 } 556 } 557 558 err = tx.Commit() 559 if err != nil { 560 return err 561 } 562 563 err = i.Enforcer.E.SavePolicy() 564 if err != nil { 565 return err 566 } 567 } 568 569 return nil 570} 571 572func (i *Ingester) ingestString(e *models.Event) error { 573 did := e.Did 574 rkey := e.Commit.RKey 575 576 var err error 577 578 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 579 l.Info("ingesting record") 580 581 ddb, ok := i.Db.Execer.(*db.DB) 582 if !ok { 583 return fmt.Errorf("failed to index string record, invalid db cast") 584 } 585 586 switch e.Commit.Operation { 587 case models.CommitOperationCreate, models.CommitOperationUpdate: 588 raw := json.RawMessage(e.Commit.Record) 589 record := tangled.String{} 590 err = json.Unmarshal(raw, &record) 591 if err != nil { 592 l.Error("invalid record", "err", err) 593 return err 594 } 595 596 string := db.StringFromRecord(did, rkey, record) 597 598 if err = string.Validate(); err != nil { 599 l.Error("invalid record", "err", err) 600 return err 601 } 602 603 if err = db.AddString(ddb, string); err != nil { 604 l.Error("failed to add string", "err", err) 605 return err 606 } 607 608 return nil 609 610 case models.CommitOperationDelete: 611 if err := db.DeleteString( 612 ddb, 613 db.FilterEq("did", did), 614 db.FilterEq("rkey", rkey), 615 ); err != nil { 616 l.Error("failed to delete", "err", err) 617 return fmt.Errorf("failed to delete string record: %w", err) 618 } 619 620 return nil 621 } 622 623 return nil 624} 625 626func (i *Ingester) ingestKnotMember(e *models.Event) error { 627 did := e.Did 628 var err error 629 630 l := i.Logger.With("handler", "ingestKnotMember") 631 l = l.With("nsid", e.Commit.Collection) 632 633 switch e.Commit.Operation { 634 case models.CommitOperationCreate: 635 raw := json.RawMessage(e.Commit.Record) 636 record := tangled.KnotMember{} 637 err = json.Unmarshal(raw, &record) 638 if err != nil { 639 l.Error("invalid record", "err", err) 640 return err 641 } 642 643 // only knot owner can invite to knots 644 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 645 if err != nil || !ok { 646 return fmt.Errorf("failed to enforce permissions: %w", err) 647 } 648 649 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 650 if err != nil { 651 return err 652 } 653 654 if memberId.Handle.IsInvalidHandle() { 655 return err 656 } 657 658 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 659 if err != nil { 660 return fmt.Errorf("failed to update ACLs: %w", err) 661 } 662 663 l.Info("added knot member") 664 case models.CommitOperationDelete: 665 // we don't store knot members in a table (like we do for spindle) 666 // and we can't remove this just yet. possibly fixed if we switch 667 // to either: 668 // 1. a knot_members table like with spindle and store the rkey 669 // 2. use the knot host as the rkey 670 // 671 // TODO: implement member deletion 672 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 673 } 674 675 return nil 676} 677 678func (i *Ingester) ingestKnot(e *models.Event) error { 679 did := e.Did 680 var err error 681 682 l := i.Logger.With("handler", "ingestKnot") 683 l = l.With("nsid", e.Commit.Collection) 684 685 switch e.Commit.Operation { 686 case models.CommitOperationCreate: 687 raw := json.RawMessage(e.Commit.Record) 688 record := tangled.Knot{} 689 err = json.Unmarshal(raw, &record) 690 if err != nil { 691 l.Error("invalid record", "err", err) 692 return err 693 } 694 695 domain := e.Commit.RKey 696 697 ddb, ok := i.Db.Execer.(*db.DB) 698 if !ok { 699 return fmt.Errorf("failed to index profile record, invalid db cast") 700 } 701 702 err := db.AddKnot(ddb, domain, did) 703 if err != nil { 704 l.Error("failed to add knot to db", "err", err, "domain", domain) 705 return err 706 } 707 708 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 709 if err != nil { 710 l.Error("failed to verify knot", "err", err, "domain", domain) 711 return err 712 } 713 714 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 715 if err != nil { 716 return fmt.Errorf("failed to mark verified: %w", err) 717 } 718 719 return nil 720 721 case models.CommitOperationDelete: 722 domain := e.Commit.RKey 723 724 ddb, ok := i.Db.Execer.(*db.DB) 725 if !ok { 726 return fmt.Errorf("failed to index knot record, invalid db cast") 727 } 728 729 // get record from db first 730 registrations, err := db.GetRegistrations( 731 ddb, 732 db.FilterEq("domain", domain), 733 db.FilterEq("did", did), 734 ) 735 if err != nil { 736 return fmt.Errorf("failed to get registration: %w", err) 737 } 738 if len(registrations) != 1 { 739 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 740 } 741 registration := registrations[0] 742 743 tx, err := ddb.Begin() 744 if err != nil { 745 return err 746 } 747 defer func() { 748 tx.Rollback() 749 i.Enforcer.E.LoadPolicy() 750 }() 751 752 err = db.DeleteKnot( 753 tx, 754 db.FilterEq("did", did), 755 db.FilterEq("domain", domain), 756 ) 757 if err != nil { 758 return err 759 } 760 761 if registration.Registered != nil { 762 err = i.Enforcer.RemoveKnot(domain) 763 if err != nil { 764 return err 765 } 766 } 767 768 err = tx.Commit() 769 if err != nil { 770 return err 771 } 772 773 err = i.Enforcer.E.SavePolicy() 774 if err != nil { 775 return err 776 } 777 } 778 779 return nil 780} 781func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error { 782 did := e.Did 783 rkey := e.Commit.RKey 784 785 var err error 786 787 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 788 l.Info("ingesting record") 789 790 ddb, ok := i.Db.Execer.(*db.DB) 791 if !ok { 792 return fmt.Errorf("failed to index issue record, invalid db cast") 793 } 794 795 switch e.Commit.Operation { 796 case models.CommitOperationCreate, models.CommitOperationUpdate: 797 raw := json.RawMessage(e.Commit.Record) 798 record := tangled.RepoIssue{} 799 err = json.Unmarshal(raw, &record) 800 if err != nil { 801 l.Error("invalid record", "err", err) 802 return err 803 } 804 805 issue := db.IssueFromRecord(did, rkey, record) 806 807 if err := i.Validator.ValidateIssue(&issue); err != nil { 808 return fmt.Errorf("failed to validate issue: %w", err) 809 } 810 811 tx, err := ddb.BeginTx(ctx, nil) 812 if err != nil { 813 l.Error("failed to begin transaction", "err", err) 814 return err 815 } 816 defer tx.Rollback() 817 818 err = db.PutIssue(tx, &issue) 819 if err != nil { 820 l.Error("failed to create issue", "err", err) 821 return err 822 } 823 824 err = tx.Commit() 825 if err != nil { 826 l.Error("failed to commit txn", "err", err) 827 return err 828 } 829 830 return nil 831 832 case models.CommitOperationDelete: 833 if err := db.DeleteIssues( 834 ddb, 835 db.FilterEq("did", did), 836 db.FilterEq("rkey", rkey), 837 ); err != nil { 838 l.Error("failed to delete", "err", err) 839 return fmt.Errorf("failed to delete issue record: %w", err) 840 } 841 842 return nil 843 } 844 845 return nil 846} 847 848func (i *Ingester) ingestIssueComment(e *models.Event) error { 849 did := e.Did 850 rkey := e.Commit.RKey 851 852 var err error 853 854 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 855 l.Info("ingesting record") 856 857 ddb, ok := i.Db.Execer.(*db.DB) 858 if !ok { 859 return fmt.Errorf("failed to index issue comment record, invalid db cast") 860 } 861 862 switch e.Commit.Operation { 863 case models.CommitOperationCreate, models.CommitOperationUpdate: 864 raw := json.RawMessage(e.Commit.Record) 865 record := tangled.RepoIssueComment{} 866 err = json.Unmarshal(raw, &record) 867 if err != nil { 868 return fmt.Errorf("invalid record: %w", err) 869 } 870 871 comment, err := db.IssueCommentFromRecord(did, rkey, record) 872 if err != nil { 873 return fmt.Errorf("failed to parse comment from record: %w", err) 874 } 875 876 if err := i.Validator.ValidateIssueComment(comment); err != nil { 877 return fmt.Errorf("failed to validate comment: %w", err) 878 } 879 880 _, err = db.AddIssueComment(ddb, *comment) 881 if err != nil { 882 return fmt.Errorf("failed to create issue comment: %w", err) 883 } 884 885 return nil 886 887 case models.CommitOperationDelete: 888 if err := db.DeleteIssueComments( 889 ddb, 890 db.FilterEq("did", did), 891 db.FilterEq("rkey", rkey), 892 ); err != nil { 893 return fmt.Errorf("failed to delete issue comment record: %w", err) 894 } 895 896 return nil 897 } 898 899 return nil 900} 901 902func (i *Ingester) ingestLabelDefinition(e *models.Event) error { 903 did := e.Did 904 rkey := e.Commit.RKey 905 906 var err error 907 908 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 909 l.Info("ingesting record") 910 911 ddb, ok := i.Db.Execer.(*db.DB) 912 if !ok { 913 return fmt.Errorf("failed to index label definition, invalid db cast") 914 } 915 916 switch e.Commit.Operation { 917 case models.CommitOperationCreate, models.CommitOperationUpdate: 918 raw := json.RawMessage(e.Commit.Record) 919 record := tangled.LabelDefinition{} 920 err = json.Unmarshal(raw, &record) 921 if err != nil { 922 return fmt.Errorf("invalid record: %w", err) 923 } 924 925 def, err := db.LabelDefinitionFromRecord(did, rkey, record) 926 if err != nil { 927 return fmt.Errorf("failed to parse labeldef from record: %w", err) 928 } 929 930 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 931 return fmt.Errorf("failed to validate labeldef: %w", err) 932 } 933 934 _, err = db.AddLabelDefinition(ddb, def) 935 if err != nil { 936 return fmt.Errorf("failed to create labeldef: %w", err) 937 } 938 939 return nil 940 941 case models.CommitOperationDelete: 942 if err := db.DeleteLabelDefinition( 943 ddb, 944 db.FilterEq("did", did), 945 db.FilterEq("rkey", rkey), 946 ); err != nil { 947 return fmt.Errorf("failed to delete labeldef record: %w", err) 948 } 949 950 return nil 951 } 952 953 return nil 954}