forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/idresolver" 18 "tangled.sh/tangled.sh/core/appview/spindleverify" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 l := i.Logger.With("kind", e.Kind) 44 switch e.Kind { 45 case models.EventKindAccount: 46 if !e.Account.Active && *e.Account.Status == "deactivated" { 47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 48 } 49 case models.EventKindIdentity: 50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 51 case models.EventKindCommit: 52 switch e.Commit.Collection { 53 case tangled.GraphFollowNSID: 54 err = i.ingestFollow(e) 55 case tangled.FeedStarNSID: 56 err = i.ingestStar(e) 57 case tangled.PublicKeyNSID: 58 err = i.ingestPublicKey(e) 59 case tangled.RepoArtifactNSID: 60 err = i.ingestArtifact(e) 61 case tangled.ActorProfileNSID: 62 err = i.ingestProfile(e) 63 case tangled.SpindleMemberNSID: 64 err = i.ingestSpindleMember(e) 65 case tangled.SpindleNSID: 66 err = i.ingestSpindle(e) 67 } 68 l = i.Logger.With("nsid", e.Commit.Collection) 69 } 70 71 if err != nil { 72 l.Error("error ingesting record", "err", err) 73 } 74 75 return err 76 } 77} 78 79func (i *Ingester) ingestStar(e *models.Event) error { 80 var err error 81 did := e.Did 82 83 l := i.Logger.With("handler", "ingestStar") 84 l = l.With("nsid", e.Commit.Collection) 85 86 switch e.Commit.Operation { 87 case models.CommitOperationCreate, models.CommitOperationUpdate: 88 var subjectUri syntax.ATURI 89 90 raw := json.RawMessage(e.Commit.Record) 91 record := tangled.FeedStar{} 92 err := json.Unmarshal(raw, &record) 93 if err != nil { 94 l.Error("invalid record", "err", err) 95 return err 96 } 97 98 subjectUri, err = syntax.ParseATURI(record.Subject) 99 if err != nil { 100 l.Error("invalid record", "err", err) 101 return err 102 } 103 err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey) 104 case models.CommitOperationDelete: 105 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 106 } 107 108 if err != nil { 109 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 110 } 111 112 return nil 113} 114 115func (i *Ingester) ingestFollow(e *models.Event) error { 116 var err error 117 did := e.Did 118 119 l := i.Logger.With("handler", "ingestFollow") 120 l = l.With("nsid", e.Commit.Collection) 121 122 switch e.Commit.Operation { 123 case models.CommitOperationCreate, models.CommitOperationUpdate: 124 raw := json.RawMessage(e.Commit.Record) 125 record := tangled.GraphFollow{} 126 err = json.Unmarshal(raw, &record) 127 if err != nil { 128 l.Error("invalid record", "err", err) 129 return err 130 } 131 132 subjectDid := record.Subject 133 err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey) 134 case models.CommitOperationDelete: 135 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 136 } 137 138 if err != nil { 139 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 140 } 141 142 return nil 143} 144 145func (i *Ingester) ingestPublicKey(e *models.Event) error { 146 did := e.Did 147 var err error 148 149 l := i.Logger.With("handler", "ingestPublicKey") 150 l = l.With("nsid", e.Commit.Collection) 151 152 switch e.Commit.Operation { 153 case models.CommitOperationCreate, models.CommitOperationUpdate: 154 l.Debug("processing add of pubkey") 155 raw := json.RawMessage(e.Commit.Record) 156 record := tangled.PublicKey{} 157 err = json.Unmarshal(raw, &record) 158 if err != nil { 159 l.Error("invalid record", "err", err) 160 return err 161 } 162 163 name := record.Name 164 key := record.Key 165 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 166 case models.CommitOperationDelete: 167 l.Debug("processing delete of pubkey") 168 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 169 } 170 171 if err != nil { 172 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 173 } 174 175 return nil 176} 177 178func (i *Ingester) ingestArtifact(e *models.Event) error { 179 did := e.Did 180 var err error 181 182 l := i.Logger.With("handler", "ingestArtifact") 183 l = l.With("nsid", e.Commit.Collection) 184 185 switch e.Commit.Operation { 186 case models.CommitOperationCreate, models.CommitOperationUpdate: 187 raw := json.RawMessage(e.Commit.Record) 188 record := tangled.RepoArtifact{} 189 err = json.Unmarshal(raw, &record) 190 if err != nil { 191 l.Error("invalid record", "err", err) 192 return err 193 } 194 195 repoAt, err := syntax.ParseATURI(record.Repo) 196 if err != nil { 197 return err 198 } 199 200 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 201 if err != nil { 202 return err 203 } 204 205 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 206 if err != nil || !ok { 207 return err 208 } 209 210 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 211 if err != nil { 212 createdAt = time.Now() 213 } 214 215 artifact := db.Artifact{ 216 Did: did, 217 Rkey: e.Commit.RKey, 218 RepoAt: repoAt, 219 Tag: plumbing.Hash(record.Tag), 220 CreatedAt: createdAt, 221 BlobCid: cid.Cid(record.Artifact.Ref), 222 Name: record.Name, 223 Size: uint64(record.Artifact.Size), 224 MimeType: record.Artifact.MimeType, 225 } 226 227 err = db.AddArtifact(i.Db, artifact) 228 case models.CommitOperationDelete: 229 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 230 } 231 232 if err != nil { 233 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 234 } 235 236 return nil 237} 238 239func (i *Ingester) ingestProfile(e *models.Event) error { 240 did := e.Did 241 var err error 242 243 l := i.Logger.With("handler", "ingestProfile") 244 l = l.With("nsid", e.Commit.Collection) 245 246 if e.Commit.RKey != "self" { 247 return fmt.Errorf("ingestProfile only ingests `self` record") 248 } 249 250 switch e.Commit.Operation { 251 case models.CommitOperationCreate, models.CommitOperationUpdate: 252 raw := json.RawMessage(e.Commit.Record) 253 record := tangled.ActorProfile{} 254 err = json.Unmarshal(raw, &record) 255 if err != nil { 256 l.Error("invalid record", "err", err) 257 return err 258 } 259 260 description := "" 261 if record.Description != nil { 262 description = *record.Description 263 } 264 265 includeBluesky := record.Bluesky 266 267 location := "" 268 if record.Location != nil { 269 location = *record.Location 270 } 271 272 var links [5]string 273 for i, l := range record.Links { 274 if i < 5 { 275 links[i] = l 276 } 277 } 278 279 var stats [2]db.VanityStat 280 for i, s := range record.Stats { 281 if i < 2 { 282 stats[i].Kind = db.VanityStatKind(s) 283 } 284 } 285 286 var pinned [6]syntax.ATURI 287 for i, r := range record.PinnedRepositories { 288 if i < 6 { 289 pinned[i] = syntax.ATURI(r) 290 } 291 } 292 293 profile := db.Profile{ 294 Did: did, 295 Description: description, 296 IncludeBluesky: includeBluesky, 297 Location: location, 298 Links: links, 299 Stats: stats, 300 PinnedRepos: pinned, 301 } 302 303 ddb, ok := i.Db.Execer.(*db.DB) 304 if !ok { 305 return fmt.Errorf("failed to index profile record, invalid db cast") 306 } 307 308 tx, err := ddb.Begin() 309 if err != nil { 310 return fmt.Errorf("failed to start transaction") 311 } 312 313 err = db.ValidateProfile(tx, &profile) 314 if err != nil { 315 return fmt.Errorf("invalid profile record") 316 } 317 318 err = db.UpsertProfile(tx, &profile) 319 case models.CommitOperationDelete: 320 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 321 } 322 323 if err != nil { 324 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 325 } 326 327 return nil 328} 329 330func (i *Ingester) ingestSpindleMember(e *models.Event) error { 331 did := e.Did 332 var err error 333 334 l := i.Logger.With("handler", "ingestSpindleMember") 335 l = l.With("nsid", e.Commit.Collection) 336 337 switch e.Commit.Operation { 338 case models.CommitOperationCreate: 339 raw := json.RawMessage(e.Commit.Record) 340 record := tangled.SpindleMember{} 341 err = json.Unmarshal(raw, &record) 342 if err != nil { 343 l.Error("invalid record", "err", err) 344 return err 345 } 346 347 // only spindle owner can invite to spindles 348 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 349 if err != nil || !ok { 350 return fmt.Errorf("failed to enforce permissions: %w", err) 351 } 352 353 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 354 if err != nil { 355 return err 356 } 357 358 if memberId.Handle.IsInvalidHandle() { 359 return err 360 } 361 362 ddb, ok := i.Db.Execer.(*db.DB) 363 if !ok { 364 return fmt.Errorf("failed to index profile record, invalid db cast") 365 } 366 367 err = db.AddSpindleMember(ddb, db.SpindleMember{ 368 Did: syntax.DID(did), 369 Rkey: e.Commit.RKey, 370 Instance: record.Instance, 371 Subject: memberId.DID, 372 }) 373 if !ok { 374 return fmt.Errorf("failed to add to db: %w", err) 375 } 376 377 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 378 if err != nil { 379 return fmt.Errorf("failed to update ACLs: %w", err) 380 } 381 case models.CommitOperationDelete: 382 rkey := e.Commit.RKey 383 384 ddb, ok := i.Db.Execer.(*db.DB) 385 if !ok { 386 return fmt.Errorf("failed to index profile record, invalid db cast") 387 } 388 389 // get record from db first 390 members, err := db.GetSpindleMembers( 391 ddb, 392 db.FilterEq("did", did), 393 db.FilterEq("rkey", rkey), 394 ) 395 if err != nil || len(members) != 1 { 396 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 397 } 398 member := members[0] 399 400 tx, err := ddb.Begin() 401 if err != nil { 402 return fmt.Errorf("failed to start txn: %w", err) 403 } 404 405 // remove record by rkey && update enforcer 406 if err = db.RemoveSpindleMember( 407 tx, 408 db.FilterEq("did", did), 409 db.FilterEq("rkey", rkey), 410 ); err != nil { 411 return fmt.Errorf("failed to remove from db: %w", err) 412 } 413 414 // update enforcer 415 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 416 if err != nil { 417 return fmt.Errorf("failed to update ACLs: %w", err) 418 } 419 420 if err = tx.Commit(); err != nil { 421 return fmt.Errorf("failed to commit txn: %w", err) 422 } 423 424 if err = i.Enforcer.E.SavePolicy(); err != nil { 425 return fmt.Errorf("failed to save ACLs: %w", err) 426 } 427 } 428 429 return nil 430} 431 432func (i *Ingester) ingestSpindle(e *models.Event) error { 433 did := e.Did 434 var err error 435 436 l := i.Logger.With("handler", "ingestSpindle") 437 l = l.With("nsid", e.Commit.Collection) 438 439 switch e.Commit.Operation { 440 case models.CommitOperationCreate: 441 raw := json.RawMessage(e.Commit.Record) 442 record := tangled.Spindle{} 443 err = json.Unmarshal(raw, &record) 444 if err != nil { 445 l.Error("invalid record", "err", err) 446 return err 447 } 448 449 instance := e.Commit.RKey 450 451 ddb, ok := i.Db.Execer.(*db.DB) 452 if !ok { 453 return fmt.Errorf("failed to index profile record, invalid db cast") 454 } 455 456 err := db.AddSpindle(ddb, db.Spindle{ 457 Owner: syntax.DID(did), 458 Instance: instance, 459 }) 460 if err != nil { 461 l.Error("failed to add spindle to db", "err", err, "instance", instance) 462 return err 463 } 464 465 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 466 if err != nil { 467 l.Error("failed to add spindle to db", "err", err, "instance", instance) 468 return err 469 } 470 471 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 472 if err != nil { 473 return fmt.Errorf("failed to mark verified: %w", err) 474 } 475 476 return nil 477 478 case models.CommitOperationDelete: 479 instance := e.Commit.RKey 480 481 ddb, ok := i.Db.Execer.(*db.DB) 482 if !ok { 483 return fmt.Errorf("failed to index profile record, invalid db cast") 484 } 485 486 // get record from db first 487 spindles, err := db.GetSpindles( 488 ddb, 489 db.FilterEq("owner", did), 490 db.FilterEq("instance", instance), 491 ) 492 if err != nil || len(spindles) != 1 { 493 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 494 } 495 spindle := spindles[0] 496 497 tx, err := ddb.Begin() 498 if err != nil { 499 return err 500 } 501 defer func() { 502 tx.Rollback() 503 i.Enforcer.E.LoadPolicy() 504 }() 505 506 err = db.DeleteSpindle( 507 tx, 508 db.FilterEq("owner", did), 509 db.FilterEq("instance", instance), 510 ) 511 if err != nil { 512 return err 513 } 514 515 if spindle.Verified != nil { 516 err = i.Enforcer.RemoveSpindle(instance) 517 if err != nil { 518 return err 519 } 520 } 521 522 err = tx.Commit() 523 if err != nil { 524 return err 525 } 526 527 err = i.Enforcer.E.SavePolicy() 528 if err != nil { 529 return err 530 } 531 } 532 533 return nil 534}