forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 25 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/orm" 25 "tangled.org/core/rbac" 26) 27 28type Ingester struct { 29 Db db.DbWrapper 30 Enforcer *rbac.Enforcer 31 IdResolver *idresolver.Resolver 32 Config *config.Config 33 Logger *slog.Logger 34 Validator *validator.Validator 35} 36 37type processFunc func(ctx context.Context, e *jmodels.Event) error 38 39func (i *Ingester) Ingest() processFunc { 40 return func(ctx context.Context, e *jmodels.Event) error { 41 var err error 42 defer func() { 43 eventTime := e.TimeUS 44 lastTimeUs := eventTime + 1 45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 47 } 48 }() 49 50 l := i.Logger.With("kind", e.Kind) 51 switch e.Kind { 52 case jmodels.EventKindAccount: 53 if !e.Account.Active && *e.Account.Status == "deactivated" { 54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 55 } 56 case jmodels.EventKindIdentity: 57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 58 case jmodels.EventKindCommit: 59 switch e.Commit.Collection { 60 case tangled.GraphFollowNSID: 61 err = i.ingestFollow(e) 62 case tangled.FeedStarNSID: 63 err = i.ingestStar(e) 64 case tangled.PublicKeyNSID: 65 err = i.ingestPublicKey(e) 66 case tangled.RepoArtifactNSID: 67 err = i.ingestArtifact(e) 68 case tangled.ActorProfileNSID: 69 err = i.ingestProfile(e) 70 case tangled.SpindleMemberNSID: 71 err = i.ingestSpindleMember(ctx, e) 72 case tangled.SpindleNSID: 73 err = i.ingestSpindle(ctx, e) 74 case tangled.KnotMemberNSID: 75 err = i.ingestKnotMember(e) 76 case tangled.KnotNSID: 77 err = i.ingestKnot(e) 78 case tangled.StringNSID: 79 err = i.ingestString(e) 80 case tangled.RepoIssueNSID: 81 err = i.ingestIssue(ctx, e) 82 case tangled.RepoIssueCommentNSID: 83 err = i.ingestIssueComment(e) 84 case tangled.LabelDefinitionNSID: 85 err = i.ingestLabelDefinition(e) 86 case tangled.LabelOpNSID: 87 err = i.ingestLabelOp(e) 88 } 89 l = i.Logger.With("nsid", e.Commit.Collection) 90 } 91 92 if err != nil { 93 l.Warn("refused to ingest record", "err", err) 94 } 95 96 return nil 97 } 98} 99 100func (i *Ingester) ingestStar(e *jmodels.Event) error { 101 var err error 102 did := e.Did 103 104 l := i.Logger.With("handler", "ingestStar") 105 l = l.With("nsid", e.Commit.Collection) 106 107 switch e.Commit.Operation { 108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 109 var subjectUri syntax.ATURI 110 111 raw := json.RawMessage(e.Commit.Record) 112 record := tangled.FeedStar{} 113 err := json.Unmarshal(raw, &record) 114 if err != nil { 115 l.Error("invalid record", "err", err) 116 return err 117 } 118 119 subjectUri, err = syntax.ParseATURI(record.Subject) 120 if err != nil { 121 l.Error("invalid record", "err", err) 122 return err 123 } 124 err = db.AddStar(i.Db, &models.Star{ 125 Did: did, 126 RepoAt: subjectUri, 127 Rkey: e.Commit.RKey, 128 }) 129 case jmodels.CommitOperationDelete: 130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 } 132 133 if err != nil { 134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 135 } 136 137 return nil 138} 139 140func (i *Ingester) ingestFollow(e *jmodels.Event) error { 141 var err error 142 did := e.Did 143 144 l := i.Logger.With("handler", "ingestFollow") 145 l = l.With("nsid", e.Commit.Collection) 146 147 switch e.Commit.Operation { 148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 149 raw := json.RawMessage(e.Commit.Record) 150 record := tangled.GraphFollow{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "err", err) 154 return err 155 } 156 157 err = db.AddFollow(i.Db, &models.Follow{ 158 UserDid: did, 159 SubjectDid: record.Subject, 160 Rkey: e.Commit.RKey, 161 }) 162 case jmodels.CommitOperationDelete: 163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 164 } 165 166 if err != nil { 167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 168 } 169 170 return nil 171} 172 173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 174 did := e.Did 175 var err error 176 177 l := i.Logger.With("handler", "ingestPublicKey") 178 l = l.With("nsid", e.Commit.Collection) 179 180 switch e.Commit.Operation { 181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 182 l.Debug("processing add of pubkey") 183 raw := json.RawMessage(e.Commit.Record) 184 record := tangled.PublicKey{} 185 err = json.Unmarshal(raw, &record) 186 if err != nil { 187 l.Error("invalid record", "err", err) 188 return err 189 } 190 191 name := record.Name 192 key := record.Key 193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 194 case jmodels.CommitOperationDelete: 195 l.Debug("processing delete of pubkey") 196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 197 } 198 199 if err != nil { 200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 201 } 202 203 return nil 204} 205 206func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 207 did := e.Did 208 var err error 209 210 l := i.Logger.With("handler", "ingestArtifact") 211 l = l.With("nsid", e.Commit.Collection) 212 213 switch e.Commit.Operation { 214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 215 raw := json.RawMessage(e.Commit.Record) 216 record := tangled.RepoArtifact{} 217 err = json.Unmarshal(raw, &record) 218 if err != nil { 219 l.Error("invalid record", "err", err) 220 return err 221 } 222 223 repoAt, err := syntax.ParseATURI(record.Repo) 224 if err != nil { 225 return err 226 } 227 228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 if err != nil { 230 return err 231 } 232 233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 234 if err != nil || !ok { 235 return err 236 } 237 238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 if err != nil { 240 createdAt = time.Now() 241 } 242 243 artifact := models.Artifact{ 244 Did: did, 245 Rkey: e.Commit.RKey, 246 RepoAt: repoAt, 247 Tag: plumbing.Hash(record.Tag), 248 CreatedAt: createdAt, 249 BlobCid: cid.Cid(record.Artifact.Ref), 250 Name: record.Name, 251 Size: uint64(record.Artifact.Size), 252 MimeType: record.Artifact.MimeType, 253 } 254 255 err = db.AddArtifact(i.Db, artifact) 256 case jmodels.CommitOperationDelete: 257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 258 } 259 260 if err != nil { 261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 262 } 263 264 return nil 265} 266 267func (i *Ingester) ingestProfile(e *jmodels.Event) error { 268 did := e.Did 269 var err error 270 271 l := i.Logger.With("handler", "ingestProfile") 272 l = l.With("nsid", e.Commit.Collection) 273 274 if e.Commit.RKey != "self" { 275 return fmt.Errorf("ingestProfile only ingests `self` record") 276 } 277 278 switch e.Commit.Operation { 279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 280 raw := json.RawMessage(e.Commit.Record) 281 record := tangled.ActorProfile{} 282 err = json.Unmarshal(raw, &record) 283 if err != nil { 284 l.Error("invalid record", "err", err) 285 return err 286 } 287 288 description := "" 289 if record.Description != nil { 290 description = *record.Description 291 } 292 293 includeBluesky := record.Bluesky 294 295 pronouns := "" 296 if record.Pronouns != nil { 297 pronouns = *record.Pronouns 298 } 299 300 location := "" 301 if record.Location != nil { 302 location = *record.Location 303 } 304 305 var links [5]string 306 for i, l := range record.Links { 307 if i < 5 { 308 links[i] = l 309 } 310 } 311 312 var stats [2]models.VanityStat 313 for i, s := range record.Stats { 314 if i < 2 { 315 stats[i].Kind = models.VanityStatKind(s) 316 } 317 } 318 319 var pinned [6]syntax.ATURI 320 for i, r := range record.PinnedRepositories { 321 if i < 6 { 322 pinned[i] = syntax.ATURI(r) 323 } 324 } 325 326 profile := models.Profile{ 327 Did: did, 328 Description: description, 329 IncludeBluesky: includeBluesky, 330 Location: location, 331 Links: links, 332 Stats: stats, 333 PinnedRepos: pinned, 334 Pronouns: pronouns, 335 } 336 337 ddb, ok := i.Db.Execer.(*db.DB) 338 if !ok { 339 return fmt.Errorf("failed to index profile record, invalid db cast") 340 } 341 342 tx, err := ddb.Begin() 343 if err != nil { 344 return fmt.Errorf("failed to start transaction") 345 } 346 347 err = db.ValidateProfile(tx, &profile) 348 if err != nil { 349 return fmt.Errorf("invalid profile record") 350 } 351 352 err = db.UpsertProfile(tx, &profile) 353 case jmodels.CommitOperationDelete: 354 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 355 } 356 357 if err != nil { 358 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 359 } 360 361 return nil 362} 363 364func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 365 did := e.Did 366 var err error 367 368 l := i.Logger.With("handler", "ingestSpindleMember") 369 l = l.With("nsid", e.Commit.Collection) 370 371 switch e.Commit.Operation { 372 case jmodels.CommitOperationCreate: 373 raw := json.RawMessage(e.Commit.Record) 374 record := tangled.SpindleMember{} 375 err = json.Unmarshal(raw, &record) 376 if err != nil { 377 l.Error("invalid record", "err", err) 378 return err 379 } 380 381 // only spindle owner can invite to spindles 382 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 383 if err != nil || !ok { 384 return fmt.Errorf("failed to enforce permissions: %w", err) 385 } 386 387 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 388 if err != nil { 389 return err 390 } 391 392 if memberId.Handle.IsInvalidHandle() { 393 return err 394 } 395 396 ddb, ok := i.Db.Execer.(*db.DB) 397 if !ok { 398 return fmt.Errorf("failed to index profile record, invalid db cast") 399 } 400 401 err = db.AddSpindleMember(ddb, models.SpindleMember{ 402 Did: syntax.DID(did), 403 Rkey: e.Commit.RKey, 404 Instance: record.Instance, 405 Subject: memberId.DID, 406 }) 407 if !ok { 408 return fmt.Errorf("failed to add to db: %w", err) 409 } 410 411 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 412 if err != nil { 413 return fmt.Errorf("failed to update ACLs: %w", err) 414 } 415 416 l.Info("added spindle member") 417 case jmodels.CommitOperationDelete: 418 rkey := e.Commit.RKey 419 420 ddb, ok := i.Db.Execer.(*db.DB) 421 if !ok { 422 return fmt.Errorf("failed to index profile record, invalid db cast") 423 } 424 425 // get record from db first 426 members, err := db.GetSpindleMembers( 427 ddb, 428 orm.FilterEq("did", did), 429 orm.FilterEq("rkey", rkey), 430 ) 431 if err != nil || len(members) != 1 { 432 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 433 } 434 member := members[0] 435 436 tx, err := ddb.Begin() 437 if err != nil { 438 return fmt.Errorf("failed to start txn: %w", err) 439 } 440 441 // remove record by rkey && update enforcer 442 if err = db.RemoveSpindleMember( 443 tx, 444 orm.FilterEq("did", did), 445 orm.FilterEq("rkey", rkey), 446 ); err != nil { 447 return fmt.Errorf("failed to remove from db: %w", err) 448 } 449 450 // update enforcer 451 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 452 if err != nil { 453 return fmt.Errorf("failed to update ACLs: %w", err) 454 } 455 456 if err = tx.Commit(); err != nil { 457 return fmt.Errorf("failed to commit txn: %w", err) 458 } 459 460 if err = i.Enforcer.E.SavePolicy(); err != nil { 461 return fmt.Errorf("failed to save ACLs: %w", err) 462 } 463 464 l.Info("removed spindle member") 465 } 466 467 return nil 468} 469 470func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 471 did := e.Did 472 var err error 473 474 l := i.Logger.With("handler", "ingestSpindle") 475 l = l.With("nsid", e.Commit.Collection) 476 477 switch e.Commit.Operation { 478 case jmodels.CommitOperationCreate: 479 raw := json.RawMessage(e.Commit.Record) 480 record := tangled.Spindle{} 481 err = json.Unmarshal(raw, &record) 482 if err != nil { 483 l.Error("invalid record", "err", err) 484 return err 485 } 486 487 instance := e.Commit.RKey 488 489 ddb, ok := i.Db.Execer.(*db.DB) 490 if !ok { 491 return fmt.Errorf("failed to index profile record, invalid db cast") 492 } 493 494 err := db.AddSpindle(ddb, models.Spindle{ 495 Owner: syntax.DID(did), 496 Instance: instance, 497 }) 498 if err != nil { 499 l.Error("failed to add spindle to db", "err", err, "instance", instance) 500 return err 501 } 502 503 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 504 if err != nil { 505 l.Error("failed to add spindle to db", "err", err, "instance", instance) 506 return err 507 } 508 509 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 510 if err != nil { 511 return fmt.Errorf("failed to mark verified: %w", err) 512 } 513 514 return nil 515 516 case jmodels.CommitOperationDelete: 517 instance := e.Commit.RKey 518 519 ddb, ok := i.Db.Execer.(*db.DB) 520 if !ok { 521 return fmt.Errorf("failed to index profile record, invalid db cast") 522 } 523 524 // get record from db first 525 spindles, err := db.GetSpindles( 526 ddb, 527 orm.FilterEq("owner", did), 528 orm.FilterEq("instance", instance), 529 ) 530 if err != nil || len(spindles) != 1 { 531 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 532 } 533 spindle := spindles[0] 534 535 tx, err := ddb.Begin() 536 if err != nil { 537 return err 538 } 539 defer func() { 540 tx.Rollback() 541 i.Enforcer.E.LoadPolicy() 542 }() 543 544 // remove spindle members first 545 err = db.RemoveSpindleMember( 546 tx, 547 orm.FilterEq("owner", did), 548 orm.FilterEq("instance", instance), 549 ) 550 if err != nil { 551 return err 552 } 553 554 err = db.DeleteSpindle( 555 tx, 556 orm.FilterEq("owner", did), 557 orm.FilterEq("instance", instance), 558 ) 559 if err != nil { 560 return err 561 } 562 563 if spindle.Verified != nil { 564 err = i.Enforcer.RemoveSpindle(instance) 565 if err != nil { 566 return err 567 } 568 } 569 570 err = tx.Commit() 571 if err != nil { 572 return err 573 } 574 575 err = i.Enforcer.E.SavePolicy() 576 if err != nil { 577 return err 578 } 579 } 580 581 return nil 582} 583 584func (i *Ingester) ingestString(e *jmodels.Event) error { 585 did := e.Did 586 rkey := e.Commit.RKey 587 588 var err error 589 590 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 591 l.Info("ingesting record") 592 593 ddb, ok := i.Db.Execer.(*db.DB) 594 if !ok { 595 return fmt.Errorf("failed to index string record, invalid db cast") 596 } 597 598 switch e.Commit.Operation { 599 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 600 raw := json.RawMessage(e.Commit.Record) 601 record := tangled.String{} 602 err = json.Unmarshal(raw, &record) 603 if err != nil { 604 l.Error("invalid record", "err", err) 605 return err 606 } 607 608 string := models.StringFromRecord(did, rkey, record) 609 610 if err = i.Validator.ValidateString(&string); err != nil { 611 l.Error("invalid record", "err", err) 612 return err 613 } 614 615 if err = db.AddString(ddb, string); err != nil { 616 l.Error("failed to add string", "err", err) 617 return err 618 } 619 620 return nil 621 622 case jmodels.CommitOperationDelete: 623 if err := db.DeleteString( 624 ddb, 625 orm.FilterEq("did", did), 626 orm.FilterEq("rkey", rkey), 627 ); err != nil { 628 l.Error("failed to delete", "err", err) 629 return fmt.Errorf("failed to delete string record: %w", err) 630 } 631 632 return nil 633 } 634 635 return nil 636} 637 638func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 639 did := e.Did 640 var err error 641 642 l := i.Logger.With("handler", "ingestKnotMember") 643 l = l.With("nsid", e.Commit.Collection) 644 645 switch e.Commit.Operation { 646 case jmodels.CommitOperationCreate: 647 raw := json.RawMessage(e.Commit.Record) 648 record := tangled.KnotMember{} 649 err = json.Unmarshal(raw, &record) 650 if err != nil { 651 l.Error("invalid record", "err", err) 652 return err 653 } 654 655 // only knot owner can invite to knots 656 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 657 if err != nil || !ok { 658 return fmt.Errorf("failed to enforce permissions: %w", err) 659 } 660 661 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 662 if err != nil { 663 return err 664 } 665 666 if memberId.Handle.IsInvalidHandle() { 667 return err 668 } 669 670 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 671 if err != nil { 672 return fmt.Errorf("failed to update ACLs: %w", err) 673 } 674 675 l.Info("added knot member") 676 case jmodels.CommitOperationDelete: 677 // we don't store knot members in a table (like we do for spindle) 678 // and we can't remove this just yet. possibly fixed if we switch 679 // to either: 680 // 1. a knot_members table like with spindle and store the rkey 681 // 2. use the knot host as the rkey 682 // 683 // TODO: implement member deletion 684 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 685 } 686 687 return nil 688} 689 690func (i *Ingester) ingestKnot(e *jmodels.Event) error { 691 did := e.Did 692 var err error 693 694 l := i.Logger.With("handler", "ingestKnot") 695 l = l.With("nsid", e.Commit.Collection) 696 697 switch e.Commit.Operation { 698 case jmodels.CommitOperationCreate: 699 raw := json.RawMessage(e.Commit.Record) 700 record := tangled.Knot{} 701 err = json.Unmarshal(raw, &record) 702 if err != nil { 703 l.Error("invalid record", "err", err) 704 return err 705 } 706 707 domain := e.Commit.RKey 708 709 ddb, ok := i.Db.Execer.(*db.DB) 710 if !ok { 711 return fmt.Errorf("failed to index profile record, invalid db cast") 712 } 713 714 err := db.AddKnot(ddb, domain, did) 715 if err != nil { 716 l.Error("failed to add knot to db", "err", err, "domain", domain) 717 return err 718 } 719 720 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 721 if err != nil { 722 l.Error("failed to verify knot", "err", err, "domain", domain) 723 return err 724 } 725 726 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 727 if err != nil { 728 return fmt.Errorf("failed to mark verified: %w", err) 729 } 730 731 return nil 732 733 case jmodels.CommitOperationDelete: 734 domain := e.Commit.RKey 735 736 ddb, ok := i.Db.Execer.(*db.DB) 737 if !ok { 738 return fmt.Errorf("failed to index knot record, invalid db cast") 739 } 740 741 // get record from db first 742 registrations, err := db.GetRegistrations( 743 ddb, 744 orm.FilterEq("domain", domain), 745 orm.FilterEq("did", did), 746 ) 747 if err != nil { 748 return fmt.Errorf("failed to get registration: %w", err) 749 } 750 if len(registrations) != 1 { 751 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 752 } 753 registration := registrations[0] 754 755 tx, err := ddb.Begin() 756 if err != nil { 757 return err 758 } 759 defer func() { 760 tx.Rollback() 761 i.Enforcer.E.LoadPolicy() 762 }() 763 764 err = db.DeleteKnot( 765 tx, 766 orm.FilterEq("did", did), 767 orm.FilterEq("domain", domain), 768 ) 769 if err != nil { 770 return err 771 } 772 773 if registration.Registered != nil { 774 err = i.Enforcer.RemoveKnot(domain) 775 if err != nil { 776 return err 777 } 778 } 779 780 err = tx.Commit() 781 if err != nil { 782 return err 783 } 784 785 err = i.Enforcer.E.SavePolicy() 786 if err != nil { 787 return err 788 } 789 } 790 791 return nil 792} 793func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 794 did := e.Did 795 rkey := e.Commit.RKey 796 797 var err error 798 799 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 800 l.Info("ingesting record") 801 802 ddb, ok := i.Db.Execer.(*db.DB) 803 if !ok { 804 return fmt.Errorf("failed to index issue record, invalid db cast") 805 } 806 807 switch e.Commit.Operation { 808 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 809 raw := json.RawMessage(e.Commit.Record) 810 record := tangled.RepoIssue{} 811 err = json.Unmarshal(raw, &record) 812 if err != nil { 813 l.Error("invalid record", "err", err) 814 return err 815 } 816 817 issue := models.IssueFromRecord(did, rkey, record) 818 819 if err := i.Validator.ValidateIssue(&issue); err != nil { 820 return fmt.Errorf("failed to validate issue: %w", err) 821 } 822 823 tx, err := ddb.BeginTx(ctx, nil) 824 if err != nil { 825 l.Error("failed to begin transaction", "err", err) 826 return err 827 } 828 defer tx.Rollback() 829 830 err = db.PutIssue(tx, &issue) 831 if err != nil { 832 l.Error("failed to create issue", "err", err) 833 return err 834 } 835 836 err = tx.Commit() 837 if err != nil { 838 l.Error("failed to commit txn", "err", err) 839 return err 840 } 841 842 return nil 843 844 case jmodels.CommitOperationDelete: 845 tx, err := ddb.BeginTx(ctx, nil) 846 if err != nil { 847 l.Error("failed to begin transaction", "err", err) 848 return err 849 } 850 defer tx.Rollback() 851 852 if err := db.DeleteIssues( 853 tx, 854 did, 855 rkey, 856 ); err != nil { 857 l.Error("failed to delete", "err", err) 858 return fmt.Errorf("failed to delete issue record: %w", err) 859 } 860 if err := tx.Commit(); err != nil { 861 l.Error("failed to commit txn", "err", err) 862 return err 863 } 864 865 return nil 866 } 867 868 return nil 869} 870 871func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 872 did := e.Did 873 rkey := e.Commit.RKey 874 875 var err error 876 877 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 878 l.Info("ingesting record") 879 880 ddb, ok := i.Db.Execer.(*db.DB) 881 if !ok { 882 return fmt.Errorf("failed to index issue comment record, invalid db cast") 883 } 884 885 switch e.Commit.Operation { 886 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 887 raw := json.RawMessage(e.Commit.Record) 888 record := tangled.RepoIssueComment{} 889 err = json.Unmarshal(raw, &record) 890 if err != nil { 891 return fmt.Errorf("invalid record: %w", err) 892 } 893 894 comment, err := models.IssueCommentFromRecord(did, rkey, record) 895 if err != nil { 896 return fmt.Errorf("failed to parse comment from record: %w", err) 897 } 898 899 if err := i.Validator.ValidateIssueComment(comment); err != nil { 900 return fmt.Errorf("failed to validate comment: %w", err) 901 } 902 903 tx, err := ddb.Begin() 904 if err != nil { 905 return fmt.Errorf("failed to start transaction: %w", err) 906 } 907 defer tx.Rollback() 908 909 _, err = db.AddIssueComment(tx, *comment) 910 if err != nil { 911 return fmt.Errorf("failed to create issue comment: %w", err) 912 } 913 914 return tx.Commit() 915 916 case jmodels.CommitOperationDelete: 917 if err := db.DeleteIssueComments( 918 ddb, 919 orm.FilterEq("did", did), 920 orm.FilterEq("rkey", rkey), 921 ); err != nil { 922 return fmt.Errorf("failed to delete issue comment record: %w", err) 923 } 924 925 return nil 926 } 927 928 return nil 929} 930 931func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 932 did := e.Did 933 rkey := e.Commit.RKey 934 935 var err error 936 937 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 938 l.Info("ingesting record") 939 940 ddb, ok := i.Db.Execer.(*db.DB) 941 if !ok { 942 return fmt.Errorf("failed to index label definition, invalid db cast") 943 } 944 945 switch e.Commit.Operation { 946 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 947 raw := json.RawMessage(e.Commit.Record) 948 record := tangled.LabelDefinition{} 949 err = json.Unmarshal(raw, &record) 950 if err != nil { 951 return fmt.Errorf("invalid record: %w", err) 952 } 953 954 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 955 if err != nil { 956 return fmt.Errorf("failed to parse labeldef from record: %w", err) 957 } 958 959 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 960 return fmt.Errorf("failed to validate labeldef: %w", err) 961 } 962 963 _, err = db.AddLabelDefinition(ddb, def) 964 if err != nil { 965 return fmt.Errorf("failed to create labeldef: %w", err) 966 } 967 968 return nil 969 970 case jmodels.CommitOperationDelete: 971 if err := db.DeleteLabelDefinition( 972 ddb, 973 orm.FilterEq("did", did), 974 orm.FilterEq("rkey", rkey), 975 ); err != nil { 976 return fmt.Errorf("failed to delete labeldef record: %w", err) 977 } 978 979 return nil 980 } 981 982 return nil 983} 984 985func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 986 did := e.Did 987 rkey := e.Commit.RKey 988 989 var err error 990 991 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 992 l.Info("ingesting record") 993 994 ddb, ok := i.Db.Execer.(*db.DB) 995 if !ok { 996 return fmt.Errorf("failed to index label op, invalid db cast") 997 } 998 999 switch e.Commit.Operation { 1000 case jmodels.CommitOperationCreate: 1001 raw := json.RawMessage(e.Commit.Record) 1002 record := tangled.LabelOp{} 1003 err = json.Unmarshal(raw, &record) 1004 if err != nil { 1005 return fmt.Errorf("invalid record: %w", err) 1006 } 1007 1008 subject := syntax.ATURI(record.Subject) 1009 collection := subject.Collection() 1010 1011 var repo *models.Repo 1012 switch collection { 1013 case tangled.RepoIssueNSID: 1014 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1015 if err != nil || len(i) != 1 { 1016 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1017 } 1018 repo = i[0].Repo 1019 default: 1020 return fmt.Errorf("unsupport label subject: %s", collection) 1021 } 1022 1023 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1024 if err != nil { 1025 return fmt.Errorf("failed to build label application ctx: %w", err) 1026 } 1027 1028 ops := models.LabelOpsFromRecord(did, rkey, record) 1029 1030 for _, o := range ops { 1031 def, ok := actx.Defs[o.OperandKey] 1032 if !ok { 1033 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1034 } 1035 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1036 return fmt.Errorf("failed to validate labelop: %w", err) 1037 } 1038 } 1039 1040 tx, err := ddb.Begin() 1041 if err != nil { 1042 return err 1043 } 1044 defer tx.Rollback() 1045 1046 for _, o := range ops { 1047 _, err = db.AddLabelOp(tx, &o) 1048 if err != nil { 1049 return fmt.Errorf("failed to add labelop: %w", err) 1050 } 1051 } 1052 1053 if err = tx.Commit(); err != nil { 1054 return err 1055 } 1056 } 1057 1058 return nil 1059}