forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/db" 16) 17 18type Ingester func(ctx context.Context, e *models.Event) error 19 20func Ingest(d db.DbWrapper) Ingester { 21 return func(ctx context.Context, e *models.Event) error { 22 var err error 23 defer func() { 24 eventTime := e.TimeUS 25 lastTimeUs := eventTime + 1 26 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 27 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 28 } 29 }() 30 31 if e.Kind != models.EventKindCommit { 32 return nil 33 } 34 35 switch e.Commit.Collection { 36 case tangled.GraphFollowNSID: 37 ingestFollow(&d, e) 38 case tangled.FeedStarNSID: 39 ingestStar(&d, e) 40 case tangled.PublicKeyNSID: 41 ingestPublicKey(&d, e) 42 case tangled.RepoArtifactNSID: 43 ingestArtifact(&d, e) 44 case tangled.ActorProfileNSID: 45 ingestProfile(&d, e) 46 } 47 48 return err 49 } 50} 51 52func ingestStar(d *db.DbWrapper, e *models.Event) error { 53 var err error 54 did := e.Did 55 56 switch e.Commit.Operation { 57 case models.CommitOperationCreate, models.CommitOperationUpdate: 58 var subjectUri syntax.ATURI 59 60 raw := json.RawMessage(e.Commit.Record) 61 record := tangled.FeedStar{} 62 err := json.Unmarshal(raw, &record) 63 if err != nil { 64 log.Println("invalid record") 65 return err 66 } 67 68 subjectUri, err = syntax.ParseATURI(record.Subject) 69 if err != nil { 70 log.Println("invalid record") 71 return err 72 } 73 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 74 case models.CommitOperationDelete: 75 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 76 } 77 78 if err != nil { 79 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 80 } 81 82 return nil 83} 84 85func ingestFollow(d *db.DbWrapper, e *models.Event) error { 86 var err error 87 did := e.Did 88 89 switch e.Commit.Operation { 90 case models.CommitOperationCreate, models.CommitOperationUpdate: 91 raw := json.RawMessage(e.Commit.Record) 92 record := tangled.GraphFollow{} 93 err = json.Unmarshal(raw, &record) 94 if err != nil { 95 log.Println("invalid record") 96 return err 97 } 98 99 subjectDid := record.Subject 100 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 101 case models.CommitOperationDelete: 102 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 103 } 104 105 if err != nil { 106 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 107 } 108 109 return nil 110} 111 112func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 113 did := e.Did 114 var err error 115 116 switch e.Commit.Operation { 117 case models.CommitOperationCreate, models.CommitOperationUpdate: 118 log.Println("processing add of pubkey") 119 raw := json.RawMessage(e.Commit.Record) 120 record := tangled.PublicKey{} 121 err = json.Unmarshal(raw, &record) 122 if err != nil { 123 log.Printf("invalid record: %s", err) 124 return err 125 } 126 127 name := record.Name 128 key := record.Key 129 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 130 case models.CommitOperationDelete: 131 log.Println("processing delete of pubkey") 132 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 133 } 134 135 if err != nil { 136 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 137 } 138 139 return nil 140} 141 142func ingestArtifact(d *db.DbWrapper, e *models.Event) error { 143 did := e.Did 144 var err error 145 146 switch e.Commit.Operation { 147 case models.CommitOperationCreate, models.CommitOperationUpdate: 148 raw := json.RawMessage(e.Commit.Record) 149 record := tangled.RepoArtifact{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 log.Printf("invalid record: %s", err) 153 return err 154 } 155 156 repoAt, err := syntax.ParseATURI(record.Repo) 157 if err != nil { 158 return err 159 } 160 161 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 162 if err != nil { 163 createdAt = time.Now() 164 } 165 166 artifact := db.Artifact{ 167 Did: did, 168 Rkey: e.Commit.RKey, 169 RepoAt: repoAt, 170 Tag: plumbing.Hash(record.Tag), 171 CreatedAt: createdAt, 172 BlobCid: cid.Cid(record.Artifact.Ref), 173 Name: record.Name, 174 Size: uint64(record.Artifact.Size), 175 MimeType: record.Artifact.MimeType, 176 } 177 178 err = db.AddArtifact(d, artifact) 179 case models.CommitOperationDelete: 180 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey)) 181 } 182 183 if err != nil { 184 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 185 } 186 187 return nil 188} 189 190func ingestProfile(d *db.DbWrapper, e *models.Event) error { 191 did := e.Did 192 var err error 193 194 if e.Commit.RKey != "self" { 195 return fmt.Errorf("ingestProfile only ingests `self` record") 196 } 197 198 switch e.Commit.Operation { 199 case models.CommitOperationCreate, models.CommitOperationUpdate: 200 raw := json.RawMessage(e.Commit.Record) 201 record := tangled.ActorProfile{} 202 err = json.Unmarshal(raw, &record) 203 if err != nil { 204 log.Printf("invalid record: %s", err) 205 return err 206 } 207 208 description := "" 209 if record.Description != nil { 210 description = *record.Description 211 } 212 213 includeBluesky := false 214 if record.Bluesky != nil { 215 includeBluesky = *record.Bluesky 216 } 217 218 location := "" 219 if record.Location != nil { 220 location = *record.Location 221 } 222 223 var links [5]string 224 for i, l := range record.Links { 225 if i < 5 { 226 links[i] = l 227 } 228 } 229 230 var stats [2]db.VanityStat 231 for i, s := range record.Stats { 232 if i < 2 { 233 stats[i].Kind = db.VanityStatKind(s) 234 } 235 } 236 237 var pinned [6]syntax.ATURI 238 for i, r := range record.PinnedRepositories { 239 if i < 6 { 240 pinned[i] = syntax.ATURI(r) 241 } 242 } 243 244 profile := db.Profile{ 245 Did: did, 246 Description: description, 247 IncludeBluesky: includeBluesky, 248 Location: location, 249 Links: links, 250 Stats: stats, 251 PinnedRepos: pinned, 252 } 253 254 ddb, ok := d.Execer.(*db.DB) 255 if !ok { 256 return fmt.Errorf("failed to index profile record, invalid db cast") 257 } 258 259 tx, err := ddb.Begin() 260 if err != nil { 261 return fmt.Errorf("failed to start transaction") 262 } 263 264 err = db.ValidateProfile(tx, &profile) 265 if err != nil { 266 return fmt.Errorf("invalid profile record") 267 } 268 269 err = db.UpsertProfile(tx, &profile) 270 case models.CommitOperationDelete: 271 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey)) 272 } 273 274 if err != nil { 275 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 276 } 277 278 return nil 279}