1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/idresolver"
18 "tangled.sh/tangled.sh/core/appview/spindleverify"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 }
68 l = i.Logger.With("nsid", e.Commit.Collection)
69 }
70
71 if err != nil {
72 l.Error("error ingesting record", "err", err)
73 }
74
75 return err
76 }
77}
78
79func (i *Ingester) ingestStar(e *models.Event) error {
80 var err error
81 did := e.Did
82
83 l := i.Logger.With("handler", "ingestStar")
84 l = l.With("nsid", e.Commit.Collection)
85
86 switch e.Commit.Operation {
87 case models.CommitOperationCreate, models.CommitOperationUpdate:
88 var subjectUri syntax.ATURI
89
90 raw := json.RawMessage(e.Commit.Record)
91 record := tangled.FeedStar{}
92 err := json.Unmarshal(raw, &record)
93 if err != nil {
94 l.Error("invalid record", "err", err)
95 return err
96 }
97
98 subjectUri, err = syntax.ParseATURI(record.Subject)
99 if err != nil {
100 l.Error("invalid record", "err", err)
101 return err
102 }
103 err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey)
104 case models.CommitOperationDelete:
105 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
106 }
107
108 if err != nil {
109 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
110 }
111
112 return nil
113}
114
115func (i *Ingester) ingestFollow(e *models.Event) error {
116 var err error
117 did := e.Did
118
119 l := i.Logger.With("handler", "ingestFollow")
120 l = l.With("nsid", e.Commit.Collection)
121
122 switch e.Commit.Operation {
123 case models.CommitOperationCreate, models.CommitOperationUpdate:
124 raw := json.RawMessage(e.Commit.Record)
125 record := tangled.GraphFollow{}
126 err = json.Unmarshal(raw, &record)
127 if err != nil {
128 l.Error("invalid record", "err", err)
129 return err
130 }
131
132 subjectDid := record.Subject
133 err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey)
134 case models.CommitOperationDelete:
135 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
136 }
137
138 if err != nil {
139 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
140 }
141
142 return nil
143}
144
145func (i *Ingester) ingestPublicKey(e *models.Event) error {
146 did := e.Did
147 var err error
148
149 l := i.Logger.With("handler", "ingestPublicKey")
150 l = l.With("nsid", e.Commit.Collection)
151
152 switch e.Commit.Operation {
153 case models.CommitOperationCreate, models.CommitOperationUpdate:
154 l.Debug("processing add of pubkey")
155 raw := json.RawMessage(e.Commit.Record)
156 record := tangled.PublicKey{}
157 err = json.Unmarshal(raw, &record)
158 if err != nil {
159 l.Error("invalid record", "err", err)
160 return err
161 }
162
163 name := record.Name
164 key := record.Key
165 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
166 case models.CommitOperationDelete:
167 l.Debug("processing delete of pubkey")
168 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
169 }
170
171 if err != nil {
172 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
173 }
174
175 return nil
176}
177
178func (i *Ingester) ingestArtifact(e *models.Event) error {
179 did := e.Did
180 var err error
181
182 l := i.Logger.With("handler", "ingestArtifact")
183 l = l.With("nsid", e.Commit.Collection)
184
185 switch e.Commit.Operation {
186 case models.CommitOperationCreate, models.CommitOperationUpdate:
187 raw := json.RawMessage(e.Commit.Record)
188 record := tangled.RepoArtifact{}
189 err = json.Unmarshal(raw, &record)
190 if err != nil {
191 l.Error("invalid record", "err", err)
192 return err
193 }
194
195 repoAt, err := syntax.ParseATURI(record.Repo)
196 if err != nil {
197 return err
198 }
199
200 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
201 if err != nil {
202 return err
203 }
204
205 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
206 if err != nil || !ok {
207 return err
208 }
209
210 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
211 if err != nil {
212 createdAt = time.Now()
213 }
214
215 artifact := db.Artifact{
216 Did: did,
217 Rkey: e.Commit.RKey,
218 RepoAt: repoAt,
219 Tag: plumbing.Hash(record.Tag),
220 CreatedAt: createdAt,
221 BlobCid: cid.Cid(record.Artifact.Ref),
222 Name: record.Name,
223 Size: uint64(record.Artifact.Size),
224 MimeType: record.Artifact.MimeType,
225 }
226
227 err = db.AddArtifact(i.Db, artifact)
228 case models.CommitOperationDelete:
229 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
230 }
231
232 if err != nil {
233 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
234 }
235
236 return nil
237}
238
239func (i *Ingester) ingestProfile(e *models.Event) error {
240 did := e.Did
241 var err error
242
243 l := i.Logger.With("handler", "ingestProfile")
244 l = l.With("nsid", e.Commit.Collection)
245
246 if e.Commit.RKey != "self" {
247 return fmt.Errorf("ingestProfile only ingests `self` record")
248 }
249
250 switch e.Commit.Operation {
251 case models.CommitOperationCreate, models.CommitOperationUpdate:
252 raw := json.RawMessage(e.Commit.Record)
253 record := tangled.ActorProfile{}
254 err = json.Unmarshal(raw, &record)
255 if err != nil {
256 l.Error("invalid record", "err", err)
257 return err
258 }
259
260 description := ""
261 if record.Description != nil {
262 description = *record.Description
263 }
264
265 includeBluesky := record.Bluesky
266
267 location := ""
268 if record.Location != nil {
269 location = *record.Location
270 }
271
272 var links [5]string
273 for i, l := range record.Links {
274 if i < 5 {
275 links[i] = l
276 }
277 }
278
279 var stats [2]db.VanityStat
280 for i, s := range record.Stats {
281 if i < 2 {
282 stats[i].Kind = db.VanityStatKind(s)
283 }
284 }
285
286 var pinned [6]syntax.ATURI
287 for i, r := range record.PinnedRepositories {
288 if i < 6 {
289 pinned[i] = syntax.ATURI(r)
290 }
291 }
292
293 profile := db.Profile{
294 Did: did,
295 Description: description,
296 IncludeBluesky: includeBluesky,
297 Location: location,
298 Links: links,
299 Stats: stats,
300 PinnedRepos: pinned,
301 }
302
303 ddb, ok := i.Db.Execer.(*db.DB)
304 if !ok {
305 return fmt.Errorf("failed to index profile record, invalid db cast")
306 }
307
308 tx, err := ddb.Begin()
309 if err != nil {
310 return fmt.Errorf("failed to start transaction")
311 }
312
313 err = db.ValidateProfile(tx, &profile)
314 if err != nil {
315 return fmt.Errorf("invalid profile record")
316 }
317
318 err = db.UpsertProfile(tx, &profile)
319 case models.CommitOperationDelete:
320 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
321 }
322
323 if err != nil {
324 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
325 }
326
327 return nil
328}
329
330func (i *Ingester) ingestSpindleMember(e *models.Event) error {
331 did := e.Did
332 var err error
333
334 l := i.Logger.With("handler", "ingestSpindleMember")
335 l = l.With("nsid", e.Commit.Collection)
336
337 switch e.Commit.Operation {
338 case models.CommitOperationCreate:
339 raw := json.RawMessage(e.Commit.Record)
340 record := tangled.SpindleMember{}
341 err = json.Unmarshal(raw, &record)
342 if err != nil {
343 l.Error("invalid record", "err", err)
344 return err
345 }
346
347 // only spindle owner can invite to spindles
348 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
349 if err != nil || !ok {
350 return fmt.Errorf("failed to enforce permissions: %w", err)
351 }
352
353 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
354 if err != nil {
355 return err
356 }
357
358 if memberId.Handle.IsInvalidHandle() {
359 return err
360 }
361
362 ddb, ok := i.Db.Execer.(*db.DB)
363 if !ok {
364 return fmt.Errorf("failed to index profile record, invalid db cast")
365 }
366
367 err = db.AddSpindleMember(ddb, db.SpindleMember{
368 Did: syntax.DID(did),
369 Rkey: e.Commit.RKey,
370 Instance: record.Instance,
371 Subject: memberId.DID,
372 })
373 if !ok {
374 return fmt.Errorf("failed to add to db: %w", err)
375 }
376
377 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
378 if err != nil {
379 return fmt.Errorf("failed to update ACLs: %w", err)
380 }
381 case models.CommitOperationDelete:
382 rkey := e.Commit.RKey
383
384 ddb, ok := i.Db.Execer.(*db.DB)
385 if !ok {
386 return fmt.Errorf("failed to index profile record, invalid db cast")
387 }
388
389 // get record from db first
390 members, err := db.GetSpindleMembers(
391 ddb,
392 db.FilterEq("did", did),
393 db.FilterEq("rkey", rkey),
394 )
395 if err != nil || len(members) != 1 {
396 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
397 }
398 member := members[0]
399
400 tx, err := ddb.Begin()
401 if err != nil {
402 return fmt.Errorf("failed to start txn: %w", err)
403 }
404
405 // remove record by rkey && update enforcer
406 if err = db.RemoveSpindleMember(
407 tx,
408 db.FilterEq("did", did),
409 db.FilterEq("rkey", rkey),
410 ); err != nil {
411 return fmt.Errorf("failed to remove from db: %w", err)
412 }
413
414 // update enforcer
415 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
416 if err != nil {
417 return fmt.Errorf("failed to update ACLs: %w", err)
418 }
419
420 if err = tx.Commit(); err != nil {
421 return fmt.Errorf("failed to commit txn: %w", err)
422 }
423
424 if err = i.Enforcer.E.SavePolicy(); err != nil {
425 return fmt.Errorf("failed to save ACLs: %w", err)
426 }
427 }
428
429 return nil
430}
431
432func (i *Ingester) ingestSpindle(e *models.Event) error {
433 did := e.Did
434 var err error
435
436 l := i.Logger.With("handler", "ingestSpindle")
437 l = l.With("nsid", e.Commit.Collection)
438
439 switch e.Commit.Operation {
440 case models.CommitOperationCreate:
441 raw := json.RawMessage(e.Commit.Record)
442 record := tangled.Spindle{}
443 err = json.Unmarshal(raw, &record)
444 if err != nil {
445 l.Error("invalid record", "err", err)
446 return err
447 }
448
449 instance := e.Commit.RKey
450
451 ddb, ok := i.Db.Execer.(*db.DB)
452 if !ok {
453 return fmt.Errorf("failed to index profile record, invalid db cast")
454 }
455
456 err := db.AddSpindle(ddb, db.Spindle{
457 Owner: syntax.DID(did),
458 Instance: instance,
459 })
460 if err != nil {
461 l.Error("failed to add spindle to db", "err", err, "instance", instance)
462 return err
463 }
464
465 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
466 if err != nil {
467 l.Error("failed to add spindle to db", "err", err, "instance", instance)
468 return err
469 }
470
471 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
472 if err != nil {
473 return fmt.Errorf("failed to mark verified: %w", err)
474 }
475
476 return nil
477
478 case models.CommitOperationDelete:
479 instance := e.Commit.RKey
480
481 ddb, ok := i.Db.Execer.(*db.DB)
482 if !ok {
483 return fmt.Errorf("failed to index profile record, invalid db cast")
484 }
485
486 // get record from db first
487 spindles, err := db.GetSpindles(
488 ddb,
489 db.FilterEq("owner", did),
490 db.FilterEq("instance", instance),
491 )
492 if err != nil || len(spindles) != 1 {
493 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
494 }
495 spindle := spindles[0]
496
497 tx, err := ddb.Begin()
498 if err != nil {
499 return err
500 }
501 defer func() {
502 tx.Rollback()
503 i.Enforcer.E.LoadPolicy()
504 }()
505
506 err = db.DeleteSpindle(
507 tx,
508 db.FilterEq("owner", did),
509 db.FilterEq("instance", instance),
510 )
511 if err != nil {
512 return err
513 }
514
515 if spindle.Verified != nil {
516 err = i.Enforcer.RemoveSpindle(instance)
517 if err != nil {
518 return err
519 }
520 }
521
522 err = tx.Commit()
523 if err != nil {
524 return err
525 }
526
527 err = i.Enforcer.E.SavePolicy()
528 if err != nil {
529 return err
530 }
531 }
532
533 return nil
534}