1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/spindleverify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 case tangled.StringNSID:
68 err = i.ingestString(e)
69 }
70 l = i.Logger.With("nsid", e.Commit.Collection)
71 }
72
73 if err != nil {
74 l.Error("error ingesting record", "err", err)
75 }
76
77 return err
78 }
79}
80
81func (i *Ingester) ingestStar(e *models.Event) error {
82 var err error
83 did := e.Did
84
85 l := i.Logger.With("handler", "ingestStar")
86 l = l.With("nsid", e.Commit.Collection)
87
88 switch e.Commit.Operation {
89 case models.CommitOperationCreate, models.CommitOperationUpdate:
90 var subjectUri syntax.ATURI
91
92 raw := json.RawMessage(e.Commit.Record)
93 record := tangled.FeedStar{}
94 err := json.Unmarshal(raw, &record)
95 if err != nil {
96 l.Error("invalid record", "err", err)
97 return err
98 }
99
100 subjectUri, err = syntax.ParseATURI(record.Subject)
101 if err != nil {
102 l.Error("invalid record", "err", err)
103 return err
104 }
105 err = db.AddStar(i.Db, &db.Star{
106 StarredByDid: did,
107 RepoAt: subjectUri,
108 Rkey: e.Commit.RKey,
109 })
110 case models.CommitOperationDelete:
111 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
112 }
113
114 if err != nil {
115 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
116 }
117
118 return nil
119}
120
121func (i *Ingester) ingestFollow(e *models.Event) error {
122 var err error
123 did := e.Did
124
125 l := i.Logger.With("handler", "ingestFollow")
126 l = l.With("nsid", e.Commit.Collection)
127
128 switch e.Commit.Operation {
129 case models.CommitOperationCreate, models.CommitOperationUpdate:
130 raw := json.RawMessage(e.Commit.Record)
131 record := tangled.GraphFollow{}
132 err = json.Unmarshal(raw, &record)
133 if err != nil {
134 l.Error("invalid record", "err", err)
135 return err
136 }
137
138 err = db.AddFollow(i.Db, &db.Follow{
139 UserDid: did,
140 SubjectDid: record.Subject,
141 Rkey: e.Commit.RKey,
142 })
143 case models.CommitOperationDelete:
144 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
145 }
146
147 if err != nil {
148 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
149 }
150
151 return nil
152}
153
154func (i *Ingester) ingestPublicKey(e *models.Event) error {
155 did := e.Did
156 var err error
157
158 l := i.Logger.With("handler", "ingestPublicKey")
159 l = l.With("nsid", e.Commit.Collection)
160
161 switch e.Commit.Operation {
162 case models.CommitOperationCreate, models.CommitOperationUpdate:
163 l.Debug("processing add of pubkey")
164 raw := json.RawMessage(e.Commit.Record)
165 record := tangled.PublicKey{}
166 err = json.Unmarshal(raw, &record)
167 if err != nil {
168 l.Error("invalid record", "err", err)
169 return err
170 }
171
172 name := record.Name
173 key := record.Key
174 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
175 case models.CommitOperationDelete:
176 l.Debug("processing delete of pubkey")
177 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
178 }
179
180 if err != nil {
181 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
182 }
183
184 return nil
185}
186
187func (i *Ingester) ingestArtifact(e *models.Event) error {
188 did := e.Did
189 var err error
190
191 l := i.Logger.With("handler", "ingestArtifact")
192 l = l.With("nsid", e.Commit.Collection)
193
194 switch e.Commit.Operation {
195 case models.CommitOperationCreate, models.CommitOperationUpdate:
196 raw := json.RawMessage(e.Commit.Record)
197 record := tangled.RepoArtifact{}
198 err = json.Unmarshal(raw, &record)
199 if err != nil {
200 l.Error("invalid record", "err", err)
201 return err
202 }
203
204 repoAt, err := syntax.ParseATURI(record.Repo)
205 if err != nil {
206 return err
207 }
208
209 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
210 if err != nil {
211 return err
212 }
213
214 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
215 if err != nil || !ok {
216 return err
217 }
218
219 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
220 if err != nil {
221 createdAt = time.Now()
222 }
223
224 artifact := db.Artifact{
225 Did: did,
226 Rkey: e.Commit.RKey,
227 RepoAt: repoAt,
228 Tag: plumbing.Hash(record.Tag),
229 CreatedAt: createdAt,
230 BlobCid: cid.Cid(record.Artifact.Ref),
231 Name: record.Name,
232 Size: uint64(record.Artifact.Size),
233 MimeType: record.Artifact.MimeType,
234 }
235
236 err = db.AddArtifact(i.Db, artifact)
237 case models.CommitOperationDelete:
238 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
239 }
240
241 if err != nil {
242 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
243 }
244
245 return nil
246}
247
248func (i *Ingester) ingestProfile(e *models.Event) error {
249 did := e.Did
250 var err error
251
252 l := i.Logger.With("handler", "ingestProfile")
253 l = l.With("nsid", e.Commit.Collection)
254
255 if e.Commit.RKey != "self" {
256 return fmt.Errorf("ingestProfile only ingests `self` record")
257 }
258
259 switch e.Commit.Operation {
260 case models.CommitOperationCreate, models.CommitOperationUpdate:
261 raw := json.RawMessage(e.Commit.Record)
262 record := tangled.ActorProfile{}
263 err = json.Unmarshal(raw, &record)
264 if err != nil {
265 l.Error("invalid record", "err", err)
266 return err
267 }
268
269 description := ""
270 if record.Description != nil {
271 description = *record.Description
272 }
273
274 includeBluesky := record.Bluesky
275
276 location := ""
277 if record.Location != nil {
278 location = *record.Location
279 }
280
281 var links [5]string
282 for i, l := range record.Links {
283 if i < 5 {
284 links[i] = l
285 }
286 }
287
288 var stats [2]db.VanityStat
289 for i, s := range record.Stats {
290 if i < 2 {
291 stats[i].Kind = db.VanityStatKind(s)
292 }
293 }
294
295 var pinned [6]syntax.ATURI
296 for i, r := range record.PinnedRepositories {
297 if i < 6 {
298 pinned[i] = syntax.ATURI(r)
299 }
300 }
301
302 profile := db.Profile{
303 Did: did,
304 Description: description,
305 IncludeBluesky: includeBluesky,
306 Location: location,
307 Links: links,
308 Stats: stats,
309 PinnedRepos: pinned,
310 }
311
312 ddb, ok := i.Db.Execer.(*db.DB)
313 if !ok {
314 return fmt.Errorf("failed to index profile record, invalid db cast")
315 }
316
317 tx, err := ddb.Begin()
318 if err != nil {
319 return fmt.Errorf("failed to start transaction")
320 }
321
322 err = db.ValidateProfile(tx, &profile)
323 if err != nil {
324 return fmt.Errorf("invalid profile record")
325 }
326
327 err = db.UpsertProfile(tx, &profile)
328 case models.CommitOperationDelete:
329 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
330 }
331
332 if err != nil {
333 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
334 }
335
336 return nil
337}
338
339func (i *Ingester) ingestSpindleMember(e *models.Event) error {
340 did := e.Did
341 var err error
342
343 l := i.Logger.With("handler", "ingestSpindleMember")
344 l = l.With("nsid", e.Commit.Collection)
345
346 switch e.Commit.Operation {
347 case models.CommitOperationCreate:
348 raw := json.RawMessage(e.Commit.Record)
349 record := tangled.SpindleMember{}
350 err = json.Unmarshal(raw, &record)
351 if err != nil {
352 l.Error("invalid record", "err", err)
353 return err
354 }
355
356 // only spindle owner can invite to spindles
357 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
358 if err != nil || !ok {
359 return fmt.Errorf("failed to enforce permissions: %w", err)
360 }
361
362 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
363 if err != nil {
364 return err
365 }
366
367 if memberId.Handle.IsInvalidHandle() {
368 return err
369 }
370
371 ddb, ok := i.Db.Execer.(*db.DB)
372 if !ok {
373 return fmt.Errorf("failed to index profile record, invalid db cast")
374 }
375
376 err = db.AddSpindleMember(ddb, db.SpindleMember{
377 Did: syntax.DID(did),
378 Rkey: e.Commit.RKey,
379 Instance: record.Instance,
380 Subject: memberId.DID,
381 })
382 if !ok {
383 return fmt.Errorf("failed to add to db: %w", err)
384 }
385
386 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
387 if err != nil {
388 return fmt.Errorf("failed to update ACLs: %w", err)
389 }
390
391 l.Info("added spindle member")
392 case models.CommitOperationDelete:
393 rkey := e.Commit.RKey
394
395 ddb, ok := i.Db.Execer.(*db.DB)
396 if !ok {
397 return fmt.Errorf("failed to index profile record, invalid db cast")
398 }
399
400 // get record from db first
401 members, err := db.GetSpindleMembers(
402 ddb,
403 db.FilterEq("did", did),
404 db.FilterEq("rkey", rkey),
405 )
406 if err != nil || len(members) != 1 {
407 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
408 }
409 member := members[0]
410
411 tx, err := ddb.Begin()
412 if err != nil {
413 return fmt.Errorf("failed to start txn: %w", err)
414 }
415
416 // remove record by rkey && update enforcer
417 if err = db.RemoveSpindleMember(
418 tx,
419 db.FilterEq("did", did),
420 db.FilterEq("rkey", rkey),
421 ); err != nil {
422 return fmt.Errorf("failed to remove from db: %w", err)
423 }
424
425 // update enforcer
426 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
427 if err != nil {
428 return fmt.Errorf("failed to update ACLs: %w", err)
429 }
430
431 if err = tx.Commit(); err != nil {
432 return fmt.Errorf("failed to commit txn: %w", err)
433 }
434
435 if err = i.Enforcer.E.SavePolicy(); err != nil {
436 return fmt.Errorf("failed to save ACLs: %w", err)
437 }
438
439 l.Info("removed spindle member")
440 }
441
442 return nil
443}
444
445func (i *Ingester) ingestSpindle(e *models.Event) error {
446 did := e.Did
447 var err error
448
449 l := i.Logger.With("handler", "ingestSpindle")
450 l = l.With("nsid", e.Commit.Collection)
451
452 switch e.Commit.Operation {
453 case models.CommitOperationCreate:
454 raw := json.RawMessage(e.Commit.Record)
455 record := tangled.Spindle{}
456 err = json.Unmarshal(raw, &record)
457 if err != nil {
458 l.Error("invalid record", "err", err)
459 return err
460 }
461
462 instance := e.Commit.RKey
463
464 ddb, ok := i.Db.Execer.(*db.DB)
465 if !ok {
466 return fmt.Errorf("failed to index profile record, invalid db cast")
467 }
468
469 err := db.AddSpindle(ddb, db.Spindle{
470 Owner: syntax.DID(did),
471 Instance: instance,
472 })
473 if err != nil {
474 l.Error("failed to add spindle to db", "err", err, "instance", instance)
475 return err
476 }
477
478 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
479 if err != nil {
480 l.Error("failed to add spindle to db", "err", err, "instance", instance)
481 return err
482 }
483
484 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
485 if err != nil {
486 return fmt.Errorf("failed to mark verified: %w", err)
487 }
488
489 return nil
490
491 case models.CommitOperationDelete:
492 instance := e.Commit.RKey
493
494 ddb, ok := i.Db.Execer.(*db.DB)
495 if !ok {
496 return fmt.Errorf("failed to index profile record, invalid db cast")
497 }
498
499 // get record from db first
500 spindles, err := db.GetSpindles(
501 ddb,
502 db.FilterEq("owner", did),
503 db.FilterEq("instance", instance),
504 )
505 if err != nil || len(spindles) != 1 {
506 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
507 }
508 spindle := spindles[0]
509
510 tx, err := ddb.Begin()
511 if err != nil {
512 return err
513 }
514 defer func() {
515 tx.Rollback()
516 i.Enforcer.E.LoadPolicy()
517 }()
518
519 // remove spindle members first
520 err = db.RemoveSpindleMember(
521 tx,
522 db.FilterEq("owner", did),
523 db.FilterEq("instance", instance),
524 )
525 if err != nil {
526 return err
527 }
528
529 err = db.DeleteSpindle(
530 tx,
531 db.FilterEq("owner", did),
532 db.FilterEq("instance", instance),
533 )
534 if err != nil {
535 return err
536 }
537
538 if spindle.Verified != nil {
539 err = i.Enforcer.RemoveSpindle(instance)
540 if err != nil {
541 return err
542 }
543 }
544
545 err = tx.Commit()
546 if err != nil {
547 return err
548 }
549
550 err = i.Enforcer.E.SavePolicy()
551 if err != nil {
552 return err
553 }
554 }
555
556 return nil
557}
558
559func (i *Ingester) ingestString(e *models.Event) error {
560 did := e.Did
561 rkey := e.Commit.RKey
562
563 var err error
564
565 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
566 l.Info("ingesting record")
567
568 ddb, ok := i.Db.Execer.(*db.DB)
569 if !ok {
570 return fmt.Errorf("failed to index string record, invalid db cast")
571 }
572
573 switch e.Commit.Operation {
574 case models.CommitOperationCreate, models.CommitOperationUpdate:
575 raw := json.RawMessage(e.Commit.Record)
576 record := tangled.String{}
577 err = json.Unmarshal(raw, &record)
578 if err != nil {
579 l.Error("invalid record", "err", err)
580 return err
581 }
582
583 string := db.StringFromRecord(did, rkey, record)
584
585 if err = string.Validate(); err != nil {
586 l.Error("invalid record", "err", err)
587 return err
588 }
589
590 if err = db.AddString(ddb, string); err != nil {
591 l.Error("failed to add string", "err", err)
592 return err
593 }
594
595 return nil
596
597 case models.CommitOperationDelete:
598 if err := db.DeleteString(
599 ddb,
600 db.FilterEq("did", did),
601 db.FilterEq("rkey", rkey),
602 ); err != nil {
603 l.Error("failed to delete", "err", err)
604 return fmt.Errorf("failed to delete string record: %w", err)
605 }
606
607 return nil
608 }
609
610 return nil
611}