1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/go-git/go-git/v5/plumbing"
14 "github.com/ipfs/go-cid"
15 "tangled.sh/tangled.sh/core/api/tangled"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/knotclient"
18 "tangled.sh/tangled.sh/core/rbac"
19)
20
21type Ingester func(ctx context.Context, e *models.Event) error
22
23func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer, dev bool) Ingester {
24 return func(ctx context.Context, e *models.Event) error {
25 var err error
26 defer func() {
27 eventTime := e.TimeUS
28 lastTimeUs := eventTime + 1
29 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
30 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
31 }
32 }()
33
34 if e.Kind != models.EventKindCommit {
35 return nil
36 }
37
38 switch e.Commit.Collection {
39 case tangled.GraphFollowNSID:
40 ingestFollow(&d, e)
41 case tangled.FeedStarNSID:
42 ingestStar(&d, e)
43 case tangled.PublicKeyNSID:
44 ingestPublicKey(&d, e)
45 case tangled.RepoArtifactNSID:
46 ingestArtifact(&d, e, enforcer)
47 case tangled.ActorProfileNSID:
48 ingestProfile(&d, e)
49 case tangled.KnotNSID:
50 ingestKnot(&d, e, dev)
51 }
52
53 return err
54 }
55}
56
57func ingestStar(d *db.DbWrapper, e *models.Event) error {
58 var err error
59 did := e.Did
60
61 switch e.Commit.Operation {
62 case models.CommitOperationCreate, models.CommitOperationUpdate:
63 var subjectUri syntax.ATURI
64
65 raw := json.RawMessage(e.Commit.Record)
66 record := tangled.FeedStar{}
67 err := json.Unmarshal(raw, &record)
68 if err != nil {
69 log.Println("invalid record")
70 return err
71 }
72
73 subjectUri, err = syntax.ParseATURI(record.Subject)
74 if err != nil {
75 log.Println("invalid record")
76 return err
77 }
78 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
79 case models.CommitOperationDelete:
80 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
81 }
82
83 if err != nil {
84 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
85 }
86
87 return nil
88}
89
90func ingestFollow(d *db.DbWrapper, e *models.Event) error {
91 var err error
92 did := e.Did
93
94 switch e.Commit.Operation {
95 case models.CommitOperationCreate, models.CommitOperationUpdate:
96 raw := json.RawMessage(e.Commit.Record)
97 record := tangled.GraphFollow{}
98 err = json.Unmarshal(raw, &record)
99 if err != nil {
100 log.Println("invalid record")
101 return err
102 }
103
104 subjectDid := record.Subject
105 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
106 case models.CommitOperationDelete:
107 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
108 }
109
110 if err != nil {
111 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
112 }
113
114 return nil
115}
116
117func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
118 did := e.Did
119 var err error
120
121 switch e.Commit.Operation {
122 case models.CommitOperationCreate, models.CommitOperationUpdate:
123 log.Println("processing add of pubkey")
124 raw := json.RawMessage(e.Commit.Record)
125 record := tangled.PublicKey{}
126 err = json.Unmarshal(raw, &record)
127 if err != nil {
128 log.Printf("invalid record: %s", err)
129 return err
130 }
131
132 name := record.Name
133 key := record.Key
134 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
135 case models.CommitOperationDelete:
136 log.Println("processing delete of pubkey")
137 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
138 }
139
140 if err != nil {
141 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
142 }
143
144 return nil
145}
146
147func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
148 did := e.Did
149 var err error
150
151 switch e.Commit.Operation {
152 case models.CommitOperationCreate, models.CommitOperationUpdate:
153 raw := json.RawMessage(e.Commit.Record)
154 record := tangled.RepoArtifact{}
155 err = json.Unmarshal(raw, &record)
156 if err != nil {
157 log.Printf("invalid record: %s", err)
158 return err
159 }
160
161 repoAt, err := syntax.ParseATURI(record.Repo)
162 if err != nil {
163 return err
164 }
165
166 repo, err := db.GetRepoByAtUri(d, repoAt.String())
167 if err != nil {
168 return err
169 }
170
171 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
172 if err != nil || !ok {
173 return err
174 }
175
176 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
177 if err != nil {
178 createdAt = time.Now()
179 }
180
181 artifact := db.Artifact{
182 Did: did,
183 Rkey: e.Commit.RKey,
184 RepoAt: repoAt,
185 Tag: plumbing.Hash(record.Tag),
186 CreatedAt: createdAt,
187 BlobCid: cid.Cid(record.Artifact.Ref),
188 Name: record.Name,
189 Size: uint64(record.Artifact.Size),
190 MimeType: record.Artifact.MimeType,
191 }
192
193 err = db.AddArtifact(d, artifact)
194 case models.CommitOperationDelete:
195 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
196 }
197
198 if err != nil {
199 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
200 }
201
202 return nil
203}
204
205func ingestProfile(d *db.DbWrapper, e *models.Event) error {
206 did := e.Did
207 var err error
208
209 if e.Commit.RKey != "self" {
210 return fmt.Errorf("ingestProfile only ingests `self` record")
211 }
212
213 switch e.Commit.Operation {
214 case models.CommitOperationCreate, models.CommitOperationUpdate:
215 raw := json.RawMessage(e.Commit.Record)
216 record := tangled.ActorProfile{}
217 err = json.Unmarshal(raw, &record)
218 if err != nil {
219 log.Printf("invalid record: %s", err)
220 return err
221 }
222
223 description := ""
224 if record.Description != nil {
225 description = *record.Description
226 }
227
228 includeBluesky := record.Bluesky
229
230 location := ""
231 if record.Location != nil {
232 location = *record.Location
233 }
234
235 var links [5]string
236 for i, l := range record.Links {
237 if i < 5 {
238 links[i] = l
239 }
240 }
241
242 var stats [2]db.VanityStat
243 for i, s := range record.Stats {
244 if i < 2 {
245 stats[i].Kind = db.VanityStatKind(s)
246 }
247 }
248
249 var pinned [6]syntax.ATURI
250 for i, r := range record.PinnedRepositories {
251 if i < 6 {
252 pinned[i] = syntax.ATURI(r)
253 }
254 }
255
256 profile := db.Profile{
257 Did: did,
258 Description: description,
259 IncludeBluesky: includeBluesky,
260 Location: location,
261 Links: links,
262 Stats: stats,
263 PinnedRepos: pinned,
264 }
265
266 ddb, ok := d.Execer.(*db.DB)
267 if !ok {
268 return fmt.Errorf("failed to index profile record, invalid db cast")
269 }
270
271 tx, err := ddb.Begin()
272 if err != nil {
273 return fmt.Errorf("failed to start transaction")
274 }
275
276 err = db.ValidateProfile(tx, &profile)
277 if err != nil {
278 return fmt.Errorf("invalid profile record")
279 }
280
281 err = db.UpsertProfile(tx, &profile)
282 case models.CommitOperationDelete:
283 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
284 }
285
286 if err != nil {
287 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
288 }
289
290 return nil
291}
292
293func ingestKnot(d *db.DbWrapper, e *models.Event, dev bool) error {
294 did := e.Did
295 var err error
296
297 switch e.Commit.Operation {
298 case models.CommitOperationCreate:
299 log.Println("processing knot creation")
300 raw := json.RawMessage(e.Commit.Record)
301 record := tangled.Knot{}
302 err = json.Unmarshal(raw, &record)
303 if err != nil {
304 log.Printf("invalid record: %s", err)
305 return err
306 }
307
308 host := record.Host
309
310 if strings.HasPrefix(host, "localhost") && !dev {
311 // localhost knots are not ingested except in dev mode
312 return fmt.Errorf("localhost knots not registered this appview: %s", host)
313 }
314
315 // two-way confirmation that this knot is owned by this did
316 us, err := knotclient.NewUnsignedClient(host, dev)
317 if err != nil {
318 return err
319 }
320
321 resp, err := us.Owner()
322 if err != nil {
323 return err
324 }
325
326 if resp.OwnerDid != did {
327 return fmt.Errorf("incorrect owner reported from knot %s: wanted: %s, got: %s", host, resp.OwnerDid, did)
328 }
329
330 err = db.RegisterV2(d, host, resp.OwnerDid)
331 default:
332 log.Println("this operation is not yet handled", e.Commit.Operation)
333 }
334
335 if err != nil {
336 return fmt.Errorf("failed to %s knot record: %w", e.Commit.Operation, err)
337 }
338
339 return nil
340}