forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/spindleverify" 18 "tangled.sh/tangled.sh/core/idresolver" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 l := i.Logger.With("kind", e.Kind) 44 switch e.Kind { 45 case models.EventKindAccount: 46 if !e.Account.Active && *e.Account.Status == "deactivated" { 47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 48 } 49 case models.EventKindIdentity: 50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 51 case models.EventKindCommit: 52 switch e.Commit.Collection { 53 case tangled.GraphFollowNSID: 54 err = i.ingestFollow(e) 55 case tangled.FeedStarNSID: 56 err = i.ingestStar(e) 57 case tangled.PublicKeyNSID: 58 err = i.ingestPublicKey(e) 59 case tangled.RepoArtifactNSID: 60 err = i.ingestArtifact(e) 61 case tangled.ActorProfileNSID: 62 err = i.ingestProfile(e) 63 case tangled.SpindleMemberNSID: 64 err = i.ingestSpindleMember(e) 65 case tangled.SpindleNSID: 66 err = i.ingestSpindle(e) 67 case tangled.StringNSID: 68 err = i.ingestString(e) 69 } 70 l = i.Logger.With("nsid", e.Commit.Collection) 71 } 72 73 if err != nil { 74 l.Error("error ingesting record", "err", err) 75 } 76 77 return err 78 } 79} 80 81func (i *Ingester) ingestStar(e *models.Event) error { 82 var err error 83 did := e.Did 84 85 l := i.Logger.With("handler", "ingestStar") 86 l = l.With("nsid", e.Commit.Collection) 87 88 switch e.Commit.Operation { 89 case models.CommitOperationCreate, models.CommitOperationUpdate: 90 var subjectUri syntax.ATURI 91 92 raw := json.RawMessage(e.Commit.Record) 93 record := tangled.FeedStar{} 94 err := json.Unmarshal(raw, &record) 95 if err != nil { 96 l.Error("invalid record", "err", err) 97 return err 98 } 99 100 subjectUri, err = syntax.ParseATURI(record.Subject) 101 if err != nil { 102 l.Error("invalid record", "err", err) 103 return err 104 } 105 err = db.AddStar(i.Db, &db.Star{ 106 StarredByDid: did, 107 RepoAt: subjectUri, 108 Rkey: e.Commit.RKey, 109 }) 110 case models.CommitOperationDelete: 111 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 112 } 113 114 if err != nil { 115 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 116 } 117 118 return nil 119} 120 121func (i *Ingester) ingestFollow(e *models.Event) error { 122 var err error 123 did := e.Did 124 125 l := i.Logger.With("handler", "ingestFollow") 126 l = l.With("nsid", e.Commit.Collection) 127 128 switch e.Commit.Operation { 129 case models.CommitOperationCreate, models.CommitOperationUpdate: 130 raw := json.RawMessage(e.Commit.Record) 131 record := tangled.GraphFollow{} 132 err = json.Unmarshal(raw, &record) 133 if err != nil { 134 l.Error("invalid record", "err", err) 135 return err 136 } 137 138 err = db.AddFollow(i.Db, &db.Follow{ 139 UserDid: did, 140 SubjectDid: record.Subject, 141 Rkey: e.Commit.RKey, 142 }) 143 case models.CommitOperationDelete: 144 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 145 } 146 147 if err != nil { 148 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 149 } 150 151 return nil 152} 153 154func (i *Ingester) ingestPublicKey(e *models.Event) error { 155 did := e.Did 156 var err error 157 158 l := i.Logger.With("handler", "ingestPublicKey") 159 l = l.With("nsid", e.Commit.Collection) 160 161 switch e.Commit.Operation { 162 case models.CommitOperationCreate, models.CommitOperationUpdate: 163 l.Debug("processing add of pubkey") 164 raw := json.RawMessage(e.Commit.Record) 165 record := tangled.PublicKey{} 166 err = json.Unmarshal(raw, &record) 167 if err != nil { 168 l.Error("invalid record", "err", err) 169 return err 170 } 171 172 name := record.Name 173 key := record.Key 174 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 175 case models.CommitOperationDelete: 176 l.Debug("processing delete of pubkey") 177 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 178 } 179 180 if err != nil { 181 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 182 } 183 184 return nil 185} 186 187func (i *Ingester) ingestArtifact(e *models.Event) error { 188 did := e.Did 189 var err error 190 191 l := i.Logger.With("handler", "ingestArtifact") 192 l = l.With("nsid", e.Commit.Collection) 193 194 switch e.Commit.Operation { 195 case models.CommitOperationCreate, models.CommitOperationUpdate: 196 raw := json.RawMessage(e.Commit.Record) 197 record := tangled.RepoArtifact{} 198 err = json.Unmarshal(raw, &record) 199 if err != nil { 200 l.Error("invalid record", "err", err) 201 return err 202 } 203 204 repoAt, err := syntax.ParseATURI(record.Repo) 205 if err != nil { 206 return err 207 } 208 209 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 210 if err != nil { 211 return err 212 } 213 214 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 215 if err != nil || !ok { 216 return err 217 } 218 219 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 220 if err != nil { 221 createdAt = time.Now() 222 } 223 224 artifact := db.Artifact{ 225 Did: did, 226 Rkey: e.Commit.RKey, 227 RepoAt: repoAt, 228 Tag: plumbing.Hash(record.Tag), 229 CreatedAt: createdAt, 230 BlobCid: cid.Cid(record.Artifact.Ref), 231 Name: record.Name, 232 Size: uint64(record.Artifact.Size), 233 MimeType: record.Artifact.MimeType, 234 } 235 236 err = db.AddArtifact(i.Db, artifact) 237 case models.CommitOperationDelete: 238 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 239 } 240 241 if err != nil { 242 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 243 } 244 245 return nil 246} 247 248func (i *Ingester) ingestProfile(e *models.Event) error { 249 did := e.Did 250 var err error 251 252 l := i.Logger.With("handler", "ingestProfile") 253 l = l.With("nsid", e.Commit.Collection) 254 255 if e.Commit.RKey != "self" { 256 return fmt.Errorf("ingestProfile only ingests `self` record") 257 } 258 259 switch e.Commit.Operation { 260 case models.CommitOperationCreate, models.CommitOperationUpdate: 261 raw := json.RawMessage(e.Commit.Record) 262 record := tangled.ActorProfile{} 263 err = json.Unmarshal(raw, &record) 264 if err != nil { 265 l.Error("invalid record", "err", err) 266 return err 267 } 268 269 description := "" 270 if record.Description != nil { 271 description = *record.Description 272 } 273 274 includeBluesky := record.Bluesky 275 276 location := "" 277 if record.Location != nil { 278 location = *record.Location 279 } 280 281 var links [5]string 282 for i, l := range record.Links { 283 if i < 5 { 284 links[i] = l 285 } 286 } 287 288 var stats [2]db.VanityStat 289 for i, s := range record.Stats { 290 if i < 2 { 291 stats[i].Kind = db.VanityStatKind(s) 292 } 293 } 294 295 var pinned [6]syntax.ATURI 296 for i, r := range record.PinnedRepositories { 297 if i < 6 { 298 pinned[i] = syntax.ATURI(r) 299 } 300 } 301 302 profile := db.Profile{ 303 Did: did, 304 Description: description, 305 IncludeBluesky: includeBluesky, 306 Location: location, 307 Links: links, 308 Stats: stats, 309 PinnedRepos: pinned, 310 } 311 312 ddb, ok := i.Db.Execer.(*db.DB) 313 if !ok { 314 return fmt.Errorf("failed to index profile record, invalid db cast") 315 } 316 317 tx, err := ddb.Begin() 318 if err != nil { 319 return fmt.Errorf("failed to start transaction") 320 } 321 322 err = db.ValidateProfile(tx, &profile) 323 if err != nil { 324 return fmt.Errorf("invalid profile record") 325 } 326 327 err = db.UpsertProfile(tx, &profile) 328 case models.CommitOperationDelete: 329 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 330 } 331 332 if err != nil { 333 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 334 } 335 336 return nil 337} 338 339func (i *Ingester) ingestSpindleMember(e *models.Event) error { 340 did := e.Did 341 var err error 342 343 l := i.Logger.With("handler", "ingestSpindleMember") 344 l = l.With("nsid", e.Commit.Collection) 345 346 switch e.Commit.Operation { 347 case models.CommitOperationCreate: 348 raw := json.RawMessage(e.Commit.Record) 349 record := tangled.SpindleMember{} 350 err = json.Unmarshal(raw, &record) 351 if err != nil { 352 l.Error("invalid record", "err", err) 353 return err 354 } 355 356 // only spindle owner can invite to spindles 357 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 358 if err != nil || !ok { 359 return fmt.Errorf("failed to enforce permissions: %w", err) 360 } 361 362 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 363 if err != nil { 364 return err 365 } 366 367 if memberId.Handle.IsInvalidHandle() { 368 return err 369 } 370 371 ddb, ok := i.Db.Execer.(*db.DB) 372 if !ok { 373 return fmt.Errorf("failed to index profile record, invalid db cast") 374 } 375 376 err = db.AddSpindleMember(ddb, db.SpindleMember{ 377 Did: syntax.DID(did), 378 Rkey: e.Commit.RKey, 379 Instance: record.Instance, 380 Subject: memberId.DID, 381 }) 382 if !ok { 383 return fmt.Errorf("failed to add to db: %w", err) 384 } 385 386 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 387 if err != nil { 388 return fmt.Errorf("failed to update ACLs: %w", err) 389 } 390 case models.CommitOperationDelete: 391 rkey := e.Commit.RKey 392 393 ddb, ok := i.Db.Execer.(*db.DB) 394 if !ok { 395 return fmt.Errorf("failed to index profile record, invalid db cast") 396 } 397 398 // get record from db first 399 members, err := db.GetSpindleMembers( 400 ddb, 401 db.FilterEq("did", did), 402 db.FilterEq("rkey", rkey), 403 ) 404 if err != nil || len(members) != 1 { 405 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 406 } 407 member := members[0] 408 409 tx, err := ddb.Begin() 410 if err != nil { 411 return fmt.Errorf("failed to start txn: %w", err) 412 } 413 414 // remove record by rkey && update enforcer 415 if err = db.RemoveSpindleMember( 416 tx, 417 db.FilterEq("did", did), 418 db.FilterEq("rkey", rkey), 419 ); err != nil { 420 return fmt.Errorf("failed to remove from db: %w", err) 421 } 422 423 // update enforcer 424 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 425 if err != nil { 426 return fmt.Errorf("failed to update ACLs: %w", err) 427 } 428 429 if err = tx.Commit(); err != nil { 430 return fmt.Errorf("failed to commit txn: %w", err) 431 } 432 433 if err = i.Enforcer.E.SavePolicy(); err != nil { 434 return fmt.Errorf("failed to save ACLs: %w", err) 435 } 436 } 437 438 return nil 439} 440 441func (i *Ingester) ingestSpindle(e *models.Event) error { 442 did := e.Did 443 var err error 444 445 l := i.Logger.With("handler", "ingestSpindle") 446 l = l.With("nsid", e.Commit.Collection) 447 448 switch e.Commit.Operation { 449 case models.CommitOperationCreate: 450 raw := json.RawMessage(e.Commit.Record) 451 record := tangled.Spindle{} 452 err = json.Unmarshal(raw, &record) 453 if err != nil { 454 l.Error("invalid record", "err", err) 455 return err 456 } 457 458 instance := e.Commit.RKey 459 460 ddb, ok := i.Db.Execer.(*db.DB) 461 if !ok { 462 return fmt.Errorf("failed to index profile record, invalid db cast") 463 } 464 465 err := db.AddSpindle(ddb, db.Spindle{ 466 Owner: syntax.DID(did), 467 Instance: instance, 468 }) 469 if err != nil { 470 l.Error("failed to add spindle to db", "err", err, "instance", instance) 471 return err 472 } 473 474 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 475 if err != nil { 476 l.Error("failed to add spindle to db", "err", err, "instance", instance) 477 return err 478 } 479 480 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 481 if err != nil { 482 return fmt.Errorf("failed to mark verified: %w", err) 483 } 484 485 return nil 486 487 case models.CommitOperationDelete: 488 instance := e.Commit.RKey 489 490 ddb, ok := i.Db.Execer.(*db.DB) 491 if !ok { 492 return fmt.Errorf("failed to index profile record, invalid db cast") 493 } 494 495 // get record from db first 496 spindles, err := db.GetSpindles( 497 ddb, 498 db.FilterEq("owner", did), 499 db.FilterEq("instance", instance), 500 ) 501 if err != nil || len(spindles) != 1 { 502 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 503 } 504 spindle := spindles[0] 505 506 tx, err := ddb.Begin() 507 if err != nil { 508 return err 509 } 510 defer func() { 511 tx.Rollback() 512 i.Enforcer.E.LoadPolicy() 513 }() 514 515 // remove spindle members first 516 err = db.RemoveSpindleMember( 517 tx, 518 db.FilterEq("owner", did), 519 db.FilterEq("instance", instance), 520 ) 521 if err != nil { 522 return err 523 } 524 525 err = db.DeleteSpindle( 526 tx, 527 db.FilterEq("owner", did), 528 db.FilterEq("instance", instance), 529 ) 530 if err != nil { 531 return err 532 } 533 534 if spindle.Verified != nil { 535 err = i.Enforcer.RemoveSpindle(instance) 536 if err != nil { 537 return err 538 } 539 } 540 541 err = tx.Commit() 542 if err != nil { 543 return err 544 } 545 546 err = i.Enforcer.E.SavePolicy() 547 if err != nil { 548 return err 549 } 550 } 551 552 return nil 553} 554 555func (i *Ingester) ingestString(e *models.Event) error { 556 did := e.Did 557 rkey := e.Commit.RKey 558 559 var err error 560 561 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 562 l.Info("ingesting record") 563 564 ddb, ok := i.Db.Execer.(*db.DB) 565 if !ok { 566 return fmt.Errorf("failed to index string record, invalid db cast") 567 } 568 569 switch e.Commit.Operation { 570 case models.CommitOperationCreate, models.CommitOperationUpdate: 571 raw := json.RawMessage(e.Commit.Record) 572 record := tangled.String{} 573 err = json.Unmarshal(raw, &record) 574 if err != nil { 575 l.Error("invalid record", "err", err) 576 return err 577 } 578 579 string := db.StringFromRecord(did, rkey, record) 580 581 if err = string.Validate(); err != nil { 582 l.Error("invalid record", "err", err) 583 return err 584 } 585 586 if err = db.AddString(ddb, string); err != nil { 587 l.Error("failed to add string", "err", err) 588 return err 589 } 590 591 return nil 592 593 case models.CommitOperationDelete: 594 if err := db.DeleteString( 595 ddb, 596 db.FilterEq("did", did), 597 db.FilterEq("rkey", rkey), 598 ); err != nil { 599 l.Error("failed to delete", "err", err) 600 return fmt.Errorf("failed to delete string record: %w", err) 601 } 602 603 return nil 604 } 605 606 return nil 607}