forked from tangled.org/core
this repo has no description
at opengraph 6.7 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/db" 16 "tangled.sh/tangled.sh/core/rbac" 17) 18 19type Ingester func(ctx context.Context, e *models.Event) error 20 21func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester { 22 return func(ctx context.Context, e *models.Event) error { 23 var err error 24 defer func() { 25 eventTime := e.TimeUS 26 lastTimeUs := eventTime + 1 27 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 28 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 29 } 30 }() 31 32 if e.Kind != models.EventKindCommit { 33 return nil 34 } 35 36 switch e.Commit.Collection { 37 case tangled.GraphFollowNSID: 38 ingestFollow(&d, e) 39 case tangled.FeedStarNSID: 40 ingestStar(&d, e) 41 case tangled.PublicKeyNSID: 42 ingestPublicKey(&d, e) 43 case tangled.RepoArtifactNSID: 44 ingestArtifact(&d, e, enforcer) 45 case tangled.ActorProfileNSID: 46 ingestProfile(&d, e) 47 } 48 49 return err 50 } 51} 52 53func ingestStar(d *db.DbWrapper, e *models.Event) error { 54 var err error 55 did := e.Did 56 57 switch e.Commit.Operation { 58 case models.CommitOperationCreate, models.CommitOperationUpdate: 59 var subjectUri syntax.ATURI 60 61 raw := json.RawMessage(e.Commit.Record) 62 record := tangled.FeedStar{} 63 err := json.Unmarshal(raw, &record) 64 if err != nil { 65 log.Println("invalid record") 66 return err 67 } 68 69 subjectUri, err = syntax.ParseATURI(record.Subject) 70 if err != nil { 71 log.Println("invalid record") 72 return err 73 } 74 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 75 case models.CommitOperationDelete: 76 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 77 } 78 79 if err != nil { 80 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 81 } 82 83 return nil 84} 85 86func ingestFollow(d *db.DbWrapper, e *models.Event) error { 87 var err error 88 did := e.Did 89 90 switch e.Commit.Operation { 91 case models.CommitOperationCreate, models.CommitOperationUpdate: 92 raw := json.RawMessage(e.Commit.Record) 93 record := tangled.GraphFollow{} 94 err = json.Unmarshal(raw, &record) 95 if err != nil { 96 log.Println("invalid record") 97 return err 98 } 99 100 subjectDid := record.Subject 101 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 102 case models.CommitOperationDelete: 103 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 104 } 105 106 if err != nil { 107 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 108 } 109 110 return nil 111} 112 113func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 114 did := e.Did 115 var err error 116 117 switch e.Commit.Operation { 118 case models.CommitOperationCreate, models.CommitOperationUpdate: 119 log.Println("processing add of pubkey") 120 raw := json.RawMessage(e.Commit.Record) 121 record := tangled.PublicKey{} 122 err = json.Unmarshal(raw, &record) 123 if err != nil { 124 log.Printf("invalid record: %s", err) 125 return err 126 } 127 128 name := record.Name 129 key := record.Key 130 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 131 case models.CommitOperationDelete: 132 log.Println("processing delete of pubkey") 133 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 134 } 135 136 if err != nil { 137 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 138 } 139 140 return nil 141} 142 143func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 144 did := e.Did 145 var err error 146 147 switch e.Commit.Operation { 148 case models.CommitOperationCreate, models.CommitOperationUpdate: 149 raw := json.RawMessage(e.Commit.Record) 150 record := tangled.RepoArtifact{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 log.Printf("invalid record: %s", err) 154 return err 155 } 156 157 repoAt, err := syntax.ParseATURI(record.Repo) 158 if err != nil { 159 return err 160 } 161 162 repo, err := db.GetRepoByAtUri(d, repoAt.String()) 163 if err != nil { 164 return err 165 } 166 167 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 168 if err != nil || !ok { 169 return err 170 } 171 172 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 173 if err != nil { 174 createdAt = time.Now() 175 } 176 177 artifact := db.Artifact{ 178 Did: did, 179 Rkey: e.Commit.RKey, 180 RepoAt: repoAt, 181 Tag: plumbing.Hash(record.Tag), 182 CreatedAt: createdAt, 183 BlobCid: cid.Cid(record.Artifact.Ref), 184 Name: record.Name, 185 Size: uint64(record.Artifact.Size), 186 MimeType: record.Artifact.MimeType, 187 } 188 189 err = db.AddArtifact(d, artifact) 190 case models.CommitOperationDelete: 191 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey)) 192 } 193 194 if err != nil { 195 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 196 } 197 198 return nil 199} 200 201func ingestProfile(d *db.DbWrapper, e *models.Event) error { 202 did := e.Did 203 var err error 204 205 if e.Commit.RKey != "self" { 206 return fmt.Errorf("ingestProfile only ingests `self` record") 207 } 208 209 switch e.Commit.Operation { 210 case models.CommitOperationCreate, models.CommitOperationUpdate: 211 raw := json.RawMessage(e.Commit.Record) 212 record := tangled.ActorProfile{} 213 err = json.Unmarshal(raw, &record) 214 if err != nil { 215 log.Printf("invalid record: %s", err) 216 return err 217 } 218 219 description := "" 220 if record.Description != nil { 221 description = *record.Description 222 } 223 224 includeBluesky := record.Bluesky 225 226 location := "" 227 if record.Location != nil { 228 location = *record.Location 229 } 230 231 var links [5]string 232 for i, l := range record.Links { 233 if i < 5 { 234 links[i] = l 235 } 236 } 237 238 var stats [2]db.VanityStat 239 for i, s := range record.Stats { 240 if i < 2 { 241 stats[i].Kind = db.VanityStatKind(s) 242 } 243 } 244 245 var pinned [6]syntax.ATURI 246 for i, r := range record.PinnedRepositories { 247 if i < 6 { 248 pinned[i] = syntax.ATURI(r) 249 } 250 } 251 252 profile := db.Profile{ 253 Did: did, 254 Description: description, 255 IncludeBluesky: includeBluesky, 256 Location: location, 257 Links: links, 258 Stats: stats, 259 PinnedRepos: pinned, 260 } 261 262 ddb, ok := d.Execer.(*db.DB) 263 if !ok { 264 return fmt.Errorf("failed to index profile record, invalid db cast") 265 } 266 267 tx, err := ddb.Begin() 268 if err != nil { 269 return fmt.Errorf("failed to start transaction") 270 } 271 272 err = db.ValidateProfile(tx, &profile) 273 if err != nil { 274 return fmt.Errorf("invalid profile record") 275 } 276 277 err = db.UpsertProfile(tx, &profile) 278 case models.CommitOperationDelete: 279 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey)) 280 } 281 282 if err != nil { 283 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 284 } 285 286 return nil 287}