// Forked from tangled.org/core.
// This repository has no description.

1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/spindleverify" 18 "tangled.sh/tangled.sh/core/idresolver" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 l := i.Logger.With("kind", e.Kind) 44 switch e.Kind { 45 case models.EventKindAccount: 46 if !e.Account.Active && *e.Account.Status == "deactivated" { 47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 48 } 49 case models.EventKindIdentity: 50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 51 case models.EventKindCommit: 52 switch e.Commit.Collection { 53 case tangled.GraphFollowNSID: 54 err = i.ingestFollow(e) 55 case tangled.FeedStarNSID: 56 err = i.ingestStar(e) 57 case tangled.PublicKeyNSID: 58 err = i.ingestPublicKey(e) 59 case tangled.RepoArtifactNSID: 60 err = i.ingestArtifact(e) 61 case tangled.ActorProfileNSID: 62 err = i.ingestProfile(e) 63 case tangled.SpindleMemberNSID: 64 err = i.ingestSpindleMember(e) 65 case tangled.SpindleNSID: 66 err = i.ingestSpindle(e) 67 case tangled.StringNSID: 68 err = i.ingestString(e) 69 } 70 l = 
i.Logger.With("nsid", e.Commit.Collection) 71 } 72 73 if err != nil { 74 l.Debug("error ingesting record", "err", err) 75 } 76 77 return nil 78 } 79} 80 81func (i *Ingester) ingestStar(e *models.Event) error { 82 var err error 83 did := e.Did 84 85 l := i.Logger.With("handler", "ingestStar") 86 l = l.With("nsid", e.Commit.Collection) 87 88 switch e.Commit.Operation { 89 case models.CommitOperationCreate, models.CommitOperationUpdate: 90 var subjectUri syntax.ATURI 91 92 raw := json.RawMessage(e.Commit.Record) 93 record := tangled.FeedStar{} 94 err := json.Unmarshal(raw, &record) 95 if err != nil { 96 l.Error("invalid record", "err", err) 97 return err 98 } 99 100 subjectUri, err = syntax.ParseATURI(record.Subject) 101 if err != nil { 102 l.Error("invalid record", "err", err) 103 return err 104 } 105 err = db.AddStar(i.Db, &db.Star{ 106 StarredByDid: did, 107 RepoAt: subjectUri, 108 Rkey: e.Commit.RKey, 109 }) 110 case models.CommitOperationDelete: 111 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 112 } 113 114 if err != nil { 115 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 116 } 117 118 return nil 119} 120 121func (i *Ingester) ingestFollow(e *models.Event) error { 122 var err error 123 did := e.Did 124 125 l := i.Logger.With("handler", "ingestFollow") 126 l = l.With("nsid", e.Commit.Collection) 127 128 switch e.Commit.Operation { 129 case models.CommitOperationCreate, models.CommitOperationUpdate: 130 raw := json.RawMessage(e.Commit.Record) 131 record := tangled.GraphFollow{} 132 err = json.Unmarshal(raw, &record) 133 if err != nil { 134 l.Error("invalid record", "err", err) 135 return err 136 } 137 138 err = db.AddFollow(i.Db, &db.Follow{ 139 UserDid: did, 140 SubjectDid: record.Subject, 141 Rkey: e.Commit.RKey, 142 }) 143 case models.CommitOperationDelete: 144 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 145 } 146 147 if err != nil { 148 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 149 
} 150 151 return nil 152} 153 154func (i *Ingester) ingestPublicKey(e *models.Event) error { 155 did := e.Did 156 var err error 157 158 l := i.Logger.With("handler", "ingestPublicKey") 159 l = l.With("nsid", e.Commit.Collection) 160 161 switch e.Commit.Operation { 162 case models.CommitOperationCreate, models.CommitOperationUpdate: 163 l.Debug("processing add of pubkey") 164 raw := json.RawMessage(e.Commit.Record) 165 record := tangled.PublicKey{} 166 err = json.Unmarshal(raw, &record) 167 if err != nil { 168 l.Error("invalid record", "err", err) 169 return err 170 } 171 172 name := record.Name 173 key := record.Key 174 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 175 case models.CommitOperationDelete: 176 l.Debug("processing delete of pubkey") 177 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 178 } 179 180 if err != nil { 181 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 182 } 183 184 return nil 185} 186 187func (i *Ingester) ingestArtifact(e *models.Event) error { 188 did := e.Did 189 var err error 190 191 l := i.Logger.With("handler", "ingestArtifact") 192 l = l.With("nsid", e.Commit.Collection) 193 194 switch e.Commit.Operation { 195 case models.CommitOperationCreate, models.CommitOperationUpdate: 196 raw := json.RawMessage(e.Commit.Record) 197 record := tangled.RepoArtifact{} 198 err = json.Unmarshal(raw, &record) 199 if err != nil { 200 l.Error("invalid record", "err", err) 201 return err 202 } 203 204 repoAt, err := syntax.ParseATURI(record.Repo) 205 if err != nil { 206 return err 207 } 208 209 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 210 if err != nil { 211 return err 212 } 213 214 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 215 if err != nil || !ok { 216 return err 217 } 218 219 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 220 if err != nil { 221 createdAt = time.Now() 222 } 223 224 artifact := db.Artifact{ 225 Did: did, 226 Rkey: 
e.Commit.RKey, 227 RepoAt: repoAt, 228 Tag: plumbing.Hash(record.Tag), 229 CreatedAt: createdAt, 230 BlobCid: cid.Cid(record.Artifact.Ref), 231 Name: record.Name, 232 Size: uint64(record.Artifact.Size), 233 MimeType: record.Artifact.MimeType, 234 } 235 236 err = db.AddArtifact(i.Db, artifact) 237 case models.CommitOperationDelete: 238 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 239 } 240 241 if err != nil { 242 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 243 } 244 245 return nil 246} 247 248func (i *Ingester) ingestProfile(e *models.Event) error { 249 did := e.Did 250 var err error 251 252 l := i.Logger.With("handler", "ingestProfile") 253 l = l.With("nsid", e.Commit.Collection) 254 255 if e.Commit.RKey != "self" { 256 return fmt.Errorf("ingestProfile only ingests `self` record") 257 } 258 259 switch e.Commit.Operation { 260 case models.CommitOperationCreate, models.CommitOperationUpdate: 261 raw := json.RawMessage(e.Commit.Record) 262 record := tangled.ActorProfile{} 263 err = json.Unmarshal(raw, &record) 264 if err != nil { 265 l.Error("invalid record", "err", err) 266 return err 267 } 268 269 description := "" 270 if record.Description != nil { 271 description = *record.Description 272 } 273 274 includeBluesky := record.Bluesky 275 276 location := "" 277 if record.Location != nil { 278 location = *record.Location 279 } 280 281 var links [5]string 282 for i, l := range record.Links { 283 if i < 5 { 284 links[i] = l 285 } 286 } 287 288 var stats [2]db.VanityStat 289 for i, s := range record.Stats { 290 if i < 2 { 291 stats[i].Kind = db.VanityStatKind(s) 292 } 293 } 294 295 var pinned [6]syntax.ATURI 296 for i, r := range record.PinnedRepositories { 297 if i < 6 { 298 pinned[i] = syntax.ATURI(r) 299 } 300 } 301 302 profile := db.Profile{ 303 Did: did, 304 Description: description, 305 IncludeBluesky: includeBluesky, 306 Location: location, 307 Links: links, 308 Stats: stats, 309 
PinnedRepos: pinned, 310 } 311 312 ddb, ok := i.Db.Execer.(*db.DB) 313 if !ok { 314 return fmt.Errorf("failed to index profile record, invalid db cast") 315 } 316 317 tx, err := ddb.Begin() 318 if err != nil { 319 return fmt.Errorf("failed to start transaction") 320 } 321 322 err = db.ValidateProfile(tx, &profile) 323 if err != nil { 324 return fmt.Errorf("invalid profile record") 325 } 326 327 err = db.UpsertProfile(tx, &profile) 328 case models.CommitOperationDelete: 329 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 330 } 331 332 if err != nil { 333 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 334 } 335 336 return nil 337} 338 339func (i *Ingester) ingestSpindleMember(e *models.Event) error { 340 did := e.Did 341 var err error 342 343 l := i.Logger.With("handler", "ingestSpindleMember") 344 l = l.With("nsid", e.Commit.Collection) 345 346 switch e.Commit.Operation { 347 case models.CommitOperationCreate: 348 raw := json.RawMessage(e.Commit.Record) 349 record := tangled.SpindleMember{} 350 err = json.Unmarshal(raw, &record) 351 if err != nil { 352 l.Error("invalid record", "err", err) 353 return err 354 } 355 356 // only spindle owner can invite to spindles 357 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 358 if err != nil || !ok { 359 return fmt.Errorf("failed to enforce permissions: %w", err) 360 } 361 362 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 363 if err != nil { 364 return err 365 } 366 367 if memberId.Handle.IsInvalidHandle() { 368 return err 369 } 370 371 ddb, ok := i.Db.Execer.(*db.DB) 372 if !ok { 373 return fmt.Errorf("failed to index profile record, invalid db cast") 374 } 375 376 err = db.AddSpindleMember(ddb, db.SpindleMember{ 377 Did: syntax.DID(did), 378 Rkey: e.Commit.RKey, 379 Instance: record.Instance, 380 Subject: memberId.DID, 381 }) 382 if !ok { 383 return fmt.Errorf("failed to add to db: %w", err) 384 } 
385 386 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 387 if err != nil { 388 return fmt.Errorf("failed to update ACLs: %w", err) 389 } 390 391 l.Info("added spindle member") 392 case models.CommitOperationDelete: 393 rkey := e.Commit.RKey 394 395 ddb, ok := i.Db.Execer.(*db.DB) 396 if !ok { 397 return fmt.Errorf("failed to index profile record, invalid db cast") 398 } 399 400 // get record from db first 401 members, err := db.GetSpindleMembers( 402 ddb, 403 db.FilterEq("did", did), 404 db.FilterEq("rkey", rkey), 405 ) 406 if err != nil || len(members) != 1 { 407 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 408 } 409 member := members[0] 410 411 tx, err := ddb.Begin() 412 if err != nil { 413 return fmt.Errorf("failed to start txn: %w", err) 414 } 415 416 // remove record by rkey && update enforcer 417 if err = db.RemoveSpindleMember( 418 tx, 419 db.FilterEq("did", did), 420 db.FilterEq("rkey", rkey), 421 ); err != nil { 422 return fmt.Errorf("failed to remove from db: %w", err) 423 } 424 425 // update enforcer 426 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 427 if err != nil { 428 return fmt.Errorf("failed to update ACLs: %w", err) 429 } 430 431 if err = tx.Commit(); err != nil { 432 return fmt.Errorf("failed to commit txn: %w", err) 433 } 434 435 if err = i.Enforcer.E.SavePolicy(); err != nil { 436 return fmt.Errorf("failed to save ACLs: %w", err) 437 } 438 439 l.Info("removed spindle member") 440 } 441 442 return nil 443} 444 445func (i *Ingester) ingestSpindle(e *models.Event) error { 446 did := e.Did 447 var err error 448 449 l := i.Logger.With("handler", "ingestSpindle") 450 l = l.With("nsid", e.Commit.Collection) 451 452 switch e.Commit.Operation { 453 case models.CommitOperationCreate: 454 raw := json.RawMessage(e.Commit.Record) 455 record := tangled.Spindle{} 456 err = json.Unmarshal(raw, &record) 457 if err != nil { 458 l.Error("invalid record", "err", 
err) 459 return err 460 } 461 462 instance := e.Commit.RKey 463 464 ddb, ok := i.Db.Execer.(*db.DB) 465 if !ok { 466 return fmt.Errorf("failed to index profile record, invalid db cast") 467 } 468 469 err := db.AddSpindle(ddb, db.Spindle{ 470 Owner: syntax.DID(did), 471 Instance: instance, 472 }) 473 if err != nil { 474 l.Error("failed to add spindle to db", "err", err, "instance", instance) 475 return err 476 } 477 478 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 479 if err != nil { 480 l.Error("failed to add spindle to db", "err", err, "instance", instance) 481 return err 482 } 483 484 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 485 if err != nil { 486 return fmt.Errorf("failed to mark verified: %w", err) 487 } 488 489 return nil 490 491 case models.CommitOperationDelete: 492 instance := e.Commit.RKey 493 494 ddb, ok := i.Db.Execer.(*db.DB) 495 if !ok { 496 return fmt.Errorf("failed to index profile record, invalid db cast") 497 } 498 499 // get record from db first 500 spindles, err := db.GetSpindles( 501 ddb, 502 db.FilterEq("owner", did), 503 db.FilterEq("instance", instance), 504 ) 505 if err != nil || len(spindles) != 1 { 506 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 507 } 508 spindle := spindles[0] 509 510 tx, err := ddb.Begin() 511 if err != nil { 512 return err 513 } 514 defer func() { 515 tx.Rollback() 516 i.Enforcer.E.LoadPolicy() 517 }() 518 519 // remove spindle members first 520 err = db.RemoveSpindleMember( 521 tx, 522 db.FilterEq("owner", did), 523 db.FilterEq("instance", instance), 524 ) 525 if err != nil { 526 return err 527 } 528 529 err = db.DeleteSpindle( 530 tx, 531 db.FilterEq("owner", did), 532 db.FilterEq("instance", instance), 533 ) 534 if err != nil { 535 return err 536 } 537 538 if spindle.Verified != nil { 539 err = i.Enforcer.RemoveSpindle(instance) 540 if err != nil { 541 return err 542 } 543 } 544 545 err = tx.Commit() 
546 if err != nil { 547 return err 548 } 549 550 err = i.Enforcer.E.SavePolicy() 551 if err != nil { 552 return err 553 } 554 } 555 556 return nil 557} 558 559func (i *Ingester) ingestString(e *models.Event) error { 560 did := e.Did 561 rkey := e.Commit.RKey 562 563 var err error 564 565 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 566 l.Info("ingesting record") 567 568 ddb, ok := i.Db.Execer.(*db.DB) 569 if !ok { 570 return fmt.Errorf("failed to index string record, invalid db cast") 571 } 572 573 switch e.Commit.Operation { 574 case models.CommitOperationCreate, models.CommitOperationUpdate: 575 raw := json.RawMessage(e.Commit.Record) 576 record := tangled.String{} 577 err = json.Unmarshal(raw, &record) 578 if err != nil { 579 l.Error("invalid record", "err", err) 580 return err 581 } 582 583 string := db.StringFromRecord(did, rkey, record) 584 585 if err = string.Validate(); err != nil { 586 l.Error("invalid record", "err", err) 587 return err 588 } 589 590 if err = db.AddString(ddb, string); err != nil { 591 l.Error("failed to add string", "err", err) 592 return err 593 } 594 595 return nil 596 597 case models.CommitOperationDelete: 598 if err := db.DeleteString( 599 ddb, 600 db.FilterEq("did", did), 601 db.FilterEq("rkey", rkey), 602 ); err != nil { 603 l.Error("failed to delete", "err", err) 604 return fmt.Errorf("failed to delete string record: %w", err) 605 } 606 607 return nil 608 } 609 610 return nil 611}