forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/spindleverify" 18 "tangled.sh/tangled.sh/core/idresolver" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 l := i.Logger.With("kind", e.Kind) 44 switch e.Kind { 45 case models.EventKindAccount: 46 if !e.Account.Active && *e.Account.Status == "deactivated" { 47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 48 } 49 case models.EventKindIdentity: 50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 51 case models.EventKindCommit: 52 switch e.Commit.Collection { 53 case tangled.GraphFollowNSID: 54 err = i.ingestFollow(e) 55 case tangled.FeedStarNSID: 56 err = i.ingestStar(e) 57 case tangled.PublicKeyNSID: 58 err = i.ingestPublicKey(e) 59 case tangled.RepoArtifactNSID: 60 err = i.ingestArtifact(e) 61 case tangled.ActorProfileNSID: 62 err = i.ingestProfile(e) 63 case tangled.SpindleMemberNSID: 64 err = i.ingestSpindleMember(e) 65 case tangled.SpindleNSID: 66 err = i.ingestSpindle(e) 67 } 68 l = i.Logger.With("nsid", e.Commit.Collection) 69 } 70 71 if err != nil { 72 l.Error("error ingesting record", "err", err) 73 } 74 75 return err 76 } 77} 78 79func (i *Ingester) ingestStar(e *models.Event) error { 80 var err error 81 did := e.Did 82 83 l := i.Logger.With("handler", "ingestStar") 84 l = l.With("nsid", e.Commit.Collection) 85 86 switch e.Commit.Operation { 87 case models.CommitOperationCreate, models.CommitOperationUpdate: 88 var subjectUri syntax.ATURI 89 90 raw := json.RawMessage(e.Commit.Record) 91 record := tangled.FeedStar{} 92 err := json.Unmarshal(raw, &record) 93 if err != nil { 94 l.Error("invalid record", "err", err) 95 return err 96 } 97 98 subjectUri, err = syntax.ParseATURI(record.Subject) 99 if err != nil { 100 l.Error("invalid record", "err", err) 101 return err 102 } 103 err = db.AddStar(i.Db, &db.Star{ 104 StarredByDid: did, 105 RepoAt: subjectUri, 106 Rkey: e.Commit.RKey, 107 }) 108 case models.CommitOperationDelete: 109 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 110 } 111 112 if err != nil { 113 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 114 } 115 116 return nil 117} 118 119func (i *Ingester) ingestFollow(e *models.Event) error { 120 var err error 121 did := e.Did 122 123 l := i.Logger.With("handler", "ingestFollow") 124 l = l.With("nsid", e.Commit.Collection) 125 126 switch e.Commit.Operation { 127 case models.CommitOperationCreate, models.CommitOperationUpdate: 128 raw := json.RawMessage(e.Commit.Record) 129 record := tangled.GraphFollow{} 130 err = json.Unmarshal(raw, &record) 131 if err != nil { 132 l.Error("invalid record", "err", err) 133 return err 134 } 135 136 err = db.AddFollow(i.Db, &db.Follow{ 137 UserDid: did, 138 SubjectDid: record.Subject, 139 Rkey: e.Commit.RKey, 140 }) 141 case models.CommitOperationDelete: 142 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 143 } 144 145 if err != nil { 146 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 147 } 148 149 return nil 150} 151 152func (i *Ingester) ingestPublicKey(e *models.Event) error { 153 did := e.Did 154 var err error 155 156 l := i.Logger.With("handler", "ingestPublicKey") 157 l = l.With("nsid", e.Commit.Collection) 158 159 switch e.Commit.Operation { 160 case models.CommitOperationCreate, models.CommitOperationUpdate: 161 l.Debug("processing add of pubkey") 162 raw := json.RawMessage(e.Commit.Record) 163 record := tangled.PublicKey{} 164 err = json.Unmarshal(raw, &record) 165 if err != nil { 166 l.Error("invalid record", "err", err) 167 return err 168 } 169 170 name := record.Name 171 key := record.Key 172 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 173 case models.CommitOperationDelete: 174 l.Debug("processing delete of pubkey") 175 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 176 } 177 178 if err != nil { 179 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 180 } 181 182 return nil 183} 184 185func (i *Ingester) ingestArtifact(e *models.Event) error { 186 did := e.Did 187 var err error 188 189 l := i.Logger.With("handler", "ingestArtifact") 190 l = l.With("nsid", e.Commit.Collection) 191 192 switch e.Commit.Operation { 193 case models.CommitOperationCreate, models.CommitOperationUpdate: 194 raw := json.RawMessage(e.Commit.Record) 195 record := tangled.RepoArtifact{} 196 err = json.Unmarshal(raw, &record) 197 if err != nil { 198 l.Error("invalid record", "err", err) 199 return err 200 } 201 202 repoAt, err := syntax.ParseATURI(record.Repo) 203 if err != nil { 204 return err 205 } 206 207 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 208 if err != nil { 209 return err 210 } 211 212 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 213 if err != nil || !ok { 214 return err 215 } 216 217 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 218 if err != nil { 219 createdAt = time.Now() 220 } 221 222 artifact := db.Artifact{ 223 Did: did, 224 Rkey: e.Commit.RKey, 225 RepoAt: repoAt, 226 Tag: plumbing.Hash(record.Tag), 227 CreatedAt: createdAt, 228 BlobCid: cid.Cid(record.Artifact.Ref), 229 Name: record.Name, 230 Size: uint64(record.Artifact.Size), 231 MimeType: record.Artifact.MimeType, 232 } 233 234 err = db.AddArtifact(i.Db, artifact) 235 case models.CommitOperationDelete: 236 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 237 } 238 239 if err != nil { 240 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 241 } 242 243 return nil 244} 245 246func (i *Ingester) ingestProfile(e *models.Event) error { 247 did := e.Did 248 var err error 249 250 l := i.Logger.With("handler", "ingestProfile") 251 l = l.With("nsid", e.Commit.Collection) 252 253 if e.Commit.RKey != "self" { 254 return fmt.Errorf("ingestProfile only ingests `self` record") 255 } 256 257 switch e.Commit.Operation { 258 case models.CommitOperationCreate, models.CommitOperationUpdate: 259 raw := json.RawMessage(e.Commit.Record) 260 record := tangled.ActorProfile{} 261 err = json.Unmarshal(raw, &record) 262 if err != nil { 263 l.Error("invalid record", "err", err) 264 return err 265 } 266 267 description := "" 268 if record.Description != nil { 269 description = *record.Description 270 } 271 272 includeBluesky := record.Bluesky 273 274 location := "" 275 if record.Location != nil { 276 location = *record.Location 277 } 278 279 var links [5]string 280 for i, l := range record.Links { 281 if i < 5 { 282 links[i] = l 283 } 284 } 285 286 var stats [2]db.VanityStat 287 for i, s := range record.Stats { 288 if i < 2 { 289 stats[i].Kind = db.VanityStatKind(s) 290 } 291 } 292 293 var pinned [6]syntax.ATURI 294 for i, r := range record.PinnedRepositories { 295 if i < 6 { 296 pinned[i] = syntax.ATURI(r) 297 } 298 } 299 300 profile := db.Profile{ 301 Did: did, 302 Description: description, 303 IncludeBluesky: includeBluesky, 304 Location: location, 305 Links: links, 306 Stats: stats, 307 PinnedRepos: pinned, 308 } 309 310 ddb, ok := i.Db.Execer.(*db.DB) 311 if !ok { 312 return fmt.Errorf("failed to index profile record, invalid db cast") 313 } 314 315 tx, err := ddb.Begin() 316 if err != nil { 317 return fmt.Errorf("failed to start transaction") 318 } 319 320 err = db.ValidateProfile(tx, &profile) 321 if err != nil { 322 return fmt.Errorf("invalid profile record") 323 } 324 325 err = db.UpsertProfile(tx, &profile) 326 case models.CommitOperationDelete: 327 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 328 } 329 330 if err != nil { 331 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 332 } 333 334 return nil 335} 336 337func (i *Ingester) ingestSpindleMember(e *models.Event) error { 338 did := e.Did 339 var err error 340 341 l := i.Logger.With("handler", "ingestSpindleMember") 342 l = l.With("nsid", e.Commit.Collection) 343 344 switch e.Commit.Operation { 345 case models.CommitOperationCreate: 346 raw := json.RawMessage(e.Commit.Record) 347 record := tangled.SpindleMember{} 348 err = json.Unmarshal(raw, &record) 349 if err != nil { 350 l.Error("invalid record", "err", err) 351 return err 352 } 353 354 // only spindle owner can invite to spindles 355 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 356 if err != nil || !ok { 357 return fmt.Errorf("failed to enforce permissions: %w", err) 358 } 359 360 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 361 if err != nil { 362 return err 363 } 364 365 if memberId.Handle.IsInvalidHandle() { 366 return err 367 } 368 369 ddb, ok := i.Db.Execer.(*db.DB) 370 if !ok { 371 return fmt.Errorf("failed to index profile record, invalid db cast") 372 } 373 374 err = db.AddSpindleMember(ddb, db.SpindleMember{ 375 Did: syntax.DID(did), 376 Rkey: e.Commit.RKey, 377 Instance: record.Instance, 378 Subject: memberId.DID, 379 }) 380 if !ok { 381 return fmt.Errorf("failed to add to db: %w", err) 382 } 383 384 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 385 if err != nil { 386 return fmt.Errorf("failed to update ACLs: %w", err) 387 } 388 case models.CommitOperationDelete: 389 rkey := e.Commit.RKey 390 391 ddb, ok := i.Db.Execer.(*db.DB) 392 if !ok { 393 return fmt.Errorf("failed to index profile record, invalid db cast") 394 } 395 396 // get record from db first 397 members, err := db.GetSpindleMembers( 398 ddb, 399 db.FilterEq("did", did), 400 db.FilterEq("rkey", rkey), 401 ) 402 if err != nil || len(members) != 1 { 403 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 404 } 405 member := members[0] 406 407 tx, err := ddb.Begin() 408 if err != nil { 409 return fmt.Errorf("failed to start txn: %w", err) 410 } 411 412 // remove record by rkey && update enforcer 413 if err = db.RemoveSpindleMember( 414 tx, 415 db.FilterEq("did", did), 416 db.FilterEq("rkey", rkey), 417 ); err != nil { 418 return fmt.Errorf("failed to remove from db: %w", err) 419 } 420 421 // update enforcer 422 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 423 if err != nil { 424 return fmt.Errorf("failed to update ACLs: %w", err) 425 } 426 427 if err = tx.Commit(); err != nil { 428 return fmt.Errorf("failed to commit txn: %w", err) 429 } 430 431 if err = i.Enforcer.E.SavePolicy(); err != nil { 432 return fmt.Errorf("failed to save ACLs: %w", err) 433 } 434 } 435 436 return nil 437} 438 439func (i *Ingester) ingestSpindle(e *models.Event) error { 440 did := e.Did 441 var err error 442 443 l := i.Logger.With("handler", "ingestSpindle") 444 l = l.With("nsid", e.Commit.Collection) 445 446 switch e.Commit.Operation { 447 case models.CommitOperationCreate: 448 raw := json.RawMessage(e.Commit.Record) 449 record := tangled.Spindle{} 450 err = json.Unmarshal(raw, &record) 451 if err != nil { 452 l.Error("invalid record", "err", err) 453 return err 454 } 455 456 instance := e.Commit.RKey 457 458 ddb, ok := i.Db.Execer.(*db.DB) 459 if !ok { 460 return fmt.Errorf("failed to index profile record, invalid db cast") 461 } 462 463 err := db.AddSpindle(ddb, db.Spindle{ 464 Owner: syntax.DID(did), 465 Instance: instance, 466 }) 467 if err != nil { 468 l.Error("failed to add spindle to db", "err", err, "instance", instance) 469 return err 470 } 471 472 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 473 if err != nil { 474 l.Error("failed to add spindle to db", "err", err, "instance", instance) 475 return err 476 } 477 478 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 479 if err != nil { 480 return fmt.Errorf("failed to mark verified: %w", err) 481 } 482 483 return nil 484 485 case models.CommitOperationDelete: 486 instance := e.Commit.RKey 487 488 ddb, ok := i.Db.Execer.(*db.DB) 489 if !ok { 490 return fmt.Errorf("failed to index profile record, invalid db cast") 491 } 492 493 // get record from db first 494 spindles, err := db.GetSpindles( 495 ddb, 496 db.FilterEq("owner", did), 497 db.FilterEq("instance", instance), 498 ) 499 if err != nil || len(spindles) != 1 { 500 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 501 } 502 spindle := spindles[0] 503 504 tx, err := ddb.Begin() 505 if err != nil { 506 return err 507 } 508 defer func() { 509 tx.Rollback() 510 i.Enforcer.E.LoadPolicy() 511 }() 512 513 err = db.DeleteSpindle( 514 tx, 515 db.FilterEq("owner", did), 516 db.FilterEq("instance", instance), 517 ) 518 if err != nil { 519 return err 520 } 521 522 if spindle.Verified != nil { 523 err = i.Enforcer.RemoveSpindle(instance) 524 if err != nil { 525 return err 526 } 527 } 528 529 err = tx.Commit() 530 if err != nil { 531 return err 532 } 533 534 err = i.Enforcer.E.SavePolicy() 535 if err != nil { 536 return err 537 } 538 } 539 540 return nil 541}