forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "github.com/go-git/go-git/v5/plumbing" 14 "github.com/ipfs/go-cid" 15 "tangled.sh/tangled.sh/core/api/tangled" 16 "tangled.sh/tangled.sh/core/appview/config" 17 "tangled.sh/tangled.sh/core/appview/db" 18 "tangled.sh/tangled.sh/core/appview/pages/markup" 19 "tangled.sh/tangled.sh/core/appview/serververify" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/rbac" 22) 23 24type Ingester struct { 25 Db db.DbWrapper 26 Enforcer *rbac.Enforcer 27 IdResolver *idresolver.Resolver 28 Config *config.Config 29 Logger *slog.Logger 30} 31 32type processFunc func(ctx context.Context, e *models.Event) error 33 34func (i *Ingester) Ingest() processFunc { 35 return func(ctx context.Context, e *models.Event) error { 36 var err error 37 defer func() { 38 eventTime := e.TimeUS 39 lastTimeUs := eventTime + 1 40 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 41 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 42 } 43 }() 44 45 l := i.Logger.With("kind", e.Kind) 46 switch e.Kind { 47 case models.EventKindAccount: 48 if !e.Account.Active && *e.Account.Status == "deactivated" { 49 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 50 } 51 case models.EventKindIdentity: 52 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 53 case models.EventKindCommit: 54 switch e.Commit.Collection { 55 case tangled.GraphFollowNSID: 56 err = i.ingestFollow(e) 57 case tangled.FeedStarNSID: 58 err = i.ingestStar(e) 59 case tangled.PublicKeyNSID: 60 err = i.ingestPublicKey(e) 61 case tangled.RepoArtifactNSID: 62 err = i.ingestArtifact(e) 63 case tangled.ActorProfileNSID: 64 err = i.ingestProfile(e) 65 case tangled.SpindleMemberNSID: 66 err = i.ingestSpindleMember(ctx, e) 67 case tangled.SpindleNSID: 68 err = i.ingestSpindle(ctx, e) 69 case tangled.KnotMemberNSID: 70 err = i.ingestKnotMember(e) 71 case tangled.KnotNSID: 72 err = i.ingestKnot(e) 73 case tangled.StringNSID: 74 err = i.ingestString(e) 75 case tangled.RepoIssueNSID: 76 err = i.ingestIssue(ctx, e) 77 case tangled.RepoIssueCommentNSID: 78 err = i.ingestIssueComment(e) 79 } 80 l = i.Logger.With("nsid", e.Commit.Collection) 81 } 82 83 if err != nil { 84 l.Debug("error ingesting record", "err", err) 85 } 86 87 return nil 88 } 89} 90 91func (i *Ingester) ingestStar(e *models.Event) error { 92 var err error 93 did := e.Did 94 95 l := i.Logger.With("handler", "ingestStar") 96 l = l.With("nsid", e.Commit.Collection) 97 98 switch e.Commit.Operation { 99 case models.CommitOperationCreate, models.CommitOperationUpdate: 100 var subjectUri syntax.ATURI 101 102 raw := json.RawMessage(e.Commit.Record) 103 record := tangled.FeedStar{} 104 err := json.Unmarshal(raw, &record) 105 if err != nil { 106 l.Error("invalid record", "err", err) 107 return err 108 } 109 110 subjectUri, err = syntax.ParseATURI(record.Subject) 111 if err != nil { 112 l.Error("invalid record", "err", err) 113 return err 114 } 115 err = db.AddStar(i.Db, &db.Star{ 116 StarredByDid: did, 117 RepoAt: subjectUri, 118 Rkey: e.Commit.RKey, 119 }) 120 case models.CommitOperationDelete: 121 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 122 } 123 124 if err != nil { 125 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 126 } 127 128 return nil 129} 130 131func (i *Ingester) ingestFollow(e *models.Event) error { 132 var err error 133 did := e.Did 134 135 l := i.Logger.With("handler", "ingestFollow") 136 l = l.With("nsid", e.Commit.Collection) 137 138 switch e.Commit.Operation { 139 case models.CommitOperationCreate, models.CommitOperationUpdate: 140 raw := json.RawMessage(e.Commit.Record) 141 record := tangled.GraphFollow{} 142 err = json.Unmarshal(raw, &record) 143 if err != nil { 144 l.Error("invalid record", "err", err) 145 return err 146 } 147 148 err = db.AddFollow(i.Db, &db.Follow{ 149 UserDid: did, 150 SubjectDid: record.Subject, 151 Rkey: e.Commit.RKey, 152 }) 153 case models.CommitOperationDelete: 154 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 155 } 156 157 if err != nil { 158 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 159 } 160 161 return nil 162} 163 164func (i *Ingester) ingestPublicKey(e *models.Event) error { 165 did := e.Did 166 var err error 167 168 l := i.Logger.With("handler", "ingestPublicKey") 169 l = l.With("nsid", e.Commit.Collection) 170 171 switch e.Commit.Operation { 172 case models.CommitOperationCreate, models.CommitOperationUpdate: 173 l.Debug("processing add of pubkey") 174 raw := json.RawMessage(e.Commit.Record) 175 record := tangled.PublicKey{} 176 err = json.Unmarshal(raw, &record) 177 if err != nil { 178 l.Error("invalid record", "err", err) 179 return err 180 } 181 182 name := record.Name 183 key := record.Key 184 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 185 case models.CommitOperationDelete: 186 l.Debug("processing delete of pubkey") 187 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 188 } 189 190 if err != nil { 191 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 192 } 193 194 return nil 195} 196 197func (i *Ingester) ingestArtifact(e *models.Event) error { 198 did := e.Did 199 var err error 200 201 l := i.Logger.With("handler", "ingestArtifact") 202 l = l.With("nsid", e.Commit.Collection) 203 204 switch e.Commit.Operation { 205 case models.CommitOperationCreate, models.CommitOperationUpdate: 206 raw := json.RawMessage(e.Commit.Record) 207 record := tangled.RepoArtifact{} 208 err = json.Unmarshal(raw, &record) 209 if err != nil { 210 l.Error("invalid record", "err", err) 211 return err 212 } 213 214 repoAt, err := syntax.ParseATURI(record.Repo) 215 if err != nil { 216 return err 217 } 218 219 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 220 if err != nil { 221 return err 222 } 223 224 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 225 if err != nil || !ok { 226 return err 227 } 228 229 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 230 if err != nil { 231 createdAt = time.Now() 232 } 233 234 artifact := db.Artifact{ 235 Did: did, 236 Rkey: e.Commit.RKey, 237 RepoAt: repoAt, 238 Tag: plumbing.Hash(record.Tag), 239 CreatedAt: createdAt, 240 BlobCid: cid.Cid(record.Artifact.Ref), 241 Name: record.Name, 242 Size: uint64(record.Artifact.Size), 243 MimeType: record.Artifact.MimeType, 244 } 245 246 err = db.AddArtifact(i.Db, artifact) 247 case models.CommitOperationDelete: 248 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 249 } 250 251 if err != nil { 252 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 253 } 254 255 return nil 256} 257 258func (i *Ingester) ingestProfile(e *models.Event) error { 259 did := e.Did 260 var err error 261 262 l := i.Logger.With("handler", "ingestProfile") 263 l = l.With("nsid", e.Commit.Collection) 264 265 if e.Commit.RKey != "self" { 266 return fmt.Errorf("ingestProfile only ingests `self` record") 267 } 268 269 switch e.Commit.Operation { 270 case models.CommitOperationCreate, models.CommitOperationUpdate: 271 raw := json.RawMessage(e.Commit.Record) 272 record := tangled.ActorProfile{} 273 err = json.Unmarshal(raw, &record) 274 if err != nil { 275 l.Error("invalid record", "err", err) 276 return err 277 } 278 279 description := "" 280 if record.Description != nil { 281 description = *record.Description 282 } 283 284 includeBluesky := record.Bluesky 285 286 location := "" 287 if record.Location != nil { 288 location = *record.Location 289 } 290 291 var links [5]string 292 for i, l := range record.Links { 293 if i < 5 { 294 links[i] = l 295 } 296 } 297 298 var stats [2]db.VanityStat 299 for i, s := range record.Stats { 300 if i < 2 { 301 stats[i].Kind = db.VanityStatKind(s) 302 } 303 } 304 305 var pinned [6]syntax.ATURI 306 for i, r := range record.PinnedRepositories { 307 if i < 6 { 308 pinned[i] = syntax.ATURI(r) 309 } 310 } 311 312 profile := db.Profile{ 313 Did: did, 314 Description: description, 315 IncludeBluesky: includeBluesky, 316 Location: location, 317 Links: links, 318 Stats: stats, 319 PinnedRepos: pinned, 320 } 321 322 ddb, ok := i.Db.Execer.(*db.DB) 323 if !ok { 324 return fmt.Errorf("failed to index profile record, invalid db cast") 325 } 326 327 tx, err := ddb.Begin() 328 if err != nil { 329 return fmt.Errorf("failed to start transaction") 330 } 331 332 err = db.ValidateProfile(tx, &profile) 333 if err != nil { 334 return fmt.Errorf("invalid profile record") 335 } 336 337 err = db.UpsertProfile(tx, &profile) 338 case models.CommitOperationDelete: 339 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 340 } 341 342 if err != nil { 343 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 344 } 345 346 return nil 347} 348 349func (i *Ingester) ingestSpindleMember(ctx context.Context, e *models.Event) error { 350 did := e.Did 351 var err error 352 353 l := i.Logger.With("handler", "ingestSpindleMember") 354 l = l.With("nsid", e.Commit.Collection) 355 356 switch e.Commit.Operation { 357 case models.CommitOperationCreate: 358 raw := json.RawMessage(e.Commit.Record) 359 record := tangled.SpindleMember{} 360 err = json.Unmarshal(raw, &record) 361 if err != nil { 362 l.Error("invalid record", "err", err) 363 return err 364 } 365 366 // only spindle owner can invite to spindles 367 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 368 if err != nil || !ok { 369 return fmt.Errorf("failed to enforce permissions: %w", err) 370 } 371 372 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 373 if err != nil { 374 return err 375 } 376 377 if memberId.Handle.IsInvalidHandle() { 378 return err 379 } 380 381 ddb, ok := i.Db.Execer.(*db.DB) 382 if !ok { 383 return fmt.Errorf("failed to index profile record, invalid db cast") 384 } 385 386 err = db.AddSpindleMember(ddb, db.SpindleMember{ 387 Did: syntax.DID(did), 388 Rkey: e.Commit.RKey, 389 Instance: record.Instance, 390 Subject: memberId.DID, 391 }) 392 if !ok { 393 return fmt.Errorf("failed to add to db: %w", err) 394 } 395 396 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 397 if err != nil { 398 return fmt.Errorf("failed to update ACLs: %w", err) 399 } 400 401 l.Info("added spindle member") 402 case models.CommitOperationDelete: 403 rkey := e.Commit.RKey 404 405 ddb, ok := i.Db.Execer.(*db.DB) 406 if !ok { 407 return fmt.Errorf("failed to index profile record, invalid db cast") 408 } 409 410 // get record from db first 411 members, err := db.GetSpindleMembers( 412 ddb, 413 db.FilterEq("did", did), 414 db.FilterEq("rkey", rkey), 415 ) 416 if err != nil || len(members) != 1 { 417 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 418 } 419 member := members[0] 420 421 tx, err := ddb.Begin() 422 if err != nil { 423 return fmt.Errorf("failed to start txn: %w", err) 424 } 425 426 // remove record by rkey && update enforcer 427 if err = db.RemoveSpindleMember( 428 tx, 429 db.FilterEq("did", did), 430 db.FilterEq("rkey", rkey), 431 ); err != nil { 432 return fmt.Errorf("failed to remove from db: %w", err) 433 } 434 435 // update enforcer 436 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 437 if err != nil { 438 return fmt.Errorf("failed to update ACLs: %w", err) 439 } 440 441 if err = tx.Commit(); err != nil { 442 return fmt.Errorf("failed to commit txn: %w", err) 443 } 444 445 if err = i.Enforcer.E.SavePolicy(); err != nil { 446 return fmt.Errorf("failed to save ACLs: %w", err) 447 } 448 449 l.Info("removed spindle member") 450 } 451 452 return nil 453} 454 455func (i *Ingester) ingestSpindle(ctx context.Context, e *models.Event) error { 456 did := e.Did 457 var err error 458 459 l := i.Logger.With("handler", "ingestSpindle") 460 l = l.With("nsid", e.Commit.Collection) 461 462 switch e.Commit.Operation { 463 case models.CommitOperationCreate: 464 raw := json.RawMessage(e.Commit.Record) 465 record := tangled.Spindle{} 466 err = json.Unmarshal(raw, &record) 467 if err != nil { 468 l.Error("invalid record", "err", err) 469 return err 470 } 471 472 instance := e.Commit.RKey 473 474 ddb, ok := i.Db.Execer.(*db.DB) 475 if !ok { 476 return fmt.Errorf("failed to index profile record, invalid db cast") 477 } 478 479 err := db.AddSpindle(ddb, db.Spindle{ 480 Owner: syntax.DID(did), 481 Instance: instance, 482 }) 483 if err != nil { 484 l.Error("failed to add spindle to db", "err", err, "instance", instance) 485 return err 486 } 487 488 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 489 if err != nil { 490 l.Error("failed to add spindle to db", "err", err, "instance", instance) 491 return err 492 } 493 494 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 495 if err != nil { 496 return fmt.Errorf("failed to mark verified: %w", err) 497 } 498 499 return nil 500 501 case models.CommitOperationDelete: 502 instance := e.Commit.RKey 503 504 ddb, ok := i.Db.Execer.(*db.DB) 505 if !ok { 506 return fmt.Errorf("failed to index profile record, invalid db cast") 507 } 508 509 // get record from db first 510 spindles, err := db.GetSpindles( 511 ddb, 512 db.FilterEq("owner", did), 513 db.FilterEq("instance", instance), 514 ) 515 if err != nil || len(spindles) != 1 { 516 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 517 } 518 spindle := spindles[0] 519 520 tx, err := ddb.Begin() 521 if err != nil { 522 return err 523 } 524 defer func() { 525 tx.Rollback() 526 i.Enforcer.E.LoadPolicy() 527 }() 528 529 // remove spindle members first 530 err = db.RemoveSpindleMember( 531 tx, 532 db.FilterEq("owner", did), 533 db.FilterEq("instance", instance), 534 ) 535 if err != nil { 536 return err 537 } 538 539 err = db.DeleteSpindle( 540 tx, 541 db.FilterEq("owner", did), 542 db.FilterEq("instance", instance), 543 ) 544 if err != nil { 545 return err 546 } 547 548 if spindle.Verified != nil { 549 err = i.Enforcer.RemoveSpindle(instance) 550 if err != nil { 551 return err 552 } 553 } 554 555 err = tx.Commit() 556 if err != nil { 557 return err 558 } 559 560 err = i.Enforcer.E.SavePolicy() 561 if err != nil { 562 return err 563 } 564 } 565 566 return nil 567} 568 569func (i *Ingester) ingestString(e *models.Event) error { 570 did := e.Did 571 rkey := e.Commit.RKey 572 573 var err error 574 575 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 576 l.Info("ingesting record") 577 578 ddb, ok := i.Db.Execer.(*db.DB) 579 if !ok { 580 return fmt.Errorf("failed to index string record, invalid db cast") 581 } 582 583 switch e.Commit.Operation { 584 case models.CommitOperationCreate, models.CommitOperationUpdate: 585 raw := json.RawMessage(e.Commit.Record) 586 record := tangled.String{} 587 err = json.Unmarshal(raw, &record) 588 if err != nil { 589 l.Error("invalid record", "err", err) 590 return err 591 } 592 593 string := db.StringFromRecord(did, rkey, record) 594 595 if err = string.Validate(); err != nil { 596 l.Error("invalid record", "err", err) 597 return err 598 } 599 600 if err = db.AddString(ddb, string); err != nil { 601 l.Error("failed to add string", "err", err) 602 return err 603 } 604 605 return nil 606 607 case models.CommitOperationDelete: 608 if err := db.DeleteString( 609 ddb, 610 db.FilterEq("did", did), 611 db.FilterEq("rkey", rkey), 612 ); err != nil { 613 l.Error("failed to delete", "err", err) 614 return fmt.Errorf("failed to delete string record: %w", err) 615 } 616 617 return nil 618 } 619 620 return nil 621} 622 623func (i *Ingester) ingestKnotMember(e *models.Event) error { 624 did := e.Did 625 var err error 626 627 l := i.Logger.With("handler", "ingestKnotMember") 628 l = l.With("nsid", e.Commit.Collection) 629 630 switch e.Commit.Operation { 631 case models.CommitOperationCreate: 632 raw := json.RawMessage(e.Commit.Record) 633 record := tangled.KnotMember{} 634 err = json.Unmarshal(raw, &record) 635 if err != nil { 636 l.Error("invalid record", "err", err) 637 return err 638 } 639 640 // only knot owner can invite to knots 641 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 642 if err != nil || !ok { 643 return fmt.Errorf("failed to enforce permissions: %w", err) 644 } 645 646 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 647 if err != nil { 648 return err 649 } 650 651 if memberId.Handle.IsInvalidHandle() { 652 return err 653 } 654 655 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 656 if err != nil { 657 return fmt.Errorf("failed to update ACLs: %w", err) 658 } 659 660 l.Info("added knot member") 661 case models.CommitOperationDelete: 662 // we don't store knot members in a table (like we do for spindle) 663 // and we can't remove this just yet. possibly fixed if we switch 664 // to either: 665 // 1. a knot_members table like with spindle and store the rkey 666 // 2. use the knot host as the rkey 667 // 668 // TODO: implement member deletion 669 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 670 } 671 672 return nil 673} 674 675func (i *Ingester) ingestKnot(e *models.Event) error { 676 did := e.Did 677 var err error 678 679 l := i.Logger.With("handler", "ingestKnot") 680 l = l.With("nsid", e.Commit.Collection) 681 682 switch e.Commit.Operation { 683 case models.CommitOperationCreate: 684 raw := json.RawMessage(e.Commit.Record) 685 record := tangled.Knot{} 686 err = json.Unmarshal(raw, &record) 687 if err != nil { 688 l.Error("invalid record", "err", err) 689 return err 690 } 691 692 domain := e.Commit.RKey 693 694 ddb, ok := i.Db.Execer.(*db.DB) 695 if !ok { 696 return fmt.Errorf("failed to index profile record, invalid db cast") 697 } 698 699 err := db.AddKnot(ddb, domain, did) 700 if err != nil { 701 l.Error("failed to add knot to db", "err", err, "domain", domain) 702 return err 703 } 704 705 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 706 if err != nil { 707 l.Error("failed to verify knot", "err", err, "domain", domain) 708 return err 709 } 710 711 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 712 if err != nil { 713 return fmt.Errorf("failed to mark verified: %w", err) 714 } 715 716 return nil 717 718 case models.CommitOperationDelete: 719 domain := e.Commit.RKey 720 721 ddb, ok := i.Db.Execer.(*db.DB) 722 if !ok { 723 return fmt.Errorf("failed to index knot record, invalid db cast") 724 } 725 726 // get record from db first 727 registrations, err := db.GetRegistrations( 728 ddb, 729 db.FilterEq("domain", domain), 730 db.FilterEq("did", did), 731 ) 732 if err != nil { 733 return fmt.Errorf("failed to get registration: %w", err) 734 } 735 if len(registrations) != 1 { 736 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 737 } 738 registration := registrations[0] 739 740 tx, err := ddb.Begin() 741 if err != nil { 742 return err 743 } 744 defer func() { 745 tx.Rollback() 746 i.Enforcer.E.LoadPolicy() 747 }() 748 749 err = db.DeleteKnot( 750 tx, 751 db.FilterEq("did", did), 752 db.FilterEq("domain", domain), 753 ) 754 if err != nil { 755 return err 756 } 757 758 if registration.Registered != nil { 759 err = i.Enforcer.RemoveKnot(domain) 760 if err != nil { 761 return err 762 } 763 } 764 765 err = tx.Commit() 766 if err != nil { 767 return err 768 } 769 770 err = i.Enforcer.E.SavePolicy() 771 if err != nil { 772 return err 773 } 774 } 775 776 return nil 777} 778func (i *Ingester) ingestIssue(ctx context.Context, e *models.Event) error { 779 did := e.Did 780 rkey := e.Commit.RKey 781 782 var err error 783 784 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 785 l.Info("ingesting record") 786 787 ddb, ok := i.Db.Execer.(*db.DB) 788 if !ok { 789 return fmt.Errorf("failed to index issue record, invalid db cast") 790 } 791 792 switch e.Commit.Operation { 793 case models.CommitOperationCreate: 794 raw := json.RawMessage(e.Commit.Record) 795 record := tangled.RepoIssue{} 796 err = json.Unmarshal(raw, &record) 797 if err != nil { 798 l.Error("invalid record", "err", err) 799 return err 800 } 801 802 issue := db.IssueFromRecord(did, rkey, record) 803 804 sanitizer := markup.NewSanitizer() 805 if st := strings.TrimSpace(sanitizer.SanitizeDescription(issue.Title)); st == "" { 806 return fmt.Errorf("title is empty after HTML sanitization") 807 } 808 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(issue.Body)); sb == "" { 809 return fmt.Errorf("body is empty after HTML sanitization") 810 } 811 812 tx, err := ddb.BeginTx(ctx, nil) 813 if err != nil { 814 l.Error("failed to begin transaction", "err", err) 815 return err 816 } 817 818 err = db.NewIssue(tx, &issue) 819 if err != nil { 820 l.Error("failed to create issue", "err", err) 821 return err 822 } 823 824 return nil 825 826 case models.CommitOperationUpdate: 827 raw := json.RawMessage(e.Commit.Record) 828 record := tangled.RepoIssue{} 829 err = json.Unmarshal(raw, &record) 830 if err != nil { 831 l.Error("invalid record", "err", err) 832 return err 833 } 834 835 body := "" 836 if record.Body != nil { 837 body = *record.Body 838 } 839 840 sanitizer := markup.NewSanitizer() 841 if st := strings.TrimSpace(sanitizer.SanitizeDescription(record.Title)); st == "" { 842 return fmt.Errorf("title is empty after HTML sanitization") 843 } 844 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(body)); sb == "" { 845 return fmt.Errorf("body is empty after HTML sanitization") 846 } 847 848 err = db.UpdateIssueByRkey(ddb, did, rkey, record.Title, body) 849 if err != nil { 850 l.Error("failed to update issue", "err", err) 851 return err 852 } 853 854 return nil 855 856 case models.CommitOperationDelete: 857 if err := db.DeleteIssueByRkey(ddb, did, rkey); err != nil { 858 l.Error("failed to delete", "err", err) 859 return fmt.Errorf("failed to delete issue record: %w", err) 860 } 861 862 return nil 863 } 864 865 return fmt.Errorf("unknown operation: %s", e.Commit.Operation) 866} 867 868func (i *Ingester) ingestIssueComment(e *models.Event) error { 869 did := e.Did 870 rkey := e.Commit.RKey 871 872 var err error 873 874 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 875 l.Info("ingesting record") 876 877 ddb, ok := i.Db.Execer.(*db.DB) 878 if !ok { 879 return fmt.Errorf("failed to index issue comment record, invalid db cast") 880 } 881 882 switch e.Commit.Operation { 883 case models.CommitOperationCreate: 884 raw := json.RawMessage(e.Commit.Record) 885 record := tangled.RepoIssueComment{} 886 err = json.Unmarshal(raw, &record) 887 if err != nil { 888 l.Error("invalid record", "err", err) 889 return err 890 } 891 892 comment, err := db.IssueCommentFromRecord(ddb, did, rkey, record) 893 if err != nil { 894 l.Error("failed to parse comment from record", "err", err) 895 return err 896 } 897 898 sanitizer := markup.NewSanitizer() 899 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(comment.Body)); sb == "" { 900 return fmt.Errorf("body is empty after HTML sanitization") 901 } 902 903 err = db.NewIssueComment(ddb, &comment) 904 if err != nil { 905 l.Error("failed to create issue comment", "err", err) 906 return err 907 } 908 909 return nil 910 911 case models.CommitOperationUpdate: 912 raw := json.RawMessage(e.Commit.Record) 913 record := tangled.RepoIssueComment{} 914 err = json.Unmarshal(raw, &record) 915 if err != nil { 916 l.Error("invalid record", "err", err) 917 return err 918 } 919 920 sanitizer := markup.NewSanitizer() 921 if sb := strings.TrimSpace(sanitizer.SanitizeDefault(record.Body)); sb == "" { 922 return fmt.Errorf("body is empty after HTML sanitization") 923 } 924 925 err = db.UpdateCommentByRkey(ddb, did, rkey, record.Body) 926 if err != nil { 927 l.Error("failed to update issue comment", "err", err) 928 return err 929 } 930 931 return nil 932 933 case models.CommitOperationDelete: 934 if err := db.DeleteCommentByRkey(ddb, did, rkey); err != nil { 935 l.Error("failed to delete", "err", err) 936 return fmt.Errorf("failed to delete issue comment record: %w", err) 937 } 938 939 return nil 940 } 941 942 return fmt.Errorf("unknown operation: %s", e.Commit.Operation) 943}