forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log"
10 "net/http"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "github.com/go-git/go-git/v5/plumbing"
17 "github.com/ipfs/go-cid"
18 "tangled.sh/tangled.sh/core/api/tangled"
19 "tangled.sh/tangled.sh/core/appview/db"
20 "tangled.sh/tangled.sh/core/rbac"
21)
22
// Ingester handles a single jetstream event and reports failure through its
// error result.
type Ingester func(ctx context.Context, e *models.Event) error
24
25func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester {
26 return func(ctx context.Context, e *models.Event) error {
27 var err error
28 defer func() {
29 eventTime := e.TimeUS
30 lastTimeUs := eventTime + 1
31 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
32 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
33 }
34 }()
35
36 if e.Kind != models.EventKindCommit {
37 return nil
38 }
39
40 switch e.Commit.Collection {
41 case tangled.GraphFollowNSID:
42 ingestFollow(&d, e)
43 case tangled.FeedStarNSID:
44 ingestStar(&d, e)
45 case tangled.PublicKeyNSID:
46 ingestPublicKey(&d, e)
47 case tangled.RepoArtifactNSID:
48 ingestArtifact(&d, e, enforcer)
49 case tangled.ActorProfileNSID:
50 ingestProfile(&d, e)
51 case tangled.SpindleMemberNSID:
52 ingestSpindleMember(&d, e, enforcer)
53 case tangled.SpindleNSID:
54 ingestSpindle(&d, e, true) // TODO: change this to dynamic
55 }
56
57 return err
58 }
59}
60
61func ingestStar(d *db.DbWrapper, e *models.Event) error {
62 var err error
63 did := e.Did
64
65 switch e.Commit.Operation {
66 case models.CommitOperationCreate, models.CommitOperationUpdate:
67 var subjectUri syntax.ATURI
68
69 raw := json.RawMessage(e.Commit.Record)
70 record := tangled.FeedStar{}
71 err := json.Unmarshal(raw, &record)
72 if err != nil {
73 log.Println("invalid record")
74 return err
75 }
76
77 subjectUri, err = syntax.ParseATURI(record.Subject)
78 if err != nil {
79 log.Println("invalid record")
80 return err
81 }
82 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
83 case models.CommitOperationDelete:
84 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
85 }
86
87 if err != nil {
88 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
89 }
90
91 return nil
92}
93
94func ingestFollow(d *db.DbWrapper, e *models.Event) error {
95 var err error
96 did := e.Did
97
98 switch e.Commit.Operation {
99 case models.CommitOperationCreate, models.CommitOperationUpdate:
100 raw := json.RawMessage(e.Commit.Record)
101 record := tangled.GraphFollow{}
102 err = json.Unmarshal(raw, &record)
103 if err != nil {
104 log.Println("invalid record")
105 return err
106 }
107
108 subjectDid := record.Subject
109 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
110 case models.CommitOperationDelete:
111 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
112 }
113
114 if err != nil {
115 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
116 }
117
118 return nil
119}
120
121func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
122 did := e.Did
123 var err error
124
125 switch e.Commit.Operation {
126 case models.CommitOperationCreate, models.CommitOperationUpdate:
127 log.Println("processing add of pubkey")
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.PublicKey{}
130 err = json.Unmarshal(raw, &record)
131 if err != nil {
132 log.Printf("invalid record: %s", err)
133 return err
134 }
135
136 name := record.Name
137 key := record.Key
138 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
139 case models.CommitOperationDelete:
140 log.Println("processing delete of pubkey")
141 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
142 }
143
144 if err != nil {
145 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
146 }
147
148 return nil
149}
150
151func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
152 did := e.Did
153 var err error
154
155 switch e.Commit.Operation {
156 case models.CommitOperationCreate, models.CommitOperationUpdate:
157 raw := json.RawMessage(e.Commit.Record)
158 record := tangled.RepoArtifact{}
159 err = json.Unmarshal(raw, &record)
160 if err != nil {
161 log.Printf("invalid record: %s", err)
162 return err
163 }
164
165 repoAt, err := syntax.ParseATURI(record.Repo)
166 if err != nil {
167 return err
168 }
169
170 repo, err := db.GetRepoByAtUri(d, repoAt.String())
171 if err != nil {
172 return err
173 }
174
175 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
176 if err != nil || !ok {
177 return err
178 }
179
180 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
181 if err != nil {
182 createdAt = time.Now()
183 }
184
185 artifact := db.Artifact{
186 Did: did,
187 Rkey: e.Commit.RKey,
188 RepoAt: repoAt,
189 Tag: plumbing.Hash(record.Tag),
190 CreatedAt: createdAt,
191 BlobCid: cid.Cid(record.Artifact.Ref),
192 Name: record.Name,
193 Size: uint64(record.Artifact.Size),
194 MimeType: record.Artifact.MimeType,
195 }
196
197 err = db.AddArtifact(d, artifact)
198 case models.CommitOperationDelete:
199 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
200 }
201
202 if err != nil {
203 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
204 }
205
206 return nil
207}
208
209func ingestProfile(d *db.DbWrapper, e *models.Event) error {
210 did := e.Did
211 var err error
212
213 if e.Commit.RKey != "self" {
214 return fmt.Errorf("ingestProfile only ingests `self` record")
215 }
216
217 switch e.Commit.Operation {
218 case models.CommitOperationCreate, models.CommitOperationUpdate:
219 raw := json.RawMessage(e.Commit.Record)
220 record := tangled.ActorProfile{}
221 err = json.Unmarshal(raw, &record)
222 if err != nil {
223 log.Printf("invalid record: %s", err)
224 return err
225 }
226
227 description := ""
228 if record.Description != nil {
229 description = *record.Description
230 }
231
232 includeBluesky := record.Bluesky
233
234 location := ""
235 if record.Location != nil {
236 location = *record.Location
237 }
238
239 var links [5]string
240 for i, l := range record.Links {
241 if i < 5 {
242 links[i] = l
243 }
244 }
245
246 var stats [2]db.VanityStat
247 for i, s := range record.Stats {
248 if i < 2 {
249 stats[i].Kind = db.VanityStatKind(s)
250 }
251 }
252
253 var pinned [6]syntax.ATURI
254 for i, r := range record.PinnedRepositories {
255 if i < 6 {
256 pinned[i] = syntax.ATURI(r)
257 }
258 }
259
260 profile := db.Profile{
261 Did: did,
262 Description: description,
263 IncludeBluesky: includeBluesky,
264 Location: location,
265 Links: links,
266 Stats: stats,
267 PinnedRepos: pinned,
268 }
269
270 ddb, ok := d.Execer.(*db.DB)
271 if !ok {
272 return fmt.Errorf("failed to index profile record, invalid db cast")
273 }
274
275 tx, err := ddb.Begin()
276 if err != nil {
277 return fmt.Errorf("failed to start transaction")
278 }
279
280 err = db.ValidateProfile(tx, &profile)
281 if err != nil {
282 return fmt.Errorf("invalid profile record")
283 }
284
285 err = db.UpsertProfile(tx, &profile)
286 case models.CommitOperationDelete:
287 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
288 }
289
290 if err != nil {
291 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
292 }
293
294 return nil
295}
296
297func ingestSpindleMember(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
298 did := e.Did
299 var err error
300
301 switch e.Commit.Operation {
302 case models.CommitOperationCreate:
303 raw := json.RawMessage(e.Commit.Record)
304 record := tangled.SpindleMember{}
305 err = json.Unmarshal(raw, &record)
306 if err != nil {
307 log.Printf("invalid record: %s", err)
308 return err
309 }
310
311 // only spindle owner can invite to spindles
312 ok, err := enforcer.IsSpindleInviteAllowed(did, record.Instance)
313 if err != nil || !ok {
314 return fmt.Errorf("failed to enforce permissions: %w", err)
315 }
316
317 err = enforcer.AddSpindleMember(record.Instance, record.Subject)
318 if err != nil {
319 return fmt.Errorf("failed to add member: %w", err)
320 }
321 }
322
323 return nil
324}
325
326func ingestSpindle(d *db.DbWrapper, e *models.Event, dev bool) error {
327 did := e.Did
328 var err error
329
330 switch e.Commit.Operation {
331 case models.CommitOperationCreate:
332 raw := json.RawMessage(e.Commit.Record)
333 record := tangled.Spindle{}
334 err = json.Unmarshal(raw, &record)
335 if err != nil {
336 log.Printf("invalid record: %s", err)
337 return err
338 }
339
340 // this is a special record whose rkey is the instance of the spindle itself
341 domain := e.Commit.RKey
342
343 owner, err := fetchOwner(context.TODO(), domain, true)
344 if err != nil {
345 log.Printf("failed to verify owner of %s: %w", domain, err)
346 return err
347 }
348
349 // verify that the spindle owner points back to this did
350 if owner != did {
351 log.Printf("incorrect owner for domain: %s, %s != %s", domain, owner, did)
352 return err
353 }
354
355 // mark this spindle as registered
356 }
357
358 return nil
359}
360
// fetchOwner asks the spindle at domain who owns it by issuing
// GET <scheme>://<domain>/owner and returning the whitespace-trimmed response
// body. The scheme is http when dev is true and https otherwise.
//
// The request is bound to ctx and to a one second client timeout. An error is
// returned when the request fails, the status is not 200, the body cannot be
// read, or the body is empty after trimming. At most 1 KiB of the body is
// read.
func fetchOwner(ctx context.Context, domain string, dev bool) (string, error) {
	scheme := "https"
	if dev {
		scheme = "http"
	}

	url := fmt.Sprintf("%s://%s/owner", scheme, domain)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}

	client := &http.Client{
		Timeout: 1 * time.Second,
	}

	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to fetch /owner: %w", err)
	}
	// the body must be closed on every path, including non-200 responses
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("failed to fetch /owner: unexpected status %d", resp.StatusCode)
	}

	body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) // read at most 1kb of data
	if err != nil {
		return "", fmt.Errorf("failed to read /owner response: %w", err)
	}

	did := strings.TrimSpace(string(body))
	if did == "" {
		return "", errors.New("empty DID in /owner response")
	}

	return did, nil
}