forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/rbac" 25) 26 27type Ingester struct { 28 Db db.DbWrapper 29 Enforcer *rbac.Enforcer 30 IdResolver *idresolver.Resolver 31 Config *config.Config 32 Logger *slog.Logger 33 Validator *validator.Validator 34} 35 36type processFunc func(ctx context.Context, e *jmodels.Event) error 37 38func (i *Ingester) Ingest() processFunc { 39 return func(ctx context.Context, e *jmodels.Event) error { 40 var err error 41 defer func() { 42 eventTime := e.TimeUS 43 lastTimeUs := eventTime + 1 44 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 45 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 46 } 47 }() 48 49 l := i.Logger.With("kind", e.Kind) 50 switch e.Kind { 51 case jmodels.EventKindAccount: 52 if !e.Account.Active && *e.Account.Status == "deactivated" { 53 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 54 } 55 case jmodels.EventKindIdentity: 56 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 57 case jmodels.EventKindCommit: 58 switch e.Commit.Collection { 59 case tangled.GraphFollowNSID: 60 err = i.ingestFollow(e) 61 case tangled.FeedStarNSID: 62 err = i.ingestStar(e) 63 case tangled.PublicKeyNSID: 64 err = i.ingestPublicKey(e) 65 case tangled.RepoArtifactNSID: 66 err = i.ingestArtifact(e) 67 case tangled.ActorProfileNSID: 68 err = i.ingestProfile(e) 69 case tangled.SpindleMemberNSID: 70 err = i.ingestSpindleMember(ctx, e) 71 case tangled.SpindleNSID: 72 err = i.ingestSpindle(ctx, e) 73 case tangled.KnotMemberNSID: 74 err = i.ingestKnotMember(e) 75 case tangled.KnotNSID: 76 err = i.ingestKnot(e) 77 case tangled.StringNSID: 78 err = i.ingestString(e) 79 case tangled.RepoIssueNSID: 80 err = i.ingestIssue(ctx, e) 81 case tangled.RepoIssueCommentNSID: 82 err = i.ingestIssueComment(e) 83 case tangled.LabelDefinitionNSID: 84 err = i.ingestLabelDefinition(e) 85 case tangled.LabelOpNSID: 86 err = i.ingestLabelOp(e) 87 } 88 l = i.Logger.With("nsid", e.Commit.Collection) 89 } 90 91 if err != nil { 92 l.Warn("refused to ingest record", "err", err) 93 } 94 95 return nil 96 } 97} 98 99func (i *Ingester) ingestStar(e *jmodels.Event) error { 100 var err error 101 did := e.Did 102 103 l := i.Logger.With("handler", "ingestStar") 104 l = l.With("nsid", e.Commit.Collection) 105 106 switch e.Commit.Operation { 107 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 108 var subjectUri syntax.ATURI 109 110 raw := json.RawMessage(e.Commit.Record) 111 record := tangled.FeedStar{} 112 err := json.Unmarshal(raw, &record) 113 if err != nil { 114 l.Error("invalid record", "err", err) 115 return err 116 } 117 118 subjectUri, err = syntax.ParseATURI(record.Subject) 119 if err != nil { 120 l.Error("invalid record", "err", err) 121 return err 122 } 123 err = db.AddStar(i.Db, &models.Star{ 124 StarredByDid: did, 125 RepoAt: subjectUri, 126 Rkey: e.Commit.RKey, 127 }) 128 case jmodels.CommitOperationDelete: 129 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 130 } 131 132 if err != nil { 133 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 134 } 135 136 return nil 137} 138 139func (i *Ingester) ingestFollow(e *jmodels.Event) error { 140 var err error 141 did := e.Did 142 143 l := i.Logger.With("handler", "ingestFollow") 144 l = l.With("nsid", e.Commit.Collection) 145 146 switch e.Commit.Operation { 147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 148 raw := json.RawMessage(e.Commit.Record) 149 record := tangled.GraphFollow{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 l.Error("invalid record", "err", err) 153 return err 154 } 155 156 err = db.AddFollow(i.Db, &models.Follow{ 157 UserDid: did, 158 SubjectDid: record.Subject, 159 Rkey: e.Commit.RKey, 160 }) 161 case jmodels.CommitOperationDelete: 162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 163 } 164 165 if err != nil { 166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 167 } 168 169 return nil 170} 171 172func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 173 did := e.Did 174 var err error 175 176 l := i.Logger.With("handler", "ingestPublicKey") 177 l = l.With("nsid", e.Commit.Collection) 178 179 switch e.Commit.Operation { 180 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 181 l.Debug("processing add of pubkey") 182 raw := json.RawMessage(e.Commit.Record) 183 record := tangled.PublicKey{} 184 err = json.Unmarshal(raw, &record) 185 if err != nil { 186 l.Error("invalid record", "err", err) 187 return err 188 } 189 190 name := record.Name 191 key := record.Key 192 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 193 case jmodels.CommitOperationDelete: 194 l.Debug("processing delete of pubkey") 195 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 196 } 197 198 if err != nil { 199 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 200 } 201 202 return nil 203} 204 205func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 206 did := e.Did 207 var err error 208 209 l := i.Logger.With("handler", "ingestArtifact") 210 l = l.With("nsid", e.Commit.Collection) 211 212 switch e.Commit.Operation { 213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 214 raw := json.RawMessage(e.Commit.Record) 215 record := tangled.RepoArtifact{} 216 err = json.Unmarshal(raw, &record) 217 if err != nil { 218 l.Error("invalid record", "err", err) 219 return err 220 } 221 222 repoAt, err := syntax.ParseATURI(record.Repo) 223 if err != nil { 224 return err 225 } 226 227 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 228 if err != nil { 229 return err 230 } 231 232 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 233 if err != nil || !ok { 234 return err 235 } 236 237 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 238 if err != nil { 239 createdAt = time.Now() 240 } 241 242 artifact := models.Artifact{ 243 Did: did, 244 Rkey: e.Commit.RKey, 245 RepoAt: repoAt, 246 Tag: plumbing.Hash(record.Tag), 247 CreatedAt: createdAt, 248 BlobCid: cid.Cid(record.Artifact.Ref), 249 Name: record.Name, 250 Size: uint64(record.Artifact.Size), 251 MimeType: record.Artifact.MimeType, 252 } 253 254 err = db.AddArtifact(i.Db, artifact) 255 case jmodels.CommitOperationDelete: 256 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 257 } 258 259 if err != nil { 260 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 261 } 262 263 return nil 264} 265 266func (i *Ingester) ingestProfile(e *jmodels.Event) error { 267 did := e.Did 268 var err error 269 270 l := i.Logger.With("handler", "ingestProfile") 271 l = l.With("nsid", e.Commit.Collection) 272 273 if e.Commit.RKey != "self" { 274 return fmt.Errorf("ingestProfile only ingests `self` record") 275 } 276 277 switch e.Commit.Operation { 278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 279 raw := json.RawMessage(e.Commit.Record) 280 record := tangled.ActorProfile{} 281 err = json.Unmarshal(raw, &record) 282 if err != nil { 283 l.Error("invalid record", "err", err) 284 return err 285 } 286 287 description := "" 288 if record.Description != nil { 289 description = *record.Description 290 } 291 292 includeBluesky := record.Bluesky 293 294 location := "" 295 if record.Location != nil { 296 location = *record.Location 297 } 298 299 var links [5]string 300 for i, l := range record.Links { 301 if i < 5 { 302 links[i] = l 303 } 304 } 305 306 var stats [2]models.VanityStat 307 for i, s := range record.Stats { 308 if i < 2 { 309 stats[i].Kind = models.VanityStatKind(s) 310 } 311 } 312 313 var pinned [6]syntax.ATURI 314 for i, r := range record.PinnedRepositories { 315 if i < 6 { 316 pinned[i] = syntax.ATURI(r) 317 } 318 } 319 320 profile := models.Profile{ 321 Did: did, 322 Description: description, 323 IncludeBluesky: includeBluesky, 324 Location: location, 325 Links: links, 326 Stats: stats, 327 PinnedRepos: pinned, 328 } 329 330 ddb, ok := i.Db.Execer.(*db.DB) 331 if !ok { 332 return fmt.Errorf("failed to index profile record, invalid db cast") 333 } 334 335 tx, err := ddb.Begin() 336 if err != nil { 337 return fmt.Errorf("failed to start transaction") 338 } 339 340 err = db.ValidateProfile(tx, &profile) 341 if err != nil { 342 return fmt.Errorf("invalid profile record") 343 } 344 345 err = db.UpsertProfile(tx, &profile) 346 case jmodels.CommitOperationDelete: 347 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 348 } 349 350 if err != nil { 351 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 352 } 353 354 return nil 355} 356 357func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 358 did := e.Did 359 var err error 360 361 l := i.Logger.With("handler", "ingestSpindleMember") 362 l = l.With("nsid", e.Commit.Collection) 363 364 switch e.Commit.Operation { 365 case jmodels.CommitOperationCreate: 366 raw := json.RawMessage(e.Commit.Record) 367 record := tangled.SpindleMember{} 368 err = json.Unmarshal(raw, &record) 369 if err != nil { 370 l.Error("invalid record", "err", err) 371 return err 372 } 373 374 // only spindle owner can invite to spindles 375 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 376 if err != nil || !ok { 377 return fmt.Errorf("failed to enforce permissions: %w", err) 378 } 379 380 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 381 if err != nil { 382 return err 383 } 384 385 if memberId.Handle.IsInvalidHandle() { 386 return err 387 } 388 389 ddb, ok := i.Db.Execer.(*db.DB) 390 if !ok { 391 return fmt.Errorf("failed to index profile record, invalid db cast") 392 } 393 394 err = db.AddSpindleMember(ddb, models.SpindleMember{ 395 Did: syntax.DID(did), 396 Rkey: e.Commit.RKey, 397 Instance: record.Instance, 398 Subject: memberId.DID, 399 }) 400 if !ok { 401 return fmt.Errorf("failed to add to db: %w", err) 402 } 403 404 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 405 if err != nil { 406 return fmt.Errorf("failed to update ACLs: %w", err) 407 } 408 409 l.Info("added spindle member") 410 case jmodels.CommitOperationDelete: 411 rkey := e.Commit.RKey 412 413 ddb, ok := i.Db.Execer.(*db.DB) 414 if !ok { 415 return fmt.Errorf("failed to index profile record, invalid db cast") 416 } 417 418 // get record from db first 419 members, err := db.GetSpindleMembers( 420 ddb, 421 db.FilterEq("did", did), 422 db.FilterEq("rkey", rkey), 423 ) 424 if err != nil || len(members) != 1 { 425 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 426 } 427 member := members[0] 428 429 tx, err := ddb.Begin() 430 if err != nil { 431 return fmt.Errorf("failed to start txn: %w", err) 432 } 433 434 // remove record by rkey && update enforcer 435 if err = db.RemoveSpindleMember( 436 tx, 437 db.FilterEq("did", did), 438 db.FilterEq("rkey", rkey), 439 ); err != nil { 440 return fmt.Errorf("failed to remove from db: %w", err) 441 } 442 443 // update enforcer 444 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 445 if err != nil { 446 return fmt.Errorf("failed to update ACLs: %w", err) 447 } 448 449 if err = tx.Commit(); err != nil { 450 return fmt.Errorf("failed to commit txn: %w", err) 451 } 452 453 if err = i.Enforcer.E.SavePolicy(); err != nil { 454 return fmt.Errorf("failed to save ACLs: %w", err) 455 } 456 457 l.Info("removed spindle member") 458 } 459 460 return nil 461} 462 463func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 464 did := e.Did 465 var err error 466 467 l := i.Logger.With("handler", "ingestSpindle") 468 l = l.With("nsid", e.Commit.Collection) 469 470 switch e.Commit.Operation { 471 case jmodels.CommitOperationCreate: 472 raw := json.RawMessage(e.Commit.Record) 473 record := tangled.Spindle{} 474 err = json.Unmarshal(raw, &record) 475 if err != nil { 476 l.Error("invalid record", "err", err) 477 return err 478 } 479 480 instance := e.Commit.RKey 481 482 ddb, ok := i.Db.Execer.(*db.DB) 483 if !ok { 484 return fmt.Errorf("failed to index profile record, invalid db cast") 485 } 486 487 err := db.AddSpindle(ddb, models.Spindle{ 488 Owner: syntax.DID(did), 489 Instance: instance, 490 }) 491 if err != nil { 492 l.Error("failed to add spindle to db", "err", err, "instance", instance) 493 return err 494 } 495 496 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 497 if err != nil { 498 l.Error("failed to add spindle to db", "err", err, "instance", instance) 499 return err 500 } 501 502 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 503 if err != nil { 504 return fmt.Errorf("failed to mark verified: %w", err) 505 } 506 507 return nil 508 509 case jmodels.CommitOperationDelete: 510 instance := e.Commit.RKey 511 512 ddb, ok := i.Db.Execer.(*db.DB) 513 if !ok { 514 return fmt.Errorf("failed to index profile record, invalid db cast") 515 } 516 517 // get record from db first 518 spindles, err := db.GetSpindles( 519 ddb, 520 db.FilterEq("owner", did), 521 db.FilterEq("instance", instance), 522 ) 523 if err != nil || len(spindles) != 1 { 524 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 525 } 526 spindle := spindles[0] 527 528 tx, err := ddb.Begin() 529 if err != nil { 530 return err 531 } 532 defer func() { 533 tx.Rollback() 534 i.Enforcer.E.LoadPolicy() 535 }() 536 537 // remove spindle members first 538 err = db.RemoveSpindleMember( 539 tx, 540 db.FilterEq("owner", did), 541 db.FilterEq("instance", instance), 542 ) 543 if err != nil { 544 return err 545 } 546 547 err = db.DeleteSpindle( 548 tx, 549 db.FilterEq("owner", did), 550 db.FilterEq("instance", instance), 551 ) 552 if err != nil { 553 return err 554 } 555 556 if spindle.Verified != nil { 557 err = i.Enforcer.RemoveSpindle(instance) 558 if err != nil { 559 return err 560 } 561 } 562 563 err = tx.Commit() 564 if err != nil { 565 return err 566 } 567 568 err = i.Enforcer.E.SavePolicy() 569 if err != nil { 570 return err 571 } 572 } 573 574 return nil 575} 576 577func (i *Ingester) ingestString(e *jmodels.Event) error { 578 did := e.Did 579 rkey := e.Commit.RKey 580 581 var err error 582 583 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 584 l.Info("ingesting record") 585 586 ddb, ok := i.Db.Execer.(*db.DB) 587 if !ok { 588 return fmt.Errorf("failed to index string record, invalid db cast") 589 } 590 591 switch e.Commit.Operation { 592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 593 raw := json.RawMessage(e.Commit.Record) 594 record := tangled.String{} 595 err = json.Unmarshal(raw, &record) 596 if err != nil { 597 l.Error("invalid record", "err", err) 598 return err 599 } 600 601 string := models.StringFromRecord(did, rkey, record) 602 603 if err = i.Validator.ValidateString(&string); err != nil { 604 l.Error("invalid record", "err", err) 605 return err 606 } 607 608 if err = db.AddString(ddb, string); err != nil { 609 l.Error("failed to add string", "err", err) 610 return err 611 } 612 613 return nil 614 615 case jmodels.CommitOperationDelete: 616 if err := db.DeleteString( 617 ddb, 618 db.FilterEq("did", did), 619 db.FilterEq("rkey", rkey), 620 ); err != nil { 621 l.Error("failed to delete", "err", err) 622 return fmt.Errorf("failed to delete string record: %w", err) 623 } 624 625 return nil 626 } 627 628 return nil 629} 630 631func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 632 did := e.Did 633 var err error 634 635 l := i.Logger.With("handler", "ingestKnotMember") 636 l = l.With("nsid", e.Commit.Collection) 637 638 switch e.Commit.Operation { 639 case jmodels.CommitOperationCreate: 640 raw := json.RawMessage(e.Commit.Record) 641 record := tangled.KnotMember{} 642 err = json.Unmarshal(raw, &record) 643 if err != nil { 644 l.Error("invalid record", "err", err) 645 return err 646 } 647 648 // only knot owner can invite to knots 649 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 650 if err != nil || !ok { 651 return fmt.Errorf("failed to enforce permissions: %w", err) 652 } 653 654 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 655 if err != nil { 656 return err 657 } 658 659 if memberId.Handle.IsInvalidHandle() { 660 return err 661 } 662 663 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 664 if err != nil { 665 return fmt.Errorf("failed to update ACLs: %w", err) 666 } 667 668 l.Info("added knot member") 669 case jmodels.CommitOperationDelete: 670 // we don't store knot members in a table (like we do for spindle) 671 // and we can't remove this just yet. possibly fixed if we switch 672 // to either: 673 // 1. a knot_members table like with spindle and store the rkey 674 // 2. use the knot host as the rkey 675 // 676 // TODO: implement member deletion 677 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 678 } 679 680 return nil 681} 682 683func (i *Ingester) ingestKnot(e *jmodels.Event) error { 684 did := e.Did 685 var err error 686 687 l := i.Logger.With("handler", "ingestKnot") 688 l = l.With("nsid", e.Commit.Collection) 689 690 switch e.Commit.Operation { 691 case jmodels.CommitOperationCreate: 692 raw := json.RawMessage(e.Commit.Record) 693 record := tangled.Knot{} 694 err = json.Unmarshal(raw, &record) 695 if err != nil { 696 l.Error("invalid record", "err", err) 697 return err 698 } 699 700 domain := e.Commit.RKey 701 702 ddb, ok := i.Db.Execer.(*db.DB) 703 if !ok { 704 return fmt.Errorf("failed to index profile record, invalid db cast") 705 } 706 707 err := db.AddKnot(ddb, domain, did) 708 if err != nil { 709 l.Error("failed to add knot to db", "err", err, "domain", domain) 710 return err 711 } 712 713 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 714 if err != nil { 715 l.Error("failed to verify knot", "err", err, "domain", domain) 716 return err 717 } 718 719 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 720 if err != nil { 721 return fmt.Errorf("failed to mark verified: %w", err) 722 } 723 724 return nil 725 726 case jmodels.CommitOperationDelete: 727 domain := e.Commit.RKey 728 729 ddb, ok := i.Db.Execer.(*db.DB) 730 if !ok { 731 return fmt.Errorf("failed to index knot record, invalid db cast") 732 } 733 734 // get record from db first 735 registrations, err := db.GetRegistrations( 736 ddb, 737 db.FilterEq("domain", domain), 738 db.FilterEq("did", did), 739 ) 740 if err != nil { 741 return fmt.Errorf("failed to get registration: %w", err) 742 } 743 if len(registrations) != 1 { 744 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 745 } 746 registration := registrations[0] 747 748 tx, err := ddb.Begin() 749 if err != nil { 750 return err 751 } 752 defer func() { 753 tx.Rollback() 754 i.Enforcer.E.LoadPolicy() 755 }() 756 757 err = db.DeleteKnot( 758 tx, 759 db.FilterEq("did", did), 760 db.FilterEq("domain", domain), 761 ) 762 if err != nil { 763 return err 764 } 765 766 if registration.Registered != nil { 767 err = i.Enforcer.RemoveKnot(domain) 768 if err != nil { 769 return err 770 } 771 } 772 773 err = tx.Commit() 774 if err != nil { 775 return err 776 } 777 778 err = i.Enforcer.E.SavePolicy() 779 if err != nil { 780 return err 781 } 782 } 783 784 return nil 785} 786func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 787 did := e.Did 788 rkey := e.Commit.RKey 789 790 var err error 791 792 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 793 l.Info("ingesting record") 794 795 ddb, ok := i.Db.Execer.(*db.DB) 796 if !ok { 797 return fmt.Errorf("failed to index issue record, invalid db cast") 798 } 799 800 switch e.Commit.Operation { 801 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 802 raw := json.RawMessage(e.Commit.Record) 803 record := tangled.RepoIssue{} 804 err = json.Unmarshal(raw, &record) 805 if err != nil { 806 l.Error("invalid record", "err", err) 807 return err 808 } 809 810 issue := models.IssueFromRecord(did, rkey, record) 811 812 if err := i.Validator.ValidateIssue(&issue); err != nil { 813 return fmt.Errorf("failed to validate issue: %w", err) 814 } 815 816 tx, err := ddb.BeginTx(ctx, nil) 817 if err != nil { 818 l.Error("failed to begin transaction", "err", err) 819 return err 820 } 821 defer tx.Rollback() 822 823 err = db.PutIssue(tx, &issue) 824 if err != nil { 825 l.Error("failed to create issue", "err", err) 826 return err 827 } 828 829 err = tx.Commit() 830 if err != nil { 831 l.Error("failed to commit txn", "err", err) 832 return err 833 } 834 835 return nil 836 837 case jmodels.CommitOperationDelete: 838 if err := db.DeleteIssues( 839 ddb, 840 db.FilterEq("did", did), 841 db.FilterEq("rkey", rkey), 842 ); err != nil { 843 l.Error("failed to delete", "err", err) 844 return fmt.Errorf("failed to delete issue record: %w", err) 845 } 846 847 return nil 848 } 849 850 return nil 851} 852 853func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 854 did := e.Did 855 rkey := e.Commit.RKey 856 857 var err error 858 859 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 860 l.Info("ingesting record") 861 862 ddb, ok := i.Db.Execer.(*db.DB) 863 if !ok { 864 return fmt.Errorf("failed to index issue comment record, invalid db cast") 865 } 866 867 switch e.Commit.Operation { 868 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 869 raw := json.RawMessage(e.Commit.Record) 870 record := tangled.RepoIssueComment{} 871 err = json.Unmarshal(raw, &record) 872 if err != nil { 873 return fmt.Errorf("invalid record: %w", err) 874 } 875 876 comment, err := models.IssueCommentFromRecord(did, rkey, record) 877 if err != nil { 878 return fmt.Errorf("failed to parse comment from record: %w", err) 879 } 880 881 if err := i.Validator.ValidateIssueComment(comment); err != nil { 882 return fmt.Errorf("failed to validate comment: %w", err) 883 } 884 885 _, err = db.AddIssueComment(ddb, *comment) 886 if err != nil { 887 return fmt.Errorf("failed to create issue comment: %w", err) 888 } 889 890 return nil 891 892 case jmodels.CommitOperationDelete: 893 if err := db.DeleteIssueComments( 894 ddb, 895 db.FilterEq("did", did), 896 db.FilterEq("rkey", rkey), 897 ); err != nil { 898 return fmt.Errorf("failed to delete issue comment record: %w", err) 899 } 900 901 return nil 902 } 903 904 return nil 905} 906 907func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 908 did := e.Did 909 rkey := e.Commit.RKey 910 911 var err error 912 913 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 914 l.Info("ingesting record") 915 916 ddb, ok := i.Db.Execer.(*db.DB) 917 if !ok { 918 return fmt.Errorf("failed to index label definition, invalid db cast") 919 } 920 921 switch e.Commit.Operation { 922 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 923 raw := json.RawMessage(e.Commit.Record) 924 record := tangled.LabelDefinition{} 925 err = json.Unmarshal(raw, &record) 926 if err != nil { 927 return fmt.Errorf("invalid record: %w", err) 928 } 929 930 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 931 if err != nil { 932 return fmt.Errorf("failed to parse labeldef from record: %w", err) 933 } 934 935 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 936 return fmt.Errorf("failed to validate labeldef: %w", err) 937 } 938 939 _, err = db.AddLabelDefinition(ddb, def) 940 if err != nil { 941 return fmt.Errorf("failed to create labeldef: %w", err) 942 } 943 944 return nil 945 946 case jmodels.CommitOperationDelete: 947 if err := db.DeleteLabelDefinition( 948 ddb, 949 db.FilterEq("did", did), 950 db.FilterEq("rkey", rkey), 951 ); err != nil { 952 return fmt.Errorf("failed to delete labeldef record: %w", err) 953 } 954 955 return nil 956 } 957 958 return nil 959} 960 961func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 962 did := e.Did 963 rkey := e.Commit.RKey 964 965 var err error 966 967 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 968 l.Info("ingesting record") 969 970 ddb, ok := i.Db.Execer.(*db.DB) 971 if !ok { 972 return fmt.Errorf("failed to index label op, invalid db cast") 973 } 974 975 switch e.Commit.Operation { 976 case jmodels.CommitOperationCreate: 977 raw := json.RawMessage(e.Commit.Record) 978 record := tangled.LabelOp{} 979 err = json.Unmarshal(raw, &record) 980 if err != nil { 981 return fmt.Errorf("invalid record: %w", err) 982 } 983 984 subject := syntax.ATURI(record.Subject) 985 collection := subject.Collection() 986 987 var repo *models.Repo 988 switch collection { 989 case tangled.RepoIssueNSID: 990 i, err := db.GetIssues(ddb, db.FilterEq("at_uri", subject)) 991 if err != nil || len(i) != 1 { 992 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 993 } 994 repo = i[0].Repo 995 default: 996 return fmt.Errorf("unsupport label subject: %s", collection) 997 } 998 999 actx, err := db.NewLabelApplicationCtx(ddb, db.FilterIn("at_uri", repo.Labels)) 1000 if err != nil { 1001 return fmt.Errorf("failed to build label application ctx: %w", err) 1002 } 1003 1004 ops := models.LabelOpsFromRecord(did, rkey, record) 1005 1006 for _, o := range ops { 1007 def, ok := actx.Defs[o.OperandKey] 1008 if !ok { 1009 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1010 } 1011 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1012 return fmt.Errorf("failed to validate labelop: %w", err) 1013 } 1014 } 1015 1016 tx, err := ddb.Begin() 1017 if err != nil { 1018 return err 1019 } 1020 defer tx.Rollback() 1021 1022 for _, o := range ops { 1023 _, err = db.AddLabelOp(tx, &o) 1024 if err != nil { 1025 return fmt.Errorf("failed to add labelop: %w", err) 1026 } 1027 } 1028 1029 if err = tx.Commit(); err != nil { 1030 return err 1031 } 1032 } 1033 1034 return nil 1035}