forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 "github.com/go-git/go-git/v5/plumbing" 17 "github.com/ipfs/go-cid" 18 "tangled.sh/tangled.sh/core/api/tangled" 19 "tangled.sh/tangled.sh/core/appview/db" 20 "tangled.sh/tangled.sh/core/rbac" 21) 22 23type Ingester func(ctx context.Context, e *models.Event) error 24 25func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester { 26 return func(ctx context.Context, e *models.Event) error { 27 var err error 28 defer func() { 29 eventTime := e.TimeUS 30 lastTimeUs := eventTime + 1 31 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 32 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 33 } 34 }() 35 36 if e.Kind != models.EventKindCommit { 37 return nil 38 } 39 40 switch e.Commit.Collection { 41 case tangled.GraphFollowNSID: 42 ingestFollow(&d, e) 43 case tangled.FeedStarNSID: 44 ingestStar(&d, e) 45 case tangled.PublicKeyNSID: 46 ingestPublicKey(&d, e) 47 case tangled.RepoArtifactNSID: 48 ingestArtifact(&d, e, enforcer) 49 case tangled.ActorProfileNSID: 50 ingestProfile(&d, e) 51 case tangled.SpindleMemberNSID: 52 ingestSpindleMember(&d, e, enforcer) 53 case tangled.SpindleNSID: 54 ingestSpindle(&d, e, true) // TODO: change this to dynamic 55 } 56 57 return err 58 } 59} 60 61func ingestStar(d *db.DbWrapper, e *models.Event) error { 62 var err error 63 did := e.Did 64 65 switch e.Commit.Operation { 66 case models.CommitOperationCreate, models.CommitOperationUpdate: 67 var subjectUri syntax.ATURI 68 69 raw := json.RawMessage(e.Commit.Record) 70 record := tangled.FeedStar{} 71 err := json.Unmarshal(raw, &record) 72 if err != nil { 73 log.Println("invalid record") 74 return err 75 } 76 77 subjectUri, err = syntax.ParseATURI(record.Subject) 78 if err != nil { 79 log.Println("invalid record") 80 return err 81 } 82 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 83 case models.CommitOperationDelete: 84 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 85 } 86 87 if err != nil { 88 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 89 } 90 91 return nil 92} 93 94func ingestFollow(d *db.DbWrapper, e *models.Event) error { 95 var err error 96 did := e.Did 97 98 switch e.Commit.Operation { 99 case models.CommitOperationCreate, models.CommitOperationUpdate: 100 raw := json.RawMessage(e.Commit.Record) 101 record := tangled.GraphFollow{} 102 err = json.Unmarshal(raw, &record) 103 if err != nil { 104 log.Println("invalid record") 105 return err 106 } 107 108 subjectDid := record.Subject 109 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 110 case models.CommitOperationDelete: 111 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 112 } 113 114 if err != nil { 115 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 116 } 117 118 return nil 119} 120 121func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 122 did := e.Did 123 var err error 124 125 switch e.Commit.Operation { 126 case models.CommitOperationCreate, models.CommitOperationUpdate: 127 log.Println("processing add of pubkey") 128 raw := json.RawMessage(e.Commit.Record) 129 record := tangled.PublicKey{} 130 err = json.Unmarshal(raw, &record) 131 if err != nil { 132 log.Printf("invalid record: %s", err) 133 return err 134 } 135 136 name := record.Name 137 key := record.Key 138 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 139 case models.CommitOperationDelete: 140 log.Println("processing delete of pubkey") 141 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 142 } 143 144 if err != nil { 145 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 146 } 147 148 return nil 149} 150 151func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 152 did := e.Did 153 var err error 154 155 switch e.Commit.Operation { 156 case models.CommitOperationCreate, models.CommitOperationUpdate: 157 raw := json.RawMessage(e.Commit.Record) 158 record := tangled.RepoArtifact{} 159 err = json.Unmarshal(raw, &record) 160 if err != nil { 161 log.Printf("invalid record: %s", err) 162 return err 163 } 164 165 repoAt, err := syntax.ParseATURI(record.Repo) 166 if err != nil { 167 return err 168 } 169 170 repo, err := db.GetRepoByAtUri(d, repoAt.String()) 171 if err != nil { 172 return err 173 } 174 175 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 176 if err != nil || !ok { 177 return err 178 } 179 180 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 181 if err != nil { 182 createdAt = time.Now() 183 } 184 185 artifact := db.Artifact{ 186 Did: did, 187 Rkey: e.Commit.RKey, 188 RepoAt: repoAt, 189 Tag: plumbing.Hash(record.Tag), 190 CreatedAt: createdAt, 191 BlobCid: cid.Cid(record.Artifact.Ref), 192 Name: record.Name, 193 Size: uint64(record.Artifact.Size), 194 MimeType: record.Artifact.MimeType, 195 } 196 197 err = db.AddArtifact(d, artifact) 198 case models.CommitOperationDelete: 199 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 200 } 201 202 if err != nil { 203 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 204 } 205 206 return nil 207} 208 209func ingestProfile(d *db.DbWrapper, e *models.Event) error { 210 did := e.Did 211 var err error 212 213 if e.Commit.RKey != "self" { 214 return fmt.Errorf("ingestProfile only ingests `self` record") 215 } 216 217 switch e.Commit.Operation { 218 case models.CommitOperationCreate, models.CommitOperationUpdate: 219 raw := json.RawMessage(e.Commit.Record) 220 record := tangled.ActorProfile{} 221 err = json.Unmarshal(raw, &record) 222 if err != nil { 223 log.Printf("invalid record: %s", err) 224 return err 225 } 226 227 description := "" 228 if record.Description != nil { 229 description = *record.Description 230 } 231 232 includeBluesky := record.Bluesky 233 234 location := "" 235 if record.Location != nil { 236 location = *record.Location 237 } 238 239 var links [5]string 240 for i, l := range record.Links { 241 if i < 5 { 242 links[i] = l 243 } 244 } 245 246 var stats [2]db.VanityStat 247 for i, s := range record.Stats { 248 if i < 2 { 249 stats[i].Kind = db.VanityStatKind(s) 250 } 251 } 252 253 var pinned [6]syntax.ATURI 254 for i, r := range record.PinnedRepositories { 255 if i < 6 { 256 pinned[i] = syntax.ATURI(r) 257 } 258 } 259 260 profile := db.Profile{ 261 Did: did, 262 Description: description, 263 IncludeBluesky: includeBluesky, 264 Location: location, 265 Links: links, 266 Stats: stats, 267 PinnedRepos: pinned, 268 } 269 270 ddb, ok := d.Execer.(*db.DB) 271 if !ok { 272 return fmt.Errorf("failed to index profile record, invalid db cast") 273 } 274 275 tx, err := ddb.Begin() 276 if err != nil { 277 return fmt.Errorf("failed to start transaction") 278 } 279 280 err = db.ValidateProfile(tx, &profile) 281 if err != nil { 282 return fmt.Errorf("invalid profile record") 283 } 284 285 err = db.UpsertProfile(tx, &profile) 286 case models.CommitOperationDelete: 287 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 288 } 289 290 if err != nil { 291 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 292 } 293 294 return nil 295} 296 297func ingestSpindleMember(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 298 did := e.Did 299 var err error 300 301 switch e.Commit.Operation { 302 case models.CommitOperationCreate: 303 raw := json.RawMessage(e.Commit.Record) 304 record := tangled.SpindleMember{} 305 err = json.Unmarshal(raw, &record) 306 if err != nil { 307 log.Printf("invalid record: %s", err) 308 return err 309 } 310 311 // only spindle owner can invite to spindles 312 ok, err := enforcer.IsSpindleInviteAllowed(did, record.Instance) 313 if err != nil || !ok { 314 return fmt.Errorf("failed to enforce permissions: %w", err) 315 } 316 317 err = enforcer.AddSpindleMember(record.Instance, record.Subject) 318 if err != nil { 319 return fmt.Errorf("failed to add member: %w", err) 320 } 321 } 322 323 return nil 324} 325 326func ingestSpindle(d *db.DbWrapper, e *models.Event, dev bool) error { 327 did := e.Did 328 var err error 329 330 switch e.Commit.Operation { 331 case models.CommitOperationCreate: 332 raw := json.RawMessage(e.Commit.Record) 333 record := tangled.Spindle{} 334 err = json.Unmarshal(raw, &record) 335 if err != nil { 336 log.Printf("invalid record: %s", err) 337 return err 338 } 339 340 // this is a special record whose rkey is the instance of the spindle itself 341 domain := e.Commit.RKey 342 343 owner, err := fetchOwner(context.TODO(), domain, true) 344 if err != nil { 345 log.Printf("failed to verify owner of %s: %w", domain, err) 346 return err 347 } 348 349 // verify that the spindle owner points back to this did 350 if owner != did { 351 log.Printf("incorrect owner for domain: %s, %s != %s", domain, owner, did) 352 return err 353 } 354 355 // mark this spindle as registered 356 } 357 358 return nil 359} 360 361func fetchOwner(ctx context.Context, domain string, dev bool) (string, error) { 362 scheme := "https" 363 if dev { 364 scheme = "http" 365 } 366 367 url := fmt.Sprintf("%s://%s/owner", scheme, domain) 368 req, err := http.NewRequest("GET", url, nil) 369 if err != nil { 370 return "", err 371 } 372 373 client := &http.Client{ 374 Timeout: 1 * time.Second, 375 } 376 377 resp, err := client.Do(req.WithContext(ctx)) 378 if err != nil || resp.StatusCode != 200 { 379 return "", errors.New("failed to fetch /owner") 380 } 381 382 body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) // read atmost 1kb of data 383 if err != nil { 384 return "", fmt.Errorf("failed to read /owner response: %w", err) 385 } 386 387 did := strings.TrimSpace(string(body)) 388 if did == "" { 389 return "", errors.New("empty DID in /owner response") 390 } 391 392 return did, nil 393}