forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "github.com/go-git/go-git/v5/plumbing" 14 "github.com/ipfs/go-cid" 15 "tangled.sh/tangled.sh/core/api/tangled" 16 "tangled.sh/tangled.sh/core/appview/db" 17 "tangled.sh/tangled.sh/core/knotclient" 18 "tangled.sh/tangled.sh/core/rbac" 19) 20 21type Ingester func(ctx context.Context, e *models.Event) error 22 23func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer, dev bool) Ingester { 24 return func(ctx context.Context, e *models.Event) error { 25 var err error 26 defer func() { 27 eventTime := e.TimeUS 28 lastTimeUs := eventTime + 1 29 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 30 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 31 } 32 }() 33 34 if e.Kind != models.EventKindCommit { 35 return nil 36 } 37 38 switch e.Commit.Collection { 39 case tangled.GraphFollowNSID: 40 ingestFollow(&d, e) 41 case tangled.FeedStarNSID: 42 ingestStar(&d, e) 43 case tangled.PublicKeyNSID: 44 ingestPublicKey(&d, e) 45 case tangled.RepoArtifactNSID: 46 ingestArtifact(&d, e, enforcer) 47 case tangled.ActorProfileNSID: 48 ingestProfile(&d, e) 49 case tangled.KnotNSID: 50 ingestKnot(&d, e, dev) 51 } 52 53 return err 54 } 55} 56 57func ingestStar(d *db.DbWrapper, e *models.Event) error { 58 var err error 59 did := e.Did 60 61 switch e.Commit.Operation { 62 case models.CommitOperationCreate, models.CommitOperationUpdate: 63 var subjectUri syntax.ATURI 64 65 raw := json.RawMessage(e.Commit.Record) 66 record := tangled.FeedStar{} 67 err := json.Unmarshal(raw, &record) 68 if err != nil { 69 log.Println("invalid record") 70 return err 71 } 72 73 subjectUri, err = syntax.ParseATURI(record.Subject) 74 if err != nil { 75 log.Println("invalid record") 76 return err 77 } 78 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 79 case models.CommitOperationDelete: 80 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 81 } 82 83 if err != nil { 84 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 85 } 86 87 return nil 88} 89 90func ingestFollow(d *db.DbWrapper, e *models.Event) error { 91 var err error 92 did := e.Did 93 94 switch e.Commit.Operation { 95 case models.CommitOperationCreate, models.CommitOperationUpdate: 96 raw := json.RawMessage(e.Commit.Record) 97 record := tangled.GraphFollow{} 98 err = json.Unmarshal(raw, &record) 99 if err != nil { 100 log.Println("invalid record") 101 return err 102 } 103 104 subjectDid := record.Subject 105 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 106 case models.CommitOperationDelete: 107 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 108 } 109 110 if err != nil { 111 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 112 } 113 114 return nil 115} 116 117func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 118 did := e.Did 119 var err error 120 121 switch e.Commit.Operation { 122 case models.CommitOperationCreate, models.CommitOperationUpdate: 123 log.Println("processing add of pubkey") 124 raw := json.RawMessage(e.Commit.Record) 125 record := tangled.PublicKey{} 126 err = json.Unmarshal(raw, &record) 127 if err != nil { 128 log.Printf("invalid record: %s", err) 129 return err 130 } 131 132 name := record.Name 133 key := record.Key 134 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 135 case models.CommitOperationDelete: 136 log.Println("processing delete of pubkey") 137 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 138 } 139 140 if err != nil { 141 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 142 } 143 144 return nil 145} 146 147func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 148 did := e.Did 149 var err error 150 151 switch e.Commit.Operation { 152 case models.CommitOperationCreate, models.CommitOperationUpdate: 153 raw := json.RawMessage(e.Commit.Record) 154 record := tangled.RepoArtifact{} 155 err = json.Unmarshal(raw, &record) 156 if err != nil { 157 log.Printf("invalid record: %s", err) 158 return err 159 } 160 161 repoAt, err := syntax.ParseATURI(record.Repo) 162 if err != nil { 163 return err 164 } 165 166 repo, err := db.GetRepoByAtUri(d, repoAt.String()) 167 if err != nil { 168 return err 169 } 170 171 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 172 if err != nil || !ok { 173 return err 174 } 175 176 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 177 if err != nil { 178 createdAt = time.Now() 179 } 180 181 artifact := db.Artifact{ 182 Did: did, 183 Rkey: e.Commit.RKey, 184 RepoAt: repoAt, 185 Tag: plumbing.Hash(record.Tag), 186 CreatedAt: createdAt, 187 BlobCid: cid.Cid(record.Artifact.Ref), 188 Name: record.Name, 189 Size: uint64(record.Artifact.Size), 190 MimeType: record.Artifact.MimeType, 191 } 192 193 err = db.AddArtifact(d, artifact) 194 case models.CommitOperationDelete: 195 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 196 } 197 198 if err != nil { 199 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 200 } 201 202 return nil 203} 204 205func ingestProfile(d *db.DbWrapper, e *models.Event) error { 206 did := e.Did 207 var err error 208 209 if e.Commit.RKey != "self" { 210 return fmt.Errorf("ingestProfile only ingests `self` record") 211 } 212 213 switch e.Commit.Operation { 214 case models.CommitOperationCreate, models.CommitOperationUpdate: 215 raw := json.RawMessage(e.Commit.Record) 216 record := tangled.ActorProfile{} 217 err = json.Unmarshal(raw, &record) 218 if err != nil { 219 log.Printf("invalid record: %s", err) 220 return err 221 } 222 223 description := "" 224 if record.Description != nil { 225 description = *record.Description 226 } 227 228 includeBluesky := record.Bluesky 229 230 location := "" 231 if record.Location != nil { 232 location = *record.Location 233 } 234 235 var links [5]string 236 for i, l := range record.Links { 237 if i < 5 { 238 links[i] = l 239 } 240 } 241 242 var stats [2]db.VanityStat 243 for i, s := range record.Stats { 244 if i < 2 { 245 stats[i].Kind = db.VanityStatKind(s) 246 } 247 } 248 249 var pinned [6]syntax.ATURI 250 for i, r := range record.PinnedRepositories { 251 if i < 6 { 252 pinned[i] = syntax.ATURI(r) 253 } 254 } 255 256 profile := db.Profile{ 257 Did: did, 258 Description: description, 259 IncludeBluesky: includeBluesky, 260 Location: location, 261 Links: links, 262 Stats: stats, 263 PinnedRepos: pinned, 264 } 265 266 ddb, ok := d.Execer.(*db.DB) 267 if !ok { 268 return fmt.Errorf("failed to index profile record, invalid db cast") 269 } 270 271 tx, err := ddb.Begin() 272 if err != nil { 273 return fmt.Errorf("failed to start transaction") 274 } 275 276 err = db.ValidateProfile(tx, &profile) 277 if err != nil { 278 return fmt.Errorf("invalid profile record") 279 } 280 281 err = db.UpsertProfile(tx, &profile) 282 case models.CommitOperationDelete: 283 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 284 } 285 286 if err != nil { 287 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 288 } 289 290 return nil 291} 292 293func ingestKnot(d *db.DbWrapper, e *models.Event, dev bool) error { 294 did := e.Did 295 var err error 296 297 switch e.Commit.Operation { 298 case models.CommitOperationCreate: 299 log.Println("processing knot creation") 300 raw := json.RawMessage(e.Commit.Record) 301 record := tangled.Knot{} 302 err = json.Unmarshal(raw, &record) 303 if err != nil { 304 log.Printf("invalid record: %s", err) 305 return err 306 } 307 308 host := record.Host 309 310 if strings.HasPrefix(host, "localhost") && !dev { 311 // localhost knots are not ingested except in dev mode 312 return fmt.Errorf("localhost knots not registered this appview: %s", host) 313 } 314 315 // two-way confirmation that this knot is owned by this did 316 us, err := knotclient.NewUnsignedClient(host, dev) 317 if err != nil { 318 return err 319 } 320 321 resp, err := us.Owner() 322 if err != nil { 323 return err 324 } 325 326 if resp.OwnerDid != did { 327 return fmt.Errorf("incorrect owner reported from knot %s: wanted: %s, got: %s", host, resp.OwnerDid, did) 328 } 329 330 err = db.RegisterV2(d, host, resp.OwnerDid) 331 default: 332 log.Println("this operation is not yet handled", e.Commit.Operation) 333 } 334 335 if err != nil { 336 return fmt.Errorf("failed to %s knot record: %w", e.Commit.Operation, err) 337 } 338 339 return nil 340}