forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/config" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/appview/idresolver" 18 "tangled.sh/tangled.sh/core/appview/spindleverify" 19 "tangled.sh/tangled.sh/core/rbac" 20) 21 22type Ingester struct { 23 Db db.DbWrapper 24 Enforcer *rbac.Enforcer 25 IdResolver *idresolver.Resolver 26 Config *config.Config 27 Logger *slog.Logger 28} 29 30type processFunc func(ctx context.Context, e *models.Event) error 31 32func (i *Ingester) Ingest() processFunc { 33 return func(ctx context.Context, e *models.Event) error { 34 var err error 35 defer func() { 36 eventTime := e.TimeUS 37 lastTimeUs := eventTime + 1 38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 40 } 41 }() 42 43 if e.Kind != models.EventKindCommit { 44 return nil 45 } 46 47 switch e.Commit.Collection { 48 case tangled.GraphFollowNSID: 49 err = i.ingestFollow(e) 50 case tangled.FeedStarNSID: 51 err = i.ingestStar(e) 52 case tangled.PublicKeyNSID: 53 err = i.ingestPublicKey(e) 54 case tangled.RepoArtifactNSID: 55 err = i.ingestArtifact(e) 56 case tangled.ActorProfileNSID: 57 err = i.ingestProfile(e) 58 case tangled.SpindleMemberNSID: 59 err = i.ingestSpindleMember(e) 60 case tangled.SpindleNSID: 61 err = i.ingestSpindle(e) 62 } 63 64 if err != nil { 65 l := i.Logger.With("nsid", e.Commit.Collection) 66 l.Error("error ingesting record", "err", err) 67 } 68 69 return err 70 } 71} 72 73func (i *Ingester) ingestStar(e *models.Event) error { 74 var err error 75 did := e.Did 76 77 l := i.Logger.With("handler", "ingestStar") 78 l = l.With("nsid", e.Commit.Collection) 79 80 switch e.Commit.Operation { 81 case models.CommitOperationCreate, models.CommitOperationUpdate: 82 var subjectUri syntax.ATURI 83 84 raw := json.RawMessage(e.Commit.Record) 85 record := tangled.FeedStar{} 86 err := json.Unmarshal(raw, &record) 87 if err != nil { 88 l.Error("invalid record", "err", err) 89 return err 90 } 91 92 subjectUri, err = syntax.ParseATURI(record.Subject) 93 if err != nil { 94 l.Error("invalid record", "err", err) 95 return err 96 } 97 err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey) 98 case models.CommitOperationDelete: 99 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 100 } 101 102 if err != nil { 103 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 104 } 105 106 return nil 107} 108 109func (i *Ingester) ingestFollow(e *models.Event) error { 110 var err error 111 did := e.Did 112 113 l := i.Logger.With("handler", "ingestFollow") 114 l = l.With("nsid", e.Commit.Collection) 115 116 switch e.Commit.Operation { 117 case models.CommitOperationCreate, models.CommitOperationUpdate: 118 raw := json.RawMessage(e.Commit.Record) 119 record := tangled.GraphFollow{} 120 err = json.Unmarshal(raw, &record) 121 if err != nil { 122 l.Error("invalid record", "err", err) 123 return err 124 } 125 126 subjectDid := record.Subject 127 err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey) 128 case models.CommitOperationDelete: 129 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 130 } 131 132 if err != nil { 133 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 134 } 135 136 return nil 137} 138 139func (i *Ingester) ingestPublicKey(e *models.Event) error { 140 did := e.Did 141 var err error 142 143 l := i.Logger.With("handler", "ingestPublicKey") 144 l = l.With("nsid", e.Commit.Collection) 145 146 switch e.Commit.Operation { 147 case models.CommitOperationCreate, models.CommitOperationUpdate: 148 l.Debug("processing add of pubkey") 149 raw := json.RawMessage(e.Commit.Record) 150 record := tangled.PublicKey{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "err", err) 154 return err 155 } 156 157 name := record.Name 158 key := record.Key 159 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 160 case models.CommitOperationDelete: 161 l.Debug("processing delete of pubkey") 162 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 163 } 164 165 if err != nil { 166 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 167 } 168 169 return nil 170} 171 172func (i *Ingester) ingestArtifact(e *models.Event) error { 173 did := e.Did 174 var err error 175 176 l := i.Logger.With("handler", "ingestArtifact") 177 l = l.With("nsid", e.Commit.Collection) 178 179 switch e.Commit.Operation { 180 case models.CommitOperationCreate, models.CommitOperationUpdate: 181 raw := json.RawMessage(e.Commit.Record) 182 record := tangled.RepoArtifact{} 183 err = json.Unmarshal(raw, &record) 184 if err != nil { 185 l.Error("invalid record", "err", err) 186 return err 187 } 188 189 repoAt, err := syntax.ParseATURI(record.Repo) 190 if err != nil { 191 return err 192 } 193 194 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 195 if err != nil { 196 return err 197 } 198 199 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 200 if err != nil || !ok { 201 return err 202 } 203 204 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 205 if err != nil { 206 createdAt = time.Now() 207 } 208 209 artifact := db.Artifact{ 210 Did: did, 211 Rkey: e.Commit.RKey, 212 RepoAt: repoAt, 213 Tag: plumbing.Hash(record.Tag), 214 CreatedAt: createdAt, 215 BlobCid: cid.Cid(record.Artifact.Ref), 216 Name: record.Name, 217 Size: uint64(record.Artifact.Size), 218 MimeType: record.Artifact.MimeType, 219 } 220 221 err = db.AddArtifact(i.Db, artifact) 222 case models.CommitOperationDelete: 223 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 224 } 225 226 if err != nil { 227 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 228 } 229 230 return nil 231} 232 233func (i *Ingester) ingestProfile(e *models.Event) error { 234 did := e.Did 235 var err error 236 237 l := i.Logger.With("handler", "ingestProfile") 238 l = l.With("nsid", e.Commit.Collection) 239 240 if e.Commit.RKey != "self" { 241 return fmt.Errorf("ingestProfile only ingests `self` record") 242 } 243 244 switch e.Commit.Operation { 245 case models.CommitOperationCreate, models.CommitOperationUpdate: 246 raw := json.RawMessage(e.Commit.Record) 247 record := tangled.ActorProfile{} 248 err = json.Unmarshal(raw, &record) 249 if err != nil { 250 l.Error("invalid record", "err", err) 251 return err 252 } 253 254 description := "" 255 if record.Description != nil { 256 description = *record.Description 257 } 258 259 includeBluesky := record.Bluesky 260 261 location := "" 262 if record.Location != nil { 263 location = *record.Location 264 } 265 266 var links [5]string 267 for i, l := range record.Links { 268 if i < 5 { 269 links[i] = l 270 } 271 } 272 273 var stats [2]db.VanityStat 274 for i, s := range record.Stats { 275 if i < 2 { 276 stats[i].Kind = db.VanityStatKind(s) 277 } 278 } 279 280 var pinned [6]syntax.ATURI 281 for i, r := range record.PinnedRepositories { 282 if i < 6 { 283 pinned[i] = syntax.ATURI(r) 284 } 285 } 286 287 profile := db.Profile{ 288 Did: did, 289 Description: description, 290 IncludeBluesky: includeBluesky, 291 Location: location, 292 Links: links, 293 Stats: stats, 294 PinnedRepos: pinned, 295 } 296 297 ddb, ok := i.Db.Execer.(*db.DB) 298 if !ok { 299 return fmt.Errorf("failed to index profile record, invalid db cast") 300 } 301 302 tx, err := ddb.Begin() 303 if err != nil { 304 return fmt.Errorf("failed to start transaction") 305 } 306 307 err = db.ValidateProfile(tx, &profile) 308 if err != nil { 309 return fmt.Errorf("invalid profile record") 310 } 311 312 err = db.UpsertProfile(tx, &profile) 313 case models.CommitOperationDelete: 314 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 315 } 316 317 if err != nil { 318 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 319 } 320 321 return nil 322} 323 324func (i *Ingester) ingestSpindleMember(e *models.Event) error { 325 did := e.Did 326 var err error 327 328 l := i.Logger.With("handler", "ingestSpindleMember") 329 l = l.With("nsid", e.Commit.Collection) 330 331 switch e.Commit.Operation { 332 case models.CommitOperationCreate: 333 raw := json.RawMessage(e.Commit.Record) 334 record := tangled.SpindleMember{} 335 err = json.Unmarshal(raw, &record) 336 if err != nil { 337 l.Error("invalid record", "err", err) 338 return err 339 } 340 341 // only spindle owner can invite to spindles 342 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 343 if err != nil || !ok { 344 return fmt.Errorf("failed to enforce permissions: %w", err) 345 } 346 347 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 348 if err != nil { 349 return err 350 } 351 352 if memberId.Handle.IsInvalidHandle() { 353 return err 354 } 355 356 ddb, ok := i.Db.Execer.(*db.DB) 357 if !ok { 358 return fmt.Errorf("failed to index profile record, invalid db cast") 359 } 360 361 err = db.AddSpindleMember(ddb, db.SpindleMember{ 362 Did: syntax.DID(did), 363 Rkey: e.Commit.RKey, 364 Instance: record.Instance, 365 Subject: memberId.DID, 366 }) 367 if !ok { 368 return fmt.Errorf("failed to add to db: %w", err) 369 } 370 371 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 372 if err != nil { 373 return fmt.Errorf("failed to update ACLs: %w", err) 374 } 375 case models.CommitOperationDelete: 376 rkey := e.Commit.RKey 377 378 ddb, ok := i.Db.Execer.(*db.DB) 379 if !ok { 380 return fmt.Errorf("failed to index profile record, invalid db cast") 381 } 382 383 // get record from db first 384 members, err := db.GetSpindleMembers( 385 ddb, 386 db.FilterEq("did", did), 387 db.FilterEq("rkey", rkey), 388 ) 389 if err != nil || len(members) != 1 { 390 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 391 } 392 member := members[0] 393 394 tx, err := ddb.Begin() 395 if err != nil { 396 return fmt.Errorf("failed to start txn: %w", err) 397 } 398 399 // remove record by rkey && update enforcer 400 if err = db.RemoveSpindleMember( 401 tx, 402 db.FilterEq("did", did), 403 db.FilterEq("rkey", rkey), 404 ); err != nil { 405 return fmt.Errorf("failed to remove from db: %w", err) 406 } 407 408 // update enforcer 409 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 410 if err != nil { 411 return fmt.Errorf("failed to update ACLs: %w", err) 412 } 413 414 if err = tx.Commit(); err != nil { 415 return fmt.Errorf("failed to commit txn: %w", err) 416 } 417 418 if err = i.Enforcer.E.SavePolicy(); err != nil { 419 return fmt.Errorf("failed to save ACLs: %w", err) 420 } 421 } 422 423 return nil 424} 425 426func (i *Ingester) ingestSpindle(e *models.Event) error { 427 did := e.Did 428 var err error 429 430 l := i.Logger.With("handler", "ingestSpindle") 431 l = l.With("nsid", e.Commit.Collection) 432 433 switch e.Commit.Operation { 434 case models.CommitOperationCreate: 435 raw := json.RawMessage(e.Commit.Record) 436 record := tangled.Spindle{} 437 err = json.Unmarshal(raw, &record) 438 if err != nil { 439 l.Error("invalid record", "err", err) 440 return err 441 } 442 443 instance := e.Commit.RKey 444 445 ddb, ok := i.Db.Execer.(*db.DB) 446 if !ok { 447 return fmt.Errorf("failed to index profile record, invalid db cast") 448 } 449 450 err := db.AddSpindle(ddb, db.Spindle{ 451 Owner: syntax.DID(did), 452 Instance: instance, 453 }) 454 if err != nil { 455 l.Error("failed to add spindle to db", "err", err, "instance", instance) 456 return err 457 } 458 459 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 460 if err != nil { 461 l.Error("failed to add spindle to db", "err", err, "instance", instance) 462 return err 463 } 464 465 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 466 if err != nil { 467 return fmt.Errorf("failed to mark verified: %w", err) 468 } 469 470 return nil 471 472 case models.CommitOperationDelete: 473 instance := e.Commit.RKey 474 475 ddb, ok := i.Db.Execer.(*db.DB) 476 if !ok { 477 return fmt.Errorf("failed to index profile record, invalid db cast") 478 } 479 480 tx, err := ddb.Begin() 481 if err != nil { 482 return err 483 } 484 defer func() { 485 tx.Rollback() 486 i.Enforcer.E.LoadPolicy() 487 }() 488 489 err = db.DeleteSpindle( 490 tx, 491 db.FilterEq("owner", did), 492 db.FilterEq("instance", instance), 493 ) 494 if err != nil { 495 return err 496 } 497 498 err = i.Enforcer.RemoveSpindle(instance) 499 if err != nil { 500 return err 501 } 502 503 err = tx.Commit() 504 if err != nil { 505 return err 506 } 507 508 err = i.Enforcer.E.SavePolicy() 509 if err != nil { 510 return err 511 } 512 } 513 514 return nil 515}