1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/spindleverify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 case tangled.StringNSID:
68 err = i.ingestString(e)
69 }
70 l = i.Logger.With("nsid", e.Commit.Collection)
71 }
72
73 if err != nil {
74 l.Error("error ingesting record", "err", err)
75 }
76
77 return err
78 }
79}
80
81func (i *Ingester) ingestStar(e *models.Event) error {
82 var err error
83 did := e.Did
84
85 l := i.Logger.With("handler", "ingestStar")
86 l = l.With("nsid", e.Commit.Collection)
87
88 switch e.Commit.Operation {
89 case models.CommitOperationCreate, models.CommitOperationUpdate:
90 var subjectUri syntax.ATURI
91
92 raw := json.RawMessage(e.Commit.Record)
93 record := tangled.FeedStar{}
94 err := json.Unmarshal(raw, &record)
95 if err != nil {
96 l.Error("invalid record", "err", err)
97 return err
98 }
99
100 subjectUri, err = syntax.ParseATURI(record.Subject)
101 if err != nil {
102 l.Error("invalid record", "err", err)
103 return err
104 }
105 err = db.AddStar(i.Db, &db.Star{
106 StarredByDid: did,
107 RepoAt: subjectUri,
108 Rkey: e.Commit.RKey,
109 })
110 case models.CommitOperationDelete:
111 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
112 }
113
114 if err != nil {
115 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
116 }
117
118 return nil
119}
120
121func (i *Ingester) ingestFollow(e *models.Event) error {
122 var err error
123 did := e.Did
124
125 l := i.Logger.With("handler", "ingestFollow")
126 l = l.With("nsid", e.Commit.Collection)
127
128 switch e.Commit.Operation {
129 case models.CommitOperationCreate, models.CommitOperationUpdate:
130 raw := json.RawMessage(e.Commit.Record)
131 record := tangled.GraphFollow{}
132 err = json.Unmarshal(raw, &record)
133 if err != nil {
134 l.Error("invalid record", "err", err)
135 return err
136 }
137
138 err = db.AddFollow(i.Db, &db.Follow{
139 UserDid: did,
140 SubjectDid: record.Subject,
141 Rkey: e.Commit.RKey,
142 })
143 case models.CommitOperationDelete:
144 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
145 }
146
147 if err != nil {
148 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
149 }
150
151 return nil
152}
153
154func (i *Ingester) ingestPublicKey(e *models.Event) error {
155 did := e.Did
156 var err error
157
158 l := i.Logger.With("handler", "ingestPublicKey")
159 l = l.With("nsid", e.Commit.Collection)
160
161 switch e.Commit.Operation {
162 case models.CommitOperationCreate, models.CommitOperationUpdate:
163 l.Debug("processing add of pubkey")
164 raw := json.RawMessage(e.Commit.Record)
165 record := tangled.PublicKey{}
166 err = json.Unmarshal(raw, &record)
167 if err != nil {
168 l.Error("invalid record", "err", err)
169 return err
170 }
171
172 name := record.Name
173 key := record.Key
174 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
175 case models.CommitOperationDelete:
176 l.Debug("processing delete of pubkey")
177 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
178 }
179
180 if err != nil {
181 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
182 }
183
184 return nil
185}
186
187func (i *Ingester) ingestArtifact(e *models.Event) error {
188 did := e.Did
189 var err error
190
191 l := i.Logger.With("handler", "ingestArtifact")
192 l = l.With("nsid", e.Commit.Collection)
193
194 switch e.Commit.Operation {
195 case models.CommitOperationCreate, models.CommitOperationUpdate:
196 raw := json.RawMessage(e.Commit.Record)
197 record := tangled.RepoArtifact{}
198 err = json.Unmarshal(raw, &record)
199 if err != nil {
200 l.Error("invalid record", "err", err)
201 return err
202 }
203
204 repoAt, err := syntax.ParseATURI(record.Repo)
205 if err != nil {
206 return err
207 }
208
209 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
210 if err != nil {
211 return err
212 }
213
214 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
215 if err != nil || !ok {
216 return err
217 }
218
219 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
220 if err != nil {
221 createdAt = time.Now()
222 }
223
224 artifact := db.Artifact{
225 Did: did,
226 Rkey: e.Commit.RKey,
227 RepoAt: repoAt,
228 Tag: plumbing.Hash(record.Tag),
229 CreatedAt: createdAt,
230 BlobCid: cid.Cid(record.Artifact.Ref),
231 Name: record.Name,
232 Size: uint64(record.Artifact.Size),
233 MimeType: record.Artifact.MimeType,
234 }
235
236 err = db.AddArtifact(i.Db, artifact)
237 case models.CommitOperationDelete:
238 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
239 }
240
241 if err != nil {
242 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
243 }
244
245 return nil
246}
247
248func (i *Ingester) ingestProfile(e *models.Event) error {
249 did := e.Did
250 var err error
251
252 l := i.Logger.With("handler", "ingestProfile")
253 l = l.With("nsid", e.Commit.Collection)
254
255 if e.Commit.RKey != "self" {
256 return fmt.Errorf("ingestProfile only ingests `self` record")
257 }
258
259 switch e.Commit.Operation {
260 case models.CommitOperationCreate, models.CommitOperationUpdate:
261 raw := json.RawMessage(e.Commit.Record)
262 record := tangled.ActorProfile{}
263 err = json.Unmarshal(raw, &record)
264 if err != nil {
265 l.Error("invalid record", "err", err)
266 return err
267 }
268
269 description := ""
270 if record.Description != nil {
271 description = *record.Description
272 }
273
274 includeBluesky := record.Bluesky
275
276 location := ""
277 if record.Location != nil {
278 location = *record.Location
279 }
280
281 var links [5]string
282 for i, l := range record.Links {
283 if i < 5 {
284 links[i] = l
285 }
286 }
287
288 var stats [2]db.VanityStat
289 for i, s := range record.Stats {
290 if i < 2 {
291 stats[i].Kind = db.VanityStatKind(s)
292 }
293 }
294
295 var pinned [6]syntax.ATURI
296 for i, r := range record.PinnedRepositories {
297 if i < 6 {
298 pinned[i] = syntax.ATURI(r)
299 }
300 }
301
302 profile := db.Profile{
303 Did: did,
304 Description: description,
305 IncludeBluesky: includeBluesky,
306 Location: location,
307 Links: links,
308 Stats: stats,
309 PinnedRepos: pinned,
310 }
311
312 ddb, ok := i.Db.Execer.(*db.DB)
313 if !ok {
314 return fmt.Errorf("failed to index profile record, invalid db cast")
315 }
316
317 tx, err := ddb.Begin()
318 if err != nil {
319 return fmt.Errorf("failed to start transaction")
320 }
321
322 err = db.ValidateProfile(tx, &profile)
323 if err != nil {
324 return fmt.Errorf("invalid profile record")
325 }
326
327 err = db.UpsertProfile(tx, &profile)
328 case models.CommitOperationDelete:
329 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
330 }
331
332 if err != nil {
333 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
334 }
335
336 return nil
337}
338
339func (i *Ingester) ingestSpindleMember(e *models.Event) error {
340 did := e.Did
341 var err error
342
343 l := i.Logger.With("handler", "ingestSpindleMember")
344 l = l.With("nsid", e.Commit.Collection)
345
346 switch e.Commit.Operation {
347 case models.CommitOperationCreate:
348 raw := json.RawMessage(e.Commit.Record)
349 record := tangled.SpindleMember{}
350 err = json.Unmarshal(raw, &record)
351 if err != nil {
352 l.Error("invalid record", "err", err)
353 return err
354 }
355
356 // only spindle owner can invite to spindles
357 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
358 if err != nil || !ok {
359 return fmt.Errorf("failed to enforce permissions: %w", err)
360 }
361
362 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
363 if err != nil {
364 return err
365 }
366
367 if memberId.Handle.IsInvalidHandle() {
368 return err
369 }
370
371 ddb, ok := i.Db.Execer.(*db.DB)
372 if !ok {
373 return fmt.Errorf("failed to index profile record, invalid db cast")
374 }
375
376 err = db.AddSpindleMember(ddb, db.SpindleMember{
377 Did: syntax.DID(did),
378 Rkey: e.Commit.RKey,
379 Instance: record.Instance,
380 Subject: memberId.DID,
381 })
382 if !ok {
383 return fmt.Errorf("failed to add to db: %w", err)
384 }
385
386 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
387 if err != nil {
388 return fmt.Errorf("failed to update ACLs: %w", err)
389 }
390 case models.CommitOperationDelete:
391 rkey := e.Commit.RKey
392
393 ddb, ok := i.Db.Execer.(*db.DB)
394 if !ok {
395 return fmt.Errorf("failed to index profile record, invalid db cast")
396 }
397
398 // get record from db first
399 members, err := db.GetSpindleMembers(
400 ddb,
401 db.FilterEq("did", did),
402 db.FilterEq("rkey", rkey),
403 )
404 if err != nil || len(members) != 1 {
405 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
406 }
407 member := members[0]
408
409 tx, err := ddb.Begin()
410 if err != nil {
411 return fmt.Errorf("failed to start txn: %w", err)
412 }
413
414 // remove record by rkey && update enforcer
415 if err = db.RemoveSpindleMember(
416 tx,
417 db.FilterEq("did", did),
418 db.FilterEq("rkey", rkey),
419 ); err != nil {
420 return fmt.Errorf("failed to remove from db: %w", err)
421 }
422
423 // update enforcer
424 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
425 if err != nil {
426 return fmt.Errorf("failed to update ACLs: %w", err)
427 }
428
429 if err = tx.Commit(); err != nil {
430 return fmt.Errorf("failed to commit txn: %w", err)
431 }
432
433 if err = i.Enforcer.E.SavePolicy(); err != nil {
434 return fmt.Errorf("failed to save ACLs: %w", err)
435 }
436 }
437
438 return nil
439}
440
441func (i *Ingester) ingestSpindle(e *models.Event) error {
442 did := e.Did
443 var err error
444
445 l := i.Logger.With("handler", "ingestSpindle")
446 l = l.With("nsid", e.Commit.Collection)
447
448 switch e.Commit.Operation {
449 case models.CommitOperationCreate:
450 raw := json.RawMessage(e.Commit.Record)
451 record := tangled.Spindle{}
452 err = json.Unmarshal(raw, &record)
453 if err != nil {
454 l.Error("invalid record", "err", err)
455 return err
456 }
457
458 instance := e.Commit.RKey
459
460 ddb, ok := i.Db.Execer.(*db.DB)
461 if !ok {
462 return fmt.Errorf("failed to index profile record, invalid db cast")
463 }
464
465 err := db.AddSpindle(ddb, db.Spindle{
466 Owner: syntax.DID(did),
467 Instance: instance,
468 })
469 if err != nil {
470 l.Error("failed to add spindle to db", "err", err, "instance", instance)
471 return err
472 }
473
474 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
475 if err != nil {
476 l.Error("failed to add spindle to db", "err", err, "instance", instance)
477 return err
478 }
479
480 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
481 if err != nil {
482 return fmt.Errorf("failed to mark verified: %w", err)
483 }
484
485 return nil
486
487 case models.CommitOperationDelete:
488 instance := e.Commit.RKey
489
490 ddb, ok := i.Db.Execer.(*db.DB)
491 if !ok {
492 return fmt.Errorf("failed to index profile record, invalid db cast")
493 }
494
495 // get record from db first
496 spindles, err := db.GetSpindles(
497 ddb,
498 db.FilterEq("owner", did),
499 db.FilterEq("instance", instance),
500 )
501 if err != nil || len(spindles) != 1 {
502 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
503 }
504 spindle := spindles[0]
505
506 tx, err := ddb.Begin()
507 if err != nil {
508 return err
509 }
510 defer func() {
511 tx.Rollback()
512 i.Enforcer.E.LoadPolicy()
513 }()
514
515 // remove spindle members first
516 err = db.RemoveSpindleMember(
517 tx,
518 db.FilterEq("owner", did),
519 db.FilterEq("instance", instance),
520 )
521 if err != nil {
522 return err
523 }
524
525 err = db.DeleteSpindle(
526 tx,
527 db.FilterEq("owner", did),
528 db.FilterEq("instance", instance),
529 )
530 if err != nil {
531 return err
532 }
533
534 if spindle.Verified != nil {
535 err = i.Enforcer.RemoveSpindle(instance)
536 if err != nil {
537 return err
538 }
539 }
540
541 err = tx.Commit()
542 if err != nil {
543 return err
544 }
545
546 err = i.Enforcer.E.SavePolicy()
547 if err != nil {
548 return err
549 }
550 }
551
552 return nil
553}
554
555func (i *Ingester) ingestString(e *models.Event) error {
556 did := e.Did
557 rkey := e.Commit.RKey
558
559 var err error
560
561 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
562 l.Info("ingesting record")
563
564 ddb, ok := i.Db.Execer.(*db.DB)
565 if !ok {
566 return fmt.Errorf("failed to index string record, invalid db cast")
567 }
568
569 switch e.Commit.Operation {
570 case models.CommitOperationCreate, models.CommitOperationUpdate:
571 raw := json.RawMessage(e.Commit.Record)
572 record := tangled.String{}
573 err = json.Unmarshal(raw, &record)
574 if err != nil {
575 l.Error("invalid record", "err", err)
576 return err
577 }
578
579 string := db.StringFromRecord(did, rkey, record)
580
581 if err = string.Validate(); err != nil {
582 l.Error("invalid record", "err", err)
583 return err
584 }
585
586 if err = db.AddString(ddb, string); err != nil {
587 l.Error("failed to add string", "err", err)
588 return err
589 }
590
591 return nil
592
593 case models.CommitOperationDelete:
594 if err := db.DeleteString(
595 ddb,
596 db.FilterEq("did", did),
597 db.FilterEq("rkey", rkey),
598 ); err != nil {
599 l.Error("failed to delete", "err", err)
600 return fmt.Errorf("failed to delete string record: %w", err)
601 }
602
603 return nil
604 }
605
606 return nil
607}