forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/rbac" 25) 26 27type Ingester struct { 28 Db db.DbWrapper 29 Enforcer *rbac.Enforcer 30 IdResolver *idresolver.Resolver 31 Config *config.Config 32 Logger *slog.Logger 33 Validator *validator.Validator 34} 35 36type processFunc func(ctx context.Context, e *jmodels.Event) error 37 38func (i *Ingester) Ingest() processFunc { 39 return func(ctx context.Context, e *jmodels.Event) error { 40 var err error 41 defer func() { 42 eventTime := e.TimeUS 43 lastTimeUs := eventTime + 1 44 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 45 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 46 } 47 }() 48 49 l := i.Logger.With("kind", e.Kind) 50 switch e.Kind { 51 case jmodels.EventKindAccount: 52 if !e.Account.Active && *e.Account.Status == "deactivated" { 53 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 54 } 55 case jmodels.EventKindIdentity: 56 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 57 case jmodels.EventKindCommit: 58 switch e.Commit.Collection { 59 case tangled.GraphFollowNSID: 60 err = i.ingestFollow(e) 61 case tangled.FeedStarNSID: 62 err = i.ingestStar(e) 63 case tangled.PublicKeyNSID: 64 err = i.ingestPublicKey(e) 65 case tangled.RepoArtifactNSID: 66 err = i.ingestArtifact(e) 67 case tangled.ActorProfileNSID: 68 err = i.ingestProfile(e) 69 case tangled.SpindleMemberNSID: 70 err = i.ingestSpindleMember(ctx, e) 71 case tangled.SpindleNSID: 72 err = i.ingestSpindle(ctx, e) 73 case tangled.KnotMemberNSID: 74 err = i.ingestKnotMember(e) 75 case tangled.KnotNSID: 76 err = i.ingestKnot(e) 77 case tangled.StringNSID: 78 err = i.ingestString(e) 79 case tangled.RepoIssueNSID: 80 err = i.ingestIssue(ctx, e) 81 case tangled.RepoIssueCommentNSID: 82 err = i.ingestIssueComment(e) 83 case tangled.LabelDefinitionNSID: 84 err = i.ingestLabelDefinition(e) 85 case tangled.LabelOpNSID: 86 err = i.ingestLabelOp(e) 87 } 88 l = i.Logger.With("nsid", e.Commit.Collection) 89 } 90 91 if err != nil { 92 l.Warn("refused to ingest record", "err", err) 93 } 94 95 return nil 96 } 97} 98 99func (i *Ingester) ingestStar(e *jmodels.Event) error { 100 var err error 101 did := e.Did 102 103 l := i.Logger.With("handler", "ingestStar") 104 l = l.With("nsid", e.Commit.Collection) 105 106 switch e.Commit.Operation { 107 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 108 var subjectUri syntax.ATURI 109 110 raw := json.RawMessage(e.Commit.Record) 111 record := tangled.FeedStar{} 112 err := json.Unmarshal(raw, &record) 113 if err != nil { 114 l.Error("invalid record", "err", err) 115 return err 116 } 117 118 subjectUri, err = syntax.ParseATURI(record.Subject) 119 if err != nil { 120 l.Error("invalid record", "err", err) 121 return err 122 } 123 err = db.AddStar(i.Db, &models.Star{ 124 StarredByDid: did, 125 RepoAt: subjectUri, 126 Rkey: e.Commit.RKey, 127 }) 128 case jmodels.CommitOperationDelete: 129 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 130 } 131 132 if err != nil { 133 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 134 } 135 136 return nil 137} 138 139func (i *Ingester) ingestFollow(e *jmodels.Event) error { 140 var err error 141 did := e.Did 142 143 l := i.Logger.With("handler", "ingestFollow") 144 l = l.With("nsid", e.Commit.Collection) 145 146 switch e.Commit.Operation { 147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 148 raw := json.RawMessage(e.Commit.Record) 149 record := tangled.GraphFollow{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 l.Error("invalid record", "err", err) 153 return err 154 } 155 156 err = db.AddFollow(i.Db, &models.Follow{ 157 UserDid: did, 158 SubjectDid: record.Subject, 159 Rkey: e.Commit.RKey, 160 }) 161 case jmodels.CommitOperationDelete: 162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 163 } 164 165 if err != nil { 166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 167 } 168 169 return nil 170} 171 172func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 173 did := e.Did 174 var err error 175 176 l := i.Logger.With("handler", "ingestPublicKey") 177 l = l.With("nsid", e.Commit.Collection) 178 179 switch e.Commit.Operation { 180 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 181 l.Debug("processing add of pubkey") 182 raw := json.RawMessage(e.Commit.Record) 183 record := tangled.PublicKey{} 184 err = json.Unmarshal(raw, &record) 185 if err != nil { 186 l.Error("invalid record", "err", err) 187 return err 188 } 189 190 name := record.Name 191 key := record.Key 192 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 193 case jmodels.CommitOperationDelete: 194 l.Debug("processing delete of pubkey") 195 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 196 } 197 198 if err != nil { 199 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 200 } 201 202 return nil 203} 204 205func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 206 did := e.Did 207 var err error 208 209 l := i.Logger.With("handler", "ingestArtifact") 210 l = l.With("nsid", e.Commit.Collection) 211 212 switch e.Commit.Operation { 213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 214 raw := json.RawMessage(e.Commit.Record) 215 record := tangled.RepoArtifact{} 216 err = json.Unmarshal(raw, &record) 217 if err != nil { 218 l.Error("invalid record", "err", err) 219 return err 220 } 221 222 repoAt, err := syntax.ParseATURI(record.Repo) 223 if err != nil { 224 return err 225 } 226 227 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 228 if err != nil { 229 return err 230 } 231 232 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 233 if err != nil || !ok { 234 return err 235 } 236 237 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 238 if err != nil { 239 createdAt = time.Now() 240 } 241 242 artifact := models.Artifact{ 243 Did: did, 244 Rkey: e.Commit.RKey, 245 RepoAt: repoAt, 246 Tag: plumbing.Hash(record.Tag), 247 CreatedAt: createdAt, 248 BlobCid: cid.Cid(record.Artifact.Ref), 249 Name: record.Name, 250 Size: uint64(record.Artifact.Size), 251 MimeType: record.Artifact.MimeType, 252 } 253 254 err = db.AddArtifact(i.Db, artifact) 255 case jmodels.CommitOperationDelete: 256 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 257 } 258 259 if err != nil { 260 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 261 } 262 263 return nil 264} 265 266func (i *Ingester) ingestProfile(e *jmodels.Event) error { 267 did := e.Did 268 var err error 269 270 l := i.Logger.With("handler", "ingestProfile") 271 l = l.With("nsid", e.Commit.Collection) 272 273 if e.Commit.RKey != "self" { 274 return fmt.Errorf("ingestProfile only ingests `self` record") 275 } 276 277 switch e.Commit.Operation { 278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 279 raw := json.RawMessage(e.Commit.Record) 280 record := tangled.ActorProfile{} 281 err = json.Unmarshal(raw, &record) 282 if err != nil { 283 l.Error("invalid record", "err", err) 284 return err 285 } 286 287 description := "" 288 if record.Description != nil { 289 description = *record.Description 290 } 291 292 includeBluesky := record.Bluesky 293 294 pronouns := "" 295 if record.Pronouns != nil { 296 pronouns = *record.Pronouns 297 } 298 299 location := "" 300 if record.Location != nil { 301 location = *record.Location 302 } 303 304 var links [5]string 305 for i, l := range record.Links { 306 if i < 5 { 307 links[i] = l 308 } 309 } 310 311 var stats [2]models.VanityStat 312 for i, s := range record.Stats { 313 if i < 2 { 314 stats[i].Kind = models.VanityStatKind(s) 315 } 316 } 317 318 var pinned [6]syntax.ATURI 319 for i, r := range record.PinnedRepositories { 320 if i < 6 { 321 pinned[i] = syntax.ATURI(r) 322 } 323 } 324 325 profile := models.Profile{ 326 Did: did, 327 Description: description, 328 IncludeBluesky: includeBluesky, 329 Location: location, 330 Links: links, 331 Stats: stats, 332 PinnedRepos: pinned, 333 Pronouns: pronouns, 334 } 335 336 ddb, ok := i.Db.Execer.(*db.DB) 337 if !ok { 338 return fmt.Errorf("failed to index profile record, invalid db cast") 339 } 340 341 tx, err := ddb.Begin() 342 if err != nil { 343 return fmt.Errorf("failed to start transaction") 344 } 345 346 err = db.ValidateProfile(tx, &profile) 347 if err != nil { 348 return fmt.Errorf("invalid profile record") 349 } 350 351 err = db.UpsertProfile(tx, &profile) 352 case jmodels.CommitOperationDelete: 353 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 354 } 355 356 if err != nil { 357 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 358 } 359 360 return nil 361} 362 363func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 364 did := e.Did 365 var err error 366 367 l := i.Logger.With("handler", "ingestSpindleMember") 368 l = l.With("nsid", e.Commit.Collection) 369 370 switch e.Commit.Operation { 371 case jmodels.CommitOperationCreate: 372 raw := json.RawMessage(e.Commit.Record) 373 record := tangled.SpindleMember{} 374 err = json.Unmarshal(raw, &record) 375 if err != nil { 376 l.Error("invalid record", "err", err) 377 return err 378 } 379 380 // only spindle owner can invite to spindles 381 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 382 if err != nil || !ok { 383 return fmt.Errorf("failed to enforce permissions: %w", err) 384 } 385 386 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 387 if err != nil { 388 return err 389 } 390 391 if memberId.Handle.IsInvalidHandle() { 392 return err 393 } 394 395 ddb, ok := i.Db.Execer.(*db.DB) 396 if !ok { 397 return fmt.Errorf("failed to index profile record, invalid db cast") 398 } 399 400 err = db.AddSpindleMember(ddb, models.SpindleMember{ 401 Did: syntax.DID(did), 402 Rkey: e.Commit.RKey, 403 Instance: record.Instance, 404 Subject: memberId.DID, 405 }) 406 if !ok { 407 return fmt.Errorf("failed to add to db: %w", err) 408 } 409 410 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 411 if err != nil { 412 return fmt.Errorf("failed to update ACLs: %w", err) 413 } 414 415 l.Info("added spindle member") 416 case jmodels.CommitOperationDelete: 417 rkey := e.Commit.RKey 418 419 ddb, ok := i.Db.Execer.(*db.DB) 420 if !ok { 421 return fmt.Errorf("failed to index profile record, invalid db cast") 422 } 423 424 // get record from db first 425 members, err := db.GetSpindleMembers( 426 ddb, 427 db.FilterEq("did", did), 428 db.FilterEq("rkey", rkey), 429 ) 430 if err != nil || len(members) != 1 { 431 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 432 } 433 member := members[0] 434 435 tx, err := ddb.Begin() 436 if err != nil { 437 return fmt.Errorf("failed to start txn: %w", err) 438 } 439 440 // remove record by rkey && update enforcer 441 if err = db.RemoveSpindleMember( 442 tx, 443 db.FilterEq("did", did), 444 db.FilterEq("rkey", rkey), 445 ); err != nil { 446 return fmt.Errorf("failed to remove from db: %w", err) 447 } 448 449 // update enforcer 450 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 451 if err != nil { 452 return fmt.Errorf("failed to update ACLs: %w", err) 453 } 454 455 if err = tx.Commit(); err != nil { 456 return fmt.Errorf("failed to commit txn: %w", err) 457 } 458 459 if err = i.Enforcer.E.SavePolicy(); err != nil { 460 return fmt.Errorf("failed to save ACLs: %w", err) 461 } 462 463 l.Info("removed spindle member") 464 } 465 466 return nil 467} 468 469func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 470 did := e.Did 471 var err error 472 473 l := i.Logger.With("handler", "ingestSpindle") 474 l = l.With("nsid", e.Commit.Collection) 475 476 switch e.Commit.Operation { 477 case jmodels.CommitOperationCreate: 478 raw := json.RawMessage(e.Commit.Record) 479 record := tangled.Spindle{} 480 err = json.Unmarshal(raw, &record) 481 if err != nil { 482 l.Error("invalid record", "err", err) 483 return err 484 } 485 486 instance := e.Commit.RKey 487 488 ddb, ok := i.Db.Execer.(*db.DB) 489 if !ok { 490 return fmt.Errorf("failed to index profile record, invalid db cast") 491 } 492 493 err := db.AddSpindle(ddb, models.Spindle{ 494 Owner: syntax.DID(did), 495 Instance: instance, 496 }) 497 if err != nil { 498 l.Error("failed to add spindle to db", "err", err, "instance", instance) 499 return err 500 } 501 502 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 503 if err != nil { 504 l.Error("failed to add spindle to db", "err", err, "instance", instance) 505 return err 506 } 507 508 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 509 if err != nil { 510 return fmt.Errorf("failed to mark verified: %w", err) 511 } 512 513 return nil 514 515 case jmodels.CommitOperationDelete: 516 instance := e.Commit.RKey 517 518 ddb, ok := i.Db.Execer.(*db.DB) 519 if !ok { 520 return fmt.Errorf("failed to index profile record, invalid db cast") 521 } 522 523 // get record from db first 524 spindles, err := db.GetSpindles( 525 ddb, 526 db.FilterEq("owner", did), 527 db.FilterEq("instance", instance), 528 ) 529 if err != nil || len(spindles) != 1 { 530 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 531 } 532 spindle := spindles[0] 533 534 tx, err := ddb.Begin() 535 if err != nil { 536 return err 537 } 538 defer func() { 539 tx.Rollback() 540 i.Enforcer.E.LoadPolicy() 541 }() 542 543 // remove spindle members first 544 err = db.RemoveSpindleMember( 545 tx, 546 db.FilterEq("owner", did), 547 db.FilterEq("instance", instance), 548 ) 549 if err != nil { 550 return err 551 } 552 553 err = db.DeleteSpindle( 554 tx, 555 db.FilterEq("owner", did), 556 db.FilterEq("instance", instance), 557 ) 558 if err != nil { 559 return err 560 } 561 562 if spindle.Verified != nil { 563 err = i.Enforcer.RemoveSpindle(instance) 564 if err != nil { 565 return err 566 } 567 } 568 569 err = tx.Commit() 570 if err != nil { 571 return err 572 } 573 574 err = i.Enforcer.E.SavePolicy() 575 if err != nil { 576 return err 577 } 578 } 579 580 return nil 581} 582 583func (i *Ingester) ingestString(e *jmodels.Event) error { 584 did := e.Did 585 rkey := e.Commit.RKey 586 587 var err error 588 589 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 590 l.Info("ingesting record") 591 592 ddb, ok := i.Db.Execer.(*db.DB) 593 if !ok { 594 return fmt.Errorf("failed to index string record, invalid db cast") 595 } 596 597 switch e.Commit.Operation { 598 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 599 raw := json.RawMessage(e.Commit.Record) 600 record := tangled.String{} 601 err = json.Unmarshal(raw, &record) 602 if err != nil { 603 l.Error("invalid record", "err", err) 604 return err 605 } 606 607 string := models.StringFromRecord(did, rkey, record) 608 609 if err = i.Validator.ValidateString(&string); err != nil { 610 l.Error("invalid record", "err", err) 611 return err 612 } 613 614 if err = db.AddString(ddb, string); err != nil { 615 l.Error("failed to add string", "err", err) 616 return err 617 } 618 619 return nil 620 621 case jmodels.CommitOperationDelete: 622 if err := db.DeleteString( 623 ddb, 624 db.FilterEq("did", did), 625 db.FilterEq("rkey", rkey), 626 ); err != nil { 627 l.Error("failed to delete", "err", err) 628 return fmt.Errorf("failed to delete string record: %w", err) 629 } 630 631 return nil 632 } 633 634 return nil 635} 636 637func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 638 did := e.Did 639 var err error 640 641 l := i.Logger.With("handler", "ingestKnotMember") 642 l = l.With("nsid", e.Commit.Collection) 643 644 switch e.Commit.Operation { 645 case jmodels.CommitOperationCreate: 646 raw := json.RawMessage(e.Commit.Record) 647 record := tangled.KnotMember{} 648 err = json.Unmarshal(raw, &record) 649 if err != nil { 650 l.Error("invalid record", "err", err) 651 return err 652 } 653 654 // only knot owner can invite to knots 655 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 656 if err != nil || !ok { 657 return fmt.Errorf("failed to enforce permissions: %w", err) 658 } 659 660 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 661 if err != nil { 662 return err 663 } 664 665 if memberId.Handle.IsInvalidHandle() { 666 return err 667 } 668 669 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 670 if err != nil { 671 return fmt.Errorf("failed to update ACLs: %w", err) 672 } 673 674 l.Info("added knot member") 675 case jmodels.CommitOperationDelete: 676 // we don't store knot members in a table (like we do for spindle) 677 // and we can't remove this just yet. possibly fixed if we switch 678 // to either: 679 // 1. a knot_members table like with spindle and store the rkey 680 // 2. use the knot host as the rkey 681 // 682 // TODO: implement member deletion 683 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 684 } 685 686 return nil 687} 688 689func (i *Ingester) ingestKnot(e *jmodels.Event) error { 690 did := e.Did 691 var err error 692 693 l := i.Logger.With("handler", "ingestKnot") 694 l = l.With("nsid", e.Commit.Collection) 695 696 switch e.Commit.Operation { 697 case jmodels.CommitOperationCreate: 698 raw := json.RawMessage(e.Commit.Record) 699 record := tangled.Knot{} 700 err = json.Unmarshal(raw, &record) 701 if err != nil { 702 l.Error("invalid record", "err", err) 703 return err 704 } 705 706 domain := e.Commit.RKey 707 708 ddb, ok := i.Db.Execer.(*db.DB) 709 if !ok { 710 return fmt.Errorf("failed to index profile record, invalid db cast") 711 } 712 713 err := db.AddKnot(ddb, domain, did) 714 if err != nil { 715 l.Error("failed to add knot to db", "err", err, "domain", domain) 716 return err 717 } 718 719 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 720 if err != nil { 721 l.Error("failed to verify knot", "err", err, "domain", domain) 722 return err 723 } 724 725 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 726 if err != nil { 727 return fmt.Errorf("failed to mark verified: %w", err) 728 } 729 730 return nil 731 732 case jmodels.CommitOperationDelete: 733 domain := e.Commit.RKey 734 735 ddb, ok := i.Db.Execer.(*db.DB) 736 if !ok { 737 return fmt.Errorf("failed to index knot record, invalid db cast") 738 } 739 740 // get record from db first 741 registrations, err := db.GetRegistrations( 742 ddb, 743 db.FilterEq("domain", domain), 744 db.FilterEq("did", did), 745 ) 746 if err != nil { 747 return fmt.Errorf("failed to get registration: %w", err) 748 } 749 if len(registrations) != 1 { 750 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 751 } 752 registration := registrations[0] 753 754 tx, err := ddb.Begin() 755 if err != nil { 756 return err 757 } 758 defer func() { 759 tx.Rollback() 760 i.Enforcer.E.LoadPolicy() 761 }() 762 763 err = db.DeleteKnot( 764 tx, 765 db.FilterEq("did", did), 766 db.FilterEq("domain", domain), 767 ) 768 if err != nil { 769 return err 770 } 771 772 if registration.Registered != nil { 773 err = i.Enforcer.RemoveKnot(domain) 774 if err != nil { 775 return err 776 } 777 } 778 779 err = tx.Commit() 780 if err != nil { 781 return err 782 } 783 784 err = i.Enforcer.E.SavePolicy() 785 if err != nil { 786 return err 787 } 788 } 789 790 return nil 791} 792func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 793 did := e.Did 794 rkey := e.Commit.RKey 795 796 var err error 797 798 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 799 l.Info("ingesting record") 800 801 ddb, ok := i.Db.Execer.(*db.DB) 802 if !ok { 803 return fmt.Errorf("failed to index issue record, invalid db cast") 804 } 805 806 switch e.Commit.Operation { 807 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 808 raw := json.RawMessage(e.Commit.Record) 809 record := tangled.RepoIssue{} 810 err = json.Unmarshal(raw, &record) 811 if err != nil { 812 l.Error("invalid record", "err", err) 813 return err 814 } 815 816 issue := models.IssueFromRecord(did, rkey, record) 817 818 if err := i.Validator.ValidateIssue(&issue); err != nil { 819 return fmt.Errorf("failed to validate issue: %w", err) 820 } 821 822 tx, err := ddb.BeginTx(ctx, nil) 823 if err != nil { 824 l.Error("failed to begin transaction", "err", err) 825 return err 826 } 827 defer tx.Rollback() 828 829 err = db.PutIssue(tx, &issue) 830 if err != nil { 831 l.Error("failed to create issue", "err", err) 832 return err 833 } 834 835 err = tx.Commit() 836 if err != nil { 837 l.Error("failed to commit txn", "err", err) 838 return err 839 } 840 841 return nil 842 843 case jmodels.CommitOperationDelete: 844 if err := db.DeleteIssues( 845 ddb, 846 db.FilterEq("did", did), 847 db.FilterEq("rkey", rkey), 848 ); err != nil { 849 l.Error("failed to delete", "err", err) 850 return fmt.Errorf("failed to delete issue record: %w", err) 851 } 852 853 return nil 854 } 855 856 return nil 857} 858 859func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 860 did := e.Did 861 rkey := e.Commit.RKey 862 863 var err error 864 865 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 866 l.Info("ingesting record") 867 868 ddb, ok := i.Db.Execer.(*db.DB) 869 if !ok { 870 return fmt.Errorf("failed to index issue comment record, invalid db cast") 871 } 872 873 switch e.Commit.Operation { 874 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 875 raw := json.RawMessage(e.Commit.Record) 876 record := tangled.RepoIssueComment{} 877 err = json.Unmarshal(raw, &record) 878 if err != nil { 879 return fmt.Errorf("invalid record: %w", err) 880 } 881 882 comment, err := models.IssueCommentFromRecord(did, rkey, record) 883 if err != nil { 884 return fmt.Errorf("failed to parse comment from record: %w", err) 885 } 886 887 if err := i.Validator.ValidateIssueComment(comment); err != nil { 888 return fmt.Errorf("failed to validate comment: %w", err) 889 } 890 891 _, err = db.AddIssueComment(ddb, *comment) 892 if err != nil { 893 return fmt.Errorf("failed to create issue comment: %w", err) 894 } 895 896 return nil 897 898 case jmodels.CommitOperationDelete: 899 if err := db.DeleteIssueComments( 900 ddb, 901 db.FilterEq("did", did), 902 db.FilterEq("rkey", rkey), 903 ); err != nil { 904 return fmt.Errorf("failed to delete issue comment record: %w", err) 905 } 906 907 return nil 908 } 909 910 return nil 911} 912 913func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 914 did := e.Did 915 rkey := e.Commit.RKey 916 917 var err error 918 919 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 920 l.Info("ingesting record") 921 922 ddb, ok := i.Db.Execer.(*db.DB) 923 if !ok { 924 return fmt.Errorf("failed to index label definition, invalid db cast") 925 } 926 927 switch e.Commit.Operation { 928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 929 raw := json.RawMessage(e.Commit.Record) 930 record := tangled.LabelDefinition{} 931 err = json.Unmarshal(raw, &record) 932 if err != nil { 933 return fmt.Errorf("invalid record: %w", err) 934 } 935 936 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 937 if err != nil { 938 return fmt.Errorf("failed to parse labeldef from record: %w", err) 939 } 940 941 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 942 return fmt.Errorf("failed to validate labeldef: %w", err) 943 } 944 945 _, err = db.AddLabelDefinition(ddb, def) 946 if err != nil { 947 return fmt.Errorf("failed to create labeldef: %w", err) 948 } 949 950 return nil 951 952 case jmodels.CommitOperationDelete: 953 if err := db.DeleteLabelDefinition( 954 ddb, 955 db.FilterEq("did", did), 956 db.FilterEq("rkey", rkey), 957 ); err != nil { 958 return fmt.Errorf("failed to delete labeldef record: %w", err) 959 } 960 961 return nil 962 } 963 964 return nil 965} 966 967func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 968 did := e.Did 969 rkey := e.Commit.RKey 970 971 var err error 972 973 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 974 l.Info("ingesting record") 975 976 ddb, ok := i.Db.Execer.(*db.DB) 977 if !ok { 978 return fmt.Errorf("failed to index label op, invalid db cast") 979 } 980 981 switch e.Commit.Operation { 982 case jmodels.CommitOperationCreate: 983 raw := json.RawMessage(e.Commit.Record) 984 record := tangled.LabelOp{} 985 err = json.Unmarshal(raw, &record) 986 if err != nil { 987 return fmt.Errorf("invalid record: %w", err) 988 } 989 990 subject := syntax.ATURI(record.Subject) 991 collection := subject.Collection() 992 993 var repo *models.Repo 994 switch collection { 995 case tangled.RepoIssueNSID: 996 i, err := db.GetIssues(ddb, db.FilterEq("at_uri", subject)) 997 if err != nil || len(i) != 1 { 998 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 999 } 1000 repo = i[0].Repo 1001 default: 1002 return fmt.Errorf("unsupport label subject: %s", collection) 1003 } 1004 1005 actx, err := db.NewLabelApplicationCtx(ddb, db.FilterIn("at_uri", repo.Labels)) 1006 if err != nil { 1007 return fmt.Errorf("failed to build label application ctx: %w", err) 1008 } 1009 1010 ops := models.LabelOpsFromRecord(did, rkey, record) 1011 1012 for _, o := range ops { 1013 def, ok := actx.Defs[o.OperandKey] 1014 if !ok { 1015 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1016 } 1017 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1018 return fmt.Errorf("failed to validate labelop: %w", err) 1019 } 1020 } 1021 1022 tx, err := ddb.Begin() 1023 if err != nil { 1024 return err 1025 } 1026 defer tx.Rollback() 1027 1028 for _, o := range ops { 1029 _, err = db.AddLabelOp(tx, &o) 1030 if err != nil { 1031 return fmt.Errorf("failed to add labelop: %w", err) 1032 } 1033 } 1034 1035 if err = tx.Commit(); err != nil { 1036 return err 1037 } 1038 } 1039 1040 return nil 1041}