forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/serververify" 18 "tangled.sh/tangled.sh/core/idresolver" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 l := i.Logger.With("kind", e.Kind) 44 switch e.Kind { 45 case models.EventKindAccount: 46 if !e.Account.Active && *e.Account.Status == "deactivated" { 47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 48 } 49 case models.EventKindIdentity: 50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 51 case models.EventKindCommit: 52 switch e.Commit.Collection { 53 case tangled.GraphFollowNSID: 54 err = i.ingestFollow(e) 55 case tangled.FeedStarNSID: 56 err = i.ingestStar(e) 57 case tangled.PublicKeyNSID: 58 err = i.ingestPublicKey(e) 59 case tangled.RepoArtifactNSID: 60 err = i.ingestArtifact(e) 61 case tangled.ActorProfileNSID: 62 err = i.ingestProfile(e) 63 case tangled.SpindleMemberNSID: 64 err = i.ingestSpindleMember(e) 65 case tangled.SpindleNSID: 66 err = i.ingestSpindle(e) 67 case tangled.KnotMemberNSID: 68 err = i.ingestKnotMember(e) 69 case tangled.KnotNSID: 70 err = i.ingestKnot(e) 71 case tangled.StringNSID: 72 err = i.ingestString(e) 73 } 74 l = i.Logger.With("nsid", e.Commit.Collection) 75 } 76 77 if err != nil { 78 l.Debug("error ingesting record", "err", err) 79 } 80 81 return nil 82 } 83} 84 85func (i *Ingester) ingestStar(e *models.Event) error { 86 var err error 87 did := e.Did 88 89 l := i.Logger.With("handler", "ingestStar") 90 l = l.With("nsid", e.Commit.Collection) 91 92 switch e.Commit.Operation { 93 case models.CommitOperationCreate, models.CommitOperationUpdate: 94 var subjectUri syntax.ATURI 95 96 raw := json.RawMessage(e.Commit.Record) 97 record := tangled.FeedStar{} 98 err := json.Unmarshal(raw, &record) 99 if err != nil { 100 l.Error("invalid record", "err", err) 101 return err 102 } 103 104 subjectUri, err = syntax.ParseATURI(record.Subject) 105 if err != nil { 106 l.Error("invalid record", "err", err) 107 return err 108 } 109 err = db.AddStar(i.Db, &db.Star{ 110 StarredByDid: did, 111 RepoAt: subjectUri, 112 Rkey: e.Commit.RKey, 113 }) 114 case models.CommitOperationDelete: 115 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 116 } 117 118 if err != nil { 119 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 120 } 121 122 return nil 123} 124 125func (i *Ingester) ingestFollow(e *models.Event) error { 126 var err error 127 did := e.Did 128 129 l := i.Logger.With("handler", "ingestFollow") 130 l = l.With("nsid", e.Commit.Collection) 131 132 switch e.Commit.Operation { 133 case models.CommitOperationCreate, models.CommitOperationUpdate: 134 raw := json.RawMessage(e.Commit.Record) 135 record := tangled.GraphFollow{} 136 err = json.Unmarshal(raw, &record) 137 if err != nil { 138 l.Error("invalid record", "err", err) 139 return err 140 } 141 142 err = db.AddFollow(i.Db, &db.Follow{ 143 UserDid: did, 144 SubjectDid: record.Subject, 145 Rkey: e.Commit.RKey, 146 }) 147 case models.CommitOperationDelete: 148 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 149 } 150 151 if err != nil { 152 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 153 } 154 155 return nil 156} 157 158func (i *Ingester) ingestPublicKey(e *models.Event) error { 159 did := e.Did 160 var err error 161 162 l := i.Logger.With("handler", "ingestPublicKey") 163 l = l.With("nsid", e.Commit.Collection) 164 165 switch e.Commit.Operation { 166 case models.CommitOperationCreate, models.CommitOperationUpdate: 167 l.Debug("processing add of pubkey") 168 raw := json.RawMessage(e.Commit.Record) 169 record := tangled.PublicKey{} 170 err = json.Unmarshal(raw, &record) 171 if err != nil { 172 l.Error("invalid record", "err", err) 173 return err 174 } 175 176 name := record.Name 177 key := record.Key 178 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 179 case models.CommitOperationDelete: 180 l.Debug("processing delete of pubkey") 181 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 182 } 183 184 if err != nil { 185 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 186 } 187 188 return nil 189} 190 191func (i *Ingester) ingestArtifact(e *models.Event) error { 192 did := e.Did 193 var err error 194 195 l := i.Logger.With("handler", "ingestArtifact") 196 l = l.With("nsid", e.Commit.Collection) 197 198 switch e.Commit.Operation { 199 case models.CommitOperationCreate, models.CommitOperationUpdate: 200 raw := json.RawMessage(e.Commit.Record) 201 record := tangled.RepoArtifact{} 202 err = json.Unmarshal(raw, &record) 203 if err != nil { 204 l.Error("invalid record", "err", err) 205 return err 206 } 207 208 repoAt, err := syntax.ParseATURI(record.Repo) 209 if err != nil { 210 return err 211 } 212 213 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 214 if err != nil { 215 return err 216 } 217 218 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 219 if err != nil || !ok { 220 return err 221 } 222 223 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 224 if err != nil { 225 createdAt = time.Now() 226 } 227 228 artifact := db.Artifact{ 229 Did: did, 230 Rkey: e.Commit.RKey, 231 RepoAt: repoAt, 232 Tag: plumbing.Hash(record.Tag), 233 CreatedAt: createdAt, 234 BlobCid: cid.Cid(record.Artifact.Ref), 235 Name: record.Name, 236 Size: uint64(record.Artifact.Size), 237 MimeType: record.Artifact.MimeType, 238 } 239 240 err = db.AddArtifact(i.Db, artifact) 241 case models.CommitOperationDelete: 242 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 243 } 244 245 if err != nil { 246 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 247 } 248 249 return nil 250} 251 252func (i *Ingester) ingestProfile(e *models.Event) error { 253 did := e.Did 254 var err error 255 256 l := i.Logger.With("handler", "ingestProfile") 257 l = l.With("nsid", e.Commit.Collection) 258 259 if e.Commit.RKey != "self" { 260 return fmt.Errorf("ingestProfile only ingests `self` record") 261 } 262 263 switch e.Commit.Operation { 264 case models.CommitOperationCreate, models.CommitOperationUpdate: 265 raw := json.RawMessage(e.Commit.Record) 266 record := tangled.ActorProfile{} 267 err = json.Unmarshal(raw, &record) 268 if err != nil { 269 l.Error("invalid record", "err", err) 270 return err 271 } 272 273 description := "" 274 if record.Description != nil { 275 description = *record.Description 276 } 277 278 includeBluesky := record.Bluesky 279 280 location := "" 281 if record.Location != nil { 282 location = *record.Location 283 } 284 285 var links [5]string 286 for i, l := range record.Links { 287 if i < 5 { 288 links[i] = l 289 } 290 } 291 292 var stats [2]db.VanityStat 293 for i, s := range record.Stats { 294 if i < 2 { 295 stats[i].Kind = db.VanityStatKind(s) 296 } 297 } 298 299 var pinned [6]syntax.ATURI 300 for i, r := range record.PinnedRepositories { 301 if i < 6 { 302 pinned[i] = syntax.ATURI(r) 303 } 304 } 305 306 profile := db.Profile{ 307 Did: did, 308 Description: description, 309 IncludeBluesky: includeBluesky, 310 Location: location, 311 Links: links, 312 Stats: stats, 313 PinnedRepos: pinned, 314 } 315 316 ddb, ok := i.Db.Execer.(*db.DB) 317 if !ok { 318 return fmt.Errorf("failed to index profile record, invalid db cast") 319 } 320 321 tx, err := ddb.Begin() 322 if err != nil { 323 return fmt.Errorf("failed to start transaction") 324 } 325 326 err = db.ValidateProfile(tx, &profile) 327 if err != nil { 328 return fmt.Errorf("invalid profile record") 329 } 330 331 err = db.UpsertProfile(tx, &profile) 332 case models.CommitOperationDelete: 333 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 334 } 335 336 if err != nil { 337 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 338 } 339 340 return nil 341} 342 343func (i *Ingester) ingestSpindleMember(e *models.Event) error { 344 did := e.Did 345 var err error 346 347 l := i.Logger.With("handler", "ingestSpindleMember") 348 l = l.With("nsid", e.Commit.Collection) 349 350 switch e.Commit.Operation { 351 case models.CommitOperationCreate: 352 raw := json.RawMessage(e.Commit.Record) 353 record := tangled.SpindleMember{} 354 err = json.Unmarshal(raw, &record) 355 if err != nil { 356 l.Error("invalid record", "err", err) 357 return err 358 } 359 360 // only spindle owner can invite to spindles 361 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 362 if err != nil || !ok { 363 return fmt.Errorf("failed to enforce permissions: %w", err) 364 } 365 366 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 367 if err != nil { 368 return err 369 } 370 371 if memberId.Handle.IsInvalidHandle() { 372 return err 373 } 374 375 ddb, ok := i.Db.Execer.(*db.DB) 376 if !ok { 377 return fmt.Errorf("failed to index profile record, invalid db cast") 378 } 379 380 err = db.AddSpindleMember(ddb, db.SpindleMember{ 381 Did: syntax.DID(did), 382 Rkey: e.Commit.RKey, 383 Instance: record.Instance, 384 Subject: memberId.DID, 385 }) 386 if !ok { 387 return fmt.Errorf("failed to add to db: %w", err) 388 } 389 390 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 391 if err != nil { 392 return fmt.Errorf("failed to update ACLs: %w", err) 393 } 394 395 l.Info("added spindle member") 396 case models.CommitOperationDelete: 397 rkey := e.Commit.RKey 398 399 ddb, ok := i.Db.Execer.(*db.DB) 400 if !ok { 401 return fmt.Errorf("failed to index profile record, invalid db cast") 402 } 403 404 // get record from db first 405 members, err := db.GetSpindleMembers( 406 ddb, 407 db.FilterEq("did", did), 408 db.FilterEq("rkey", rkey), 409 ) 410 if err != nil || len(members) != 1 { 411 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 412 } 413 member := members[0] 414 415 tx, err := ddb.Begin() 416 if err != nil { 417 return fmt.Errorf("failed to start txn: %w", err) 418 } 419 420 // remove record by rkey && update enforcer 421 if err = db.RemoveSpindleMember( 422 tx, 423 db.FilterEq("did", did), 424 db.FilterEq("rkey", rkey), 425 ); err != nil { 426 return fmt.Errorf("failed to remove from db: %w", err) 427 } 428 429 // update enforcer 430 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 431 if err != nil { 432 return fmt.Errorf("failed to update ACLs: %w", err) 433 } 434 435 if err = tx.Commit(); err != nil { 436 return fmt.Errorf("failed to commit txn: %w", err) 437 } 438 439 if err = i.Enforcer.E.SavePolicy(); err != nil { 440 return fmt.Errorf("failed to save ACLs: %w", err) 441 } 442 443 l.Info("removed spindle member") 444 } 445 446 return nil 447} 448 449func (i *Ingester) ingestSpindle(e *models.Event) error { 450 did := e.Did 451 var err error 452 453 l := i.Logger.With("handler", "ingestSpindle") 454 l = l.With("nsid", e.Commit.Collection) 455 456 switch e.Commit.Operation { 457 case models.CommitOperationCreate: 458 raw := json.RawMessage(e.Commit.Record) 459 record := tangled.Spindle{} 460 err = json.Unmarshal(raw, &record) 461 if err != nil { 462 l.Error("invalid record", "err", err) 463 return err 464 } 465 466 instance := e.Commit.RKey 467 468 ddb, ok := i.Db.Execer.(*db.DB) 469 if !ok { 470 return fmt.Errorf("failed to index profile record, invalid db cast") 471 } 472 473 err := db.AddSpindle(ddb, db.Spindle{ 474 Owner: syntax.DID(did), 475 Instance: instance, 476 }) 477 if err != nil { 478 l.Error("failed to add spindle to db", "err", err, "instance", instance) 479 return err 480 } 481 482 err = serververify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 483 if err != nil { 484 l.Error("failed to add spindle to db", "err", err, "instance", instance) 485 return err 486 } 487 488 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 489 if err != nil { 490 return fmt.Errorf("failed to mark verified: %w", err) 491 } 492 493 return nil 494 495 case models.CommitOperationDelete: 496 instance := e.Commit.RKey 497 498 ddb, ok := i.Db.Execer.(*db.DB) 499 if !ok { 500 return fmt.Errorf("failed to index profile record, invalid db cast") 501 } 502 503 // get record from db first 504 spindles, err := db.GetSpindles( 505 ddb, 506 db.FilterEq("owner", did), 507 db.FilterEq("instance", instance), 508 ) 509 if err != nil || len(spindles) != 1 { 510 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 511 } 512 spindle := spindles[0] 513 514 tx, err := ddb.Begin() 515 if err != nil { 516 return err 517 } 518 defer func() { 519 tx.Rollback() 520 i.Enforcer.E.LoadPolicy() 521 }() 522 523 // remove spindle members first 524 err = db.RemoveSpindleMember( 525 tx, 526 db.FilterEq("owner", did), 527 db.FilterEq("instance", instance), 528 ) 529 if err != nil { 530 return err 531 } 532 533 err = db.DeleteSpindle( 534 tx, 535 db.FilterEq("owner", did), 536 db.FilterEq("instance", instance), 537 ) 538 if err != nil { 539 return err 540 } 541 542 if spindle.Verified != nil { 543 err = i.Enforcer.RemoveSpindle(instance) 544 if err != nil { 545 return err 546 } 547 } 548 549 err = tx.Commit() 550 if err != nil { 551 return err 552 } 553 554 err = i.Enforcer.E.SavePolicy() 555 if err != nil { 556 return err 557 } 558 } 559 560 return nil 561} 562 563func (i *Ingester) ingestString(e *models.Event) error { 564 did := e.Did 565 rkey := e.Commit.RKey 566 567 var err error 568 569 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 570 l.Info("ingesting record") 571 572 ddb, ok := i.Db.Execer.(*db.DB) 573 if !ok { 574 return fmt.Errorf("failed to index string record, invalid db cast") 575 } 576 577 switch e.Commit.Operation { 578 case models.CommitOperationCreate, models.CommitOperationUpdate: 579 raw := json.RawMessage(e.Commit.Record) 580 record := tangled.String{} 581 err = json.Unmarshal(raw, &record) 582 if err != nil { 583 l.Error("invalid record", "err", err) 584 return err 585 } 586 587 string := db.StringFromRecord(did, rkey, record) 588 589 if err = string.Validate(); err != nil { 590 l.Error("invalid record", "err", err) 591 return err 592 } 593 594 if err = db.AddString(ddb, string); err != nil { 595 l.Error("failed to add string", "err", err) 596 return err 597 } 598 599 return nil 600 601 case models.CommitOperationDelete: 602 if err := db.DeleteString( 603 ddb, 604 db.FilterEq("did", did), 605 db.FilterEq("rkey", rkey), 606 ); err != nil { 607 l.Error("failed to delete", "err", err) 608 return fmt.Errorf("failed to delete string record: %w", err) 609 } 610 611 return nil 612 } 613 614 return nil 615} 616 617func (i *Ingester) ingestKnotMember(e *models.Event) error { 618 did := e.Did 619 var err error 620 621 l := i.Logger.With("handler", "ingestKnotMember") 622 l = l.With("nsid", e.Commit.Collection) 623 624 switch e.Commit.Operation { 625 case models.CommitOperationCreate: 626 raw := json.RawMessage(e.Commit.Record) 627 record := tangled.KnotMember{} 628 err = json.Unmarshal(raw, &record) 629 if err != nil { 630 l.Error("invalid record", "err", err) 631 return err 632 } 633 634 // only knot owner can invite to knots 635 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 636 if err != nil || !ok { 637 return fmt.Errorf("failed to enforce permissions: %w", err) 638 } 639 640 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 641 if err != nil { 642 return err 643 } 644 645 if memberId.Handle.IsInvalidHandle() { 646 return err 647 } 648 649 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 650 if err != nil { 651 return fmt.Errorf("failed to update ACLs: %w", err) 652 } 653 654 l.Info("added knot member") 655 case models.CommitOperationDelete: 656 // we don't store knot members in a table (like we do for spindle) 657 // and we can't remove this just yet. possibly fixed if we switch 658 // to either: 659 // 1. a knot_members table like with spindle and store the rkey 660 // 2. use the knot host as the rkey 661 // 662 // TODO: implement member deletion 663 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 664 } 665 666 return nil 667} 668 669func (i *Ingester) ingestKnot(e *models.Event) error { 670 did := e.Did 671 var err error 672 673 l := i.Logger.With("handler", "ingestKnot") 674 l = l.With("nsid", e.Commit.Collection) 675 676 switch e.Commit.Operation { 677 case models.CommitOperationCreate: 678 raw := json.RawMessage(e.Commit.Record) 679 record := tangled.Knot{} 680 err = json.Unmarshal(raw, &record) 681 if err != nil { 682 l.Error("invalid record", "err", err) 683 return err 684 } 685 686 domain := e.Commit.RKey 687 688 ddb, ok := i.Db.Execer.(*db.DB) 689 if !ok { 690 return fmt.Errorf("failed to index profile record, invalid db cast") 691 } 692 693 err := db.AddKnot(ddb, domain, did) 694 if err != nil { 695 l.Error("failed to add knot to db", "err", err, "domain", domain) 696 return err 697 } 698 699 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 700 if err != nil { 701 l.Error("failed to verify knot", "err", err, "domain", domain) 702 return err 703 } 704 705 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 706 if err != nil { 707 return fmt.Errorf("failed to mark verified: %w", err) 708 } 709 710 return nil 711 712 case models.CommitOperationDelete: 713 domain := e.Commit.RKey 714 715 ddb, ok := i.Db.Execer.(*db.DB) 716 if !ok { 717 return fmt.Errorf("failed to index knot record, invalid db cast") 718 } 719 720 // get record from db first 721 registrations, err := db.GetRegistrations( 722 ddb, 723 db.FilterEq("domain", domain), 724 db.FilterEq("did", did), 725 ) 726 if err != nil { 727 return fmt.Errorf("failed to get registration: %w", err) 728 } 729 if len(registrations) != 1 { 730 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 731 } 732 registration := registrations[0] 733 734 tx, err := ddb.Begin() 735 if err != nil { 736 return err 737 } 738 defer func() { 739 tx.Rollback() 740 i.Enforcer.E.LoadPolicy() 741 }() 742 743 err = db.DeleteKnot( 744 tx, 745 db.FilterEq("did", did), 746 db.FilterEq("domain", domain), 747 ) 748 if err != nil { 749 return err 750 } 751 752 if registration.Registered != nil { 753 err = i.Enforcer.RemoveKnot(domain) 754 if err != nil { 755 return err 756 } 757 } 758 759 err = tx.Commit() 760 if err != nil { 761 return err 762 } 763 764 err = i.Enforcer.E.SavePolicy() 765 if err != nil { 766 return err 767 } 768 } 769 770 return nil 771}