forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 jmodels "github.com/bluesky-social/jetstream/pkg/models" 13 "github.com/go-git/go-git/v5/plumbing" 14 "github.com/ipfs/go-cid" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/serververify" 20 "tangled.org/core/appview/validator" 21 "tangled.org/core/idresolver" 22 "tangled.org/core/rbac" 23) 24 25type Ingester struct { 26 Db db.DbWrapper 27 Enforcer *rbac.Enforcer 28 IdResolver *idresolver.Resolver 29 Config *config.Config 30 Logger *slog.Logger 31 Validator *validator.Validator 32} 33 34type processFunc func(ctx context.Context, e *jmodels.Event) error 35 36func (i *Ingester) Ingest() processFunc { 37 return func(ctx context.Context, e *jmodels.Event) error { 38 var err error 39 defer func() { 40 eventTime := e.TimeUS 41 lastTimeUs := eventTime + 1 42 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 43 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 44 } 45 }() 46 47 l := i.Logger.With("kind", e.Kind) 48 switch e.Kind { 49 case jmodels.EventKindAccount: 50 if !e.Account.Active && *e.Account.Status == "deactivated" { 51 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 52 } 53 case jmodels.EventKindIdentity: 54 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 55 case jmodels.EventKindCommit: 56 switch e.Commit.Collection { 57 case tangled.GraphFollowNSID: 58 err = i.ingestFollow(e) 59 case tangled.FeedStarNSID: 60 err = i.ingestStar(e) 61 case tangled.PublicKeyNSID: 62 err = i.ingestPublicKey(e) 63 case tangled.RepoArtifactNSID: 64 err = i.ingestArtifact(e) 65 case tangled.ActorProfileNSID: 66 err = i.ingestProfile(e) 67 case tangled.SpindleMemberNSID: 68 err = i.ingestSpindleMember(ctx, e) 69 case tangled.SpindleNSID: 70 err = i.ingestSpindle(ctx, e) 71 case tangled.KnotMemberNSID: 72 err = i.ingestKnotMember(e) 73 case tangled.KnotNSID: 74 err = i.ingestKnot(e) 75 case tangled.StringNSID: 76 err = i.ingestString(e) 77 case tangled.RepoIssueNSID: 78 err = i.ingestIssue(ctx, e) 79 case tangled.RepoIssueCommentNSID: 80 err = i.ingestIssueComment(e) 81 case tangled.LabelDefinitionNSID: 82 err = i.ingestLabelDefinition(e) 83 } 84 l = i.Logger.With("nsid", e.Commit.Collection) 85 } 86 87 if err != nil { 88 l.Debug("error ingesting record", "err", err) 89 } 90 91 return nil 92 } 93} 94 95func (i *Ingester) ingestStar(e *jmodels.Event) error { 96 var err error 97 did := e.Did 98 99 l := i.Logger.With("handler", "ingestStar") 100 l = l.With("nsid", e.Commit.Collection) 101 102 switch e.Commit.Operation { 103 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 104 var subjectUri syntax.ATURI 105 106 raw := json.RawMessage(e.Commit.Record) 107 record := tangled.FeedStar{} 108 err := json.Unmarshal(raw, &record) 109 if err != nil { 110 l.Error("invalid record", "err", err) 111 return err 112 } 113 114 subjectUri, err = syntax.ParseATURI(record.Subject) 115 if err != nil { 116 l.Error("invalid record", "err", err) 117 return err 118 } 119 err = db.AddStar(i.Db, &models.Star{ 120 StarredByDid: did, 121 RepoAt: subjectUri, 122 Rkey: e.Commit.RKey, 123 }) 124 case jmodels.CommitOperationDelete: 125 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 126 } 127 128 if err != nil { 129 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 130 } 131 132 return nil 133} 134 135func (i *Ingester) ingestFollow(e *jmodels.Event) error { 136 var err error 137 did := e.Did 138 139 l := i.Logger.With("handler", "ingestFollow") 140 l = l.With("nsid", e.Commit.Collection) 141 142 switch e.Commit.Operation { 143 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 144 raw := json.RawMessage(e.Commit.Record) 145 record := tangled.GraphFollow{} 146 err = json.Unmarshal(raw, &record) 147 if err != nil { 148 l.Error("invalid record", "err", err) 149 return err 150 } 151 152 err = db.AddFollow(i.Db, &models.Follow{ 153 UserDid: did, 154 SubjectDid: record.Subject, 155 Rkey: e.Commit.RKey, 156 }) 157 case jmodels.CommitOperationDelete: 158 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 159 } 160 161 if err != nil { 162 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 163 } 164 165 return nil 166} 167 168func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 169 did := e.Did 170 var err error 171 172 l := i.Logger.With("handler", "ingestPublicKey") 173 l = l.With("nsid", e.Commit.Collection) 174 175 switch e.Commit.Operation { 176 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 177 l.Debug("processing add of pubkey") 178 raw := json.RawMessage(e.Commit.Record) 179 record := tangled.PublicKey{} 180 err = json.Unmarshal(raw, &record) 181 if err != nil { 182 l.Error("invalid record", "err", err) 183 return err 184 } 185 186 name := record.Name 187 key := record.Key 188 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 189 case jmodels.CommitOperationDelete: 190 l.Debug("processing delete of pubkey") 191 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 192 } 193 194 if err != nil { 195 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 196 } 197 198 return nil 199} 200 201func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 202 did := e.Did 203 var err error 204 205 l := i.Logger.With("handler", "ingestArtifact") 206 l = l.With("nsid", e.Commit.Collection) 207 208 switch e.Commit.Operation { 209 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 210 raw := json.RawMessage(e.Commit.Record) 211 record := tangled.RepoArtifact{} 212 err = json.Unmarshal(raw, &record) 213 if err != nil { 214 l.Error("invalid record", "err", err) 215 return err 216 } 217 218 repoAt, err := syntax.ParseATURI(record.Repo) 219 if err != nil { 220 return err 221 } 222 223 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 224 if err != nil { 225 return err 226 } 227 228 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 229 if err != nil || !ok { 230 return err 231 } 232 233 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 234 if err != nil { 235 createdAt = time.Now() 236 } 237 238 artifact := models.Artifact{ 239 Did: did, 240 Rkey: e.Commit.RKey, 241 RepoAt: repoAt, 242 Tag: plumbing.Hash(record.Tag), 243 CreatedAt: createdAt, 244 BlobCid: cid.Cid(record.Artifact.Ref), 245 Name: record.Name, 246 Size: uint64(record.Artifact.Size), 247 MimeType: record.Artifact.MimeType, 248 } 249 250 err = db.AddArtifact(i.Db, artifact) 251 case jmodels.CommitOperationDelete: 252 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 253 } 254 255 if err != nil { 256 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 257 } 258 259 return nil 260} 261 262func (i *Ingester) ingestProfile(e *jmodels.Event) error { 263 did := e.Did 264 var err error 265 266 l := i.Logger.With("handler", "ingestProfile") 267 l = l.With("nsid", e.Commit.Collection) 268 269 if e.Commit.RKey != "self" { 270 return fmt.Errorf("ingestProfile only ingests `self` record") 271 } 272 273 switch e.Commit.Operation { 274 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 275 raw := json.RawMessage(e.Commit.Record) 276 record := tangled.ActorProfile{} 277 err = json.Unmarshal(raw, &record) 278 if err != nil { 279 l.Error("invalid record", "err", err) 280 return err 281 } 282 283 description := "" 284 if record.Description != nil { 285 description = *record.Description 286 } 287 288 includeBluesky := record.Bluesky 289 290 location := "" 291 if record.Location != nil { 292 location = *record.Location 293 } 294 295 var links [5]string 296 for i, l := range record.Links { 297 if i < 5 { 298 links[i] = l 299 } 300 } 301 302 var stats [2]models.VanityStat 303 for i, s := range record.Stats { 304 if i < 2 { 305 stats[i].Kind = models.VanityStatKind(s) 306 } 307 } 308 309 var pinned [6]syntax.ATURI 310 for i, r := range record.PinnedRepositories { 311 if i < 6 { 312 pinned[i] = syntax.ATURI(r) 313 } 314 } 315 316 profile := models.Profile{ 317 Did: did, 318 Description: description, 319 IncludeBluesky: includeBluesky, 320 Location: location, 321 Links: links, 322 Stats: stats, 323 PinnedRepos: pinned, 324 } 325 326 ddb, ok := i.Db.Execer.(*db.DB) 327 if !ok { 328 return fmt.Errorf("failed to index profile record, invalid db cast") 329 } 330 331 tx, err := ddb.Begin() 332 if err != nil { 333 return fmt.Errorf("failed to start transaction") 334 } 335 336 err = db.ValidateProfile(tx, &profile) 337 if err != nil { 338 return fmt.Errorf("invalid profile record") 339 } 340 341 err = db.UpsertProfile(tx, &profile) 342 case jmodels.CommitOperationDelete: 343 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 344 } 345 346 if err != nil { 347 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 348 } 349 350 return nil 351} 352 353func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 354 did := e.Did 355 var err error 356 357 l := i.Logger.With("handler", "ingestSpindleMember") 358 l = l.With("nsid", e.Commit.Collection) 359 360 switch e.Commit.Operation { 361 case jmodels.CommitOperationCreate: 362 raw := json.RawMessage(e.Commit.Record) 363 record := tangled.SpindleMember{} 364 err = json.Unmarshal(raw, &record) 365 if err != nil { 366 l.Error("invalid record", "err", err) 367 return err 368 } 369 370 // only spindle owner can invite to spindles 371 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 372 if err != nil || !ok { 373 return fmt.Errorf("failed to enforce permissions: %w", err) 374 } 375 376 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 377 if err != nil { 378 return err 379 } 380 381 if memberId.Handle.IsInvalidHandle() { 382 return err 383 } 384 385 ddb, ok := i.Db.Execer.(*db.DB) 386 if !ok { 387 return fmt.Errorf("failed to index profile record, invalid db cast") 388 } 389 390 err = db.AddSpindleMember(ddb, models.SpindleMember{ 391 Did: syntax.DID(did), 392 Rkey: e.Commit.RKey, 393 Instance: record.Instance, 394 Subject: memberId.DID, 395 }) 396 if !ok { 397 return fmt.Errorf("failed to add to db: %w", err) 398 } 399 400 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 401 if err != nil { 402 return fmt.Errorf("failed to update ACLs: %w", err) 403 } 404 405 l.Info("added spindle member") 406 case jmodels.CommitOperationDelete: 407 rkey := e.Commit.RKey 408 409 ddb, ok := i.Db.Execer.(*db.DB) 410 if !ok { 411 return fmt.Errorf("failed to index profile record, invalid db cast") 412 } 413 414 // get record from db first 415 members, err := db.GetSpindleMembers( 416 ddb, 417 db.FilterEq("did", did), 418 db.FilterEq("rkey", rkey), 419 ) 420 if err != nil || len(members) != 1 { 421 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 422 } 423 member := members[0] 424 425 tx, err := ddb.Begin() 426 if err != nil { 427 return fmt.Errorf("failed to start txn: %w", err) 428 } 429 430 // remove record by rkey && update enforcer 431 if err = db.RemoveSpindleMember( 432 tx, 433 db.FilterEq("did", did), 434 db.FilterEq("rkey", rkey), 435 ); err != nil { 436 return fmt.Errorf("failed to remove from db: %w", err) 437 } 438 439 // update enforcer 440 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 441 if err != nil { 442 return fmt.Errorf("failed to update ACLs: %w", err) 443 } 444 445 if err = tx.Commit(); err != nil { 446 return fmt.Errorf("failed to commit txn: %w", err) 447 } 448 449 if err = i.Enforcer.E.SavePolicy(); err != nil { 450 return fmt.Errorf("failed to save ACLs: %w", err) 451 } 452 453 l.Info("removed spindle member") 454 } 455 456 return nil 457} 458 459func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 460 did := e.Did 461 var err error 462 463 l := i.Logger.With("handler", "ingestSpindle") 464 l = l.With("nsid", e.Commit.Collection) 465 466 switch e.Commit.Operation { 467 case jmodels.CommitOperationCreate: 468 raw := json.RawMessage(e.Commit.Record) 469 record := tangled.Spindle{} 470 err = json.Unmarshal(raw, &record) 471 if err != nil { 472 l.Error("invalid record", "err", err) 473 return err 474 } 475 476 instance := e.Commit.RKey 477 478 ddb, ok := i.Db.Execer.(*db.DB) 479 if !ok { 480 return fmt.Errorf("failed to index profile record, invalid db cast") 481 } 482 483 err := db.AddSpindle(ddb, models.Spindle{ 484 Owner: syntax.DID(did), 485 Instance: instance, 486 }) 487 if err != nil { 488 l.Error("failed to add spindle to db", "err", err, "instance", instance) 489 return err 490 } 491 492 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 493 if err != nil { 494 l.Error("failed to add spindle to db", "err", err, "instance", instance) 495 return err 496 } 497 498 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 499 if err != nil { 500 return fmt.Errorf("failed to mark verified: %w", err) 501 } 502 503 return nil 504 505 case jmodels.CommitOperationDelete: 506 instance := e.Commit.RKey 507 508 ddb, ok := i.Db.Execer.(*db.DB) 509 if !ok { 510 return fmt.Errorf("failed to index profile record, invalid db cast") 511 } 512 513 // get record from db first 514 spindles, err := db.GetSpindles( 515 ddb, 516 db.FilterEq("owner", did), 517 db.FilterEq("instance", instance), 518 ) 519 if err != nil || len(spindles) != 1 { 520 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 521 } 522 spindle := spindles[0] 523 524 tx, err := ddb.Begin() 525 if err != nil { 526 return err 527 } 528 defer func() { 529 tx.Rollback() 530 i.Enforcer.E.LoadPolicy() 531 }() 532 533 // remove spindle members first 534 err = db.RemoveSpindleMember( 535 tx, 536 db.FilterEq("owner", did), 537 db.FilterEq("instance", instance), 538 ) 539 if err != nil { 540 return err 541 } 542 543 err = db.DeleteSpindle( 544 tx, 545 db.FilterEq("owner", did), 546 db.FilterEq("instance", instance), 547 ) 548 if err != nil { 549 return err 550 } 551 552 if spindle.Verified != nil { 553 err = i.Enforcer.RemoveSpindle(instance) 554 if err != nil { 555 return err 556 } 557 } 558 559 err = tx.Commit() 560 if err != nil { 561 return err 562 } 563 564 err = i.Enforcer.E.SavePolicy() 565 if err != nil { 566 return err 567 } 568 } 569 570 return nil 571} 572 573func (i *Ingester) ingestString(e *jmodels.Event) error { 574 did := e.Did 575 rkey := e.Commit.RKey 576 577 var err error 578 579 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 580 l.Info("ingesting record") 581 582 ddb, ok := i.Db.Execer.(*db.DB) 583 if !ok { 584 return fmt.Errorf("failed to index string record, invalid db cast") 585 } 586 587 switch e.Commit.Operation { 588 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 589 raw := json.RawMessage(e.Commit.Record) 590 record := tangled.String{} 591 err = json.Unmarshal(raw, &record) 592 if err != nil { 593 l.Error("invalid record", "err", err) 594 return err 595 } 596 597 string := models.StringFromRecord(did, rkey, record) 598 599 if err = i.Validator.ValidateString(&string); err != nil { 600 l.Error("invalid record", "err", err) 601 return err 602 } 603 604 if err = db.AddString(ddb, string); err != nil { 605 l.Error("failed to add string", "err", err) 606 return err 607 } 608 609 return nil 610 611 case jmodels.CommitOperationDelete: 612 if err := db.DeleteString( 613 ddb, 614 db.FilterEq("did", did), 615 db.FilterEq("rkey", rkey), 616 ); err != nil { 617 l.Error("failed to delete", "err", err) 618 return fmt.Errorf("failed to delete string record: %w", err) 619 } 620 621 return nil 622 } 623 624 return nil 625} 626 627func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 628 did := e.Did 629 var err error 630 631 l := i.Logger.With("handler", "ingestKnotMember") 632 l = l.With("nsid", e.Commit.Collection) 633 634 switch e.Commit.Operation { 635 case jmodels.CommitOperationCreate: 636 raw := json.RawMessage(e.Commit.Record) 637 record := tangled.KnotMember{} 638 err = json.Unmarshal(raw, &record) 639 if err != nil { 640 l.Error("invalid record", "err", err) 641 return err 642 } 643 644 // only knot owner can invite to knots 645 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 646 if err != nil || !ok { 647 return fmt.Errorf("failed to enforce permissions: %w", err) 648 } 649 650 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 651 if err != nil { 652 return err 653 } 654 655 if memberId.Handle.IsInvalidHandle() { 656 return err 657 } 658 659 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 660 if err != nil { 661 return fmt.Errorf("failed to update ACLs: %w", err) 662 } 663 664 l.Info("added knot member") 665 case jmodels.CommitOperationDelete: 666 // we don't store knot members in a table (like we do for spindle) 667 // and we can't remove this just yet. possibly fixed if we switch 668 // to either: 669 // 1. a knot_members table like with spindle and store the rkey 670 // 2. use the knot host as the rkey 671 // 672 // TODO: implement member deletion 673 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 674 } 675 676 return nil 677} 678 679func (i *Ingester) ingestKnot(e *jmodels.Event) error { 680 did := e.Did 681 var err error 682 683 l := i.Logger.With("handler", "ingestKnot") 684 l = l.With("nsid", e.Commit.Collection) 685 686 switch e.Commit.Operation { 687 case jmodels.CommitOperationCreate: 688 raw := json.RawMessage(e.Commit.Record) 689 record := tangled.Knot{} 690 err = json.Unmarshal(raw, &record) 691 if err != nil { 692 l.Error("invalid record", "err", err) 693 return err 694 } 695 696 domain := e.Commit.RKey 697 698 ddb, ok := i.Db.Execer.(*db.DB) 699 if !ok { 700 return fmt.Errorf("failed to index profile record, invalid db cast") 701 } 702 703 err := db.AddKnot(ddb, domain, did) 704 if err != nil { 705 l.Error("failed to add knot to db", "err", err, "domain", domain) 706 return err 707 } 708 709 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 710 if err != nil { 711 l.Error("failed to verify knot", "err", err, "domain", domain) 712 return err 713 } 714 715 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 716 if err != nil { 717 return fmt.Errorf("failed to mark verified: %w", err) 718 } 719 720 return nil 721 722 case jmodels.CommitOperationDelete: 723 domain := e.Commit.RKey 724 725 ddb, ok := i.Db.Execer.(*db.DB) 726 if !ok { 727 return fmt.Errorf("failed to index knot record, invalid db cast") 728 } 729 730 // get record from db first 731 registrations, err := db.GetRegistrations( 732 ddb, 733 db.FilterEq("domain", domain), 734 db.FilterEq("did", did), 735 ) 736 if err != nil { 737 return fmt.Errorf("failed to get registration: %w", err) 738 } 739 if len(registrations) != 1 { 740 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 741 } 742 registration := registrations[0] 743 744 tx, err := ddb.Begin() 745 if err != nil { 746 return err 747 } 748 defer func() { 749 tx.Rollback() 750 i.Enforcer.E.LoadPolicy() 751 }() 752 753 err = db.DeleteKnot( 754 tx, 755 db.FilterEq("did", did), 756 db.FilterEq("domain", domain), 757 ) 758 if err != nil { 759 return err 760 } 761 762 if registration.Registered != nil { 763 err = i.Enforcer.RemoveKnot(domain) 764 if err != nil { 765 return err 766 } 767 } 768 769 err = tx.Commit() 770 if err != nil { 771 return err 772 } 773 774 err = i.Enforcer.E.SavePolicy() 775 if err != nil { 776 return err 777 } 778 } 779 780 return nil 781} 782func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 783 did := e.Did 784 rkey := e.Commit.RKey 785 786 var err error 787 788 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 789 l.Info("ingesting record") 790 791 ddb, ok := i.Db.Execer.(*db.DB) 792 if !ok { 793 return fmt.Errorf("failed to index issue record, invalid db cast") 794 } 795 796 switch e.Commit.Operation { 797 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 798 raw := json.RawMessage(e.Commit.Record) 799 record := tangled.RepoIssue{} 800 err = json.Unmarshal(raw, &record) 801 if err != nil { 802 l.Error("invalid record", "err", err) 803 return err 804 } 805 806 issue := models.IssueFromRecord(did, rkey, record) 807 808 if err := i.Validator.ValidateIssue(&issue); err != nil { 809 return fmt.Errorf("failed to validate issue: %w", err) 810 } 811 812 tx, err := ddb.BeginTx(ctx, nil) 813 if err != nil { 814 l.Error("failed to begin transaction", "err", err) 815 return err 816 } 817 defer tx.Rollback() 818 819 err = db.PutIssue(tx, &issue) 820 if err != nil { 821 l.Error("failed to create issue", "err", err) 822 return err 823 } 824 825 err = tx.Commit() 826 if err != nil { 827 l.Error("failed to commit txn", "err", err) 828 return err 829 } 830 831 return nil 832 833 case jmodels.CommitOperationDelete: 834 if err := db.DeleteIssues( 835 ddb, 836 db.FilterEq("did", did), 837 db.FilterEq("rkey", rkey), 838 ); err != nil { 839 l.Error("failed to delete", "err", err) 840 return fmt.Errorf("failed to delete issue record: %w", err) 841 } 842 843 return nil 844 } 845 846 return nil 847} 848 849func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 850 did := e.Did 851 rkey := e.Commit.RKey 852 853 var err error 854 855 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 856 l.Info("ingesting record") 857 858 ddb, ok := i.Db.Execer.(*db.DB) 859 if !ok { 860 return fmt.Errorf("failed to index issue comment record, invalid db cast") 861 } 862 863 switch e.Commit.Operation { 864 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 865 raw := json.RawMessage(e.Commit.Record) 866 record := tangled.RepoIssueComment{} 867 err = json.Unmarshal(raw, &record) 868 if err != nil { 869 return fmt.Errorf("invalid record: %w", err) 870 } 871 872 comment, err := models.IssueCommentFromRecord(did, rkey, record) 873 if err != nil { 874 return fmt.Errorf("failed to parse comment from record: %w", err) 875 } 876 877 if err := i.Validator.ValidateIssueComment(comment); err != nil { 878 return fmt.Errorf("failed to validate comment: %w", err) 879 } 880 881 _, err = db.AddIssueComment(ddb, *comment) 882 if err != nil { 883 return fmt.Errorf("failed to create issue comment: %w", err) 884 } 885 886 return nil 887 888 case jmodels.CommitOperationDelete: 889 if err := db.DeleteIssueComments( 890 ddb, 891 db.FilterEq("did", did), 892 db.FilterEq("rkey", rkey), 893 ); err != nil { 894 return fmt.Errorf("failed to delete issue comment record: %w", err) 895 } 896 897 return nil 898 } 899 900 return nil 901} 902 903func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 904 did := e.Did 905 rkey := e.Commit.RKey 906 907 var err error 908 909 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 910 l.Info("ingesting record") 911 912 ddb, ok := i.Db.Execer.(*db.DB) 913 if !ok { 914 return fmt.Errorf("failed to index label definition, invalid db cast") 915 } 916 917 switch e.Commit.Operation { 918 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 919 raw := json.RawMessage(e.Commit.Record) 920 record := tangled.LabelDefinition{} 921 err = json.Unmarshal(raw, &record) 922 if err != nil { 923 return fmt.Errorf("invalid record: %w", err) 924 } 925 926 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 927 if err != nil { 928 return fmt.Errorf("failed to parse labeldef from record: %w", err) 929 } 930 931 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 932 return fmt.Errorf("failed to validate labeldef: %w", err) 933 } 934 935 _, err = db.AddLabelDefinition(ddb, def) 936 if err != nil { 937 return fmt.Errorf("failed to create labeldef: %w", err) 938 } 939 940 return nil 941 942 case jmodels.CommitOperationDelete: 943 if err := db.DeleteLabelDefinition( 944 ddb, 945 db.FilterEq("did", did), 946 db.FilterEq("rkey", rkey), 947 ); err != nil { 948 return fmt.Errorf("failed to delete labeldef record: %w", err) 949 } 950 951 return nil 952 } 953 954 return nil 955}