1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/spindleverify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 }
68 l = i.Logger.With("nsid", e.Commit.Collection)
69 }
70
71 if err != nil {
72 l.Error("error ingesting record", "err", err)
73 }
74
75 return err
76 }
77}
78
79func (i *Ingester) ingestStar(e *models.Event) error {
80 var err error
81 did := e.Did
82
83 l := i.Logger.With("handler", "ingestStar")
84 l = l.With("nsid", e.Commit.Collection)
85
86 switch e.Commit.Operation {
87 case models.CommitOperationCreate, models.CommitOperationUpdate:
88 var subjectUri syntax.ATURI
89
90 raw := json.RawMessage(e.Commit.Record)
91 record := tangled.FeedStar{}
92 err := json.Unmarshal(raw, &record)
93 if err != nil {
94 l.Error("invalid record", "err", err)
95 return err
96 }
97
98 subjectUri, err = syntax.ParseATURI(record.Subject)
99 if err != nil {
100 l.Error("invalid record", "err", err)
101 return err
102 }
103 err = db.AddStar(i.Db, &db.Star{
104 StarredByDid: did,
105 RepoAt: subjectUri,
106 Rkey: e.Commit.RKey,
107 })
108 case models.CommitOperationDelete:
109 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
110 }
111
112 if err != nil {
113 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
114 }
115
116 return nil
117}
118
119func (i *Ingester) ingestFollow(e *models.Event) error {
120 var err error
121 did := e.Did
122
123 l := i.Logger.With("handler", "ingestFollow")
124 l = l.With("nsid", e.Commit.Collection)
125
126 switch e.Commit.Operation {
127 case models.CommitOperationCreate, models.CommitOperationUpdate:
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.GraphFollow{}
130 err = json.Unmarshal(raw, &record)
131 if err != nil {
132 l.Error("invalid record", "err", err)
133 return err
134 }
135
136 err = db.AddFollow(i.Db, &db.Follow{
137 UserDid: did,
138 SubjectDid: record.Subject,
139 Rkey: e.Commit.RKey,
140 })
141 case models.CommitOperationDelete:
142 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
143 }
144
145 if err != nil {
146 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
147 }
148
149 return nil
150}
151
152func (i *Ingester) ingestPublicKey(e *models.Event) error {
153 did := e.Did
154 var err error
155
156 l := i.Logger.With("handler", "ingestPublicKey")
157 l = l.With("nsid", e.Commit.Collection)
158
159 switch e.Commit.Operation {
160 case models.CommitOperationCreate, models.CommitOperationUpdate:
161 l.Debug("processing add of pubkey")
162 raw := json.RawMessage(e.Commit.Record)
163 record := tangled.PublicKey{}
164 err = json.Unmarshal(raw, &record)
165 if err != nil {
166 l.Error("invalid record", "err", err)
167 return err
168 }
169
170 name := record.Name
171 key := record.Key
172 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
173 case models.CommitOperationDelete:
174 l.Debug("processing delete of pubkey")
175 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
176 }
177
178 if err != nil {
179 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
180 }
181
182 return nil
183}
184
185func (i *Ingester) ingestArtifact(e *models.Event) error {
186 did := e.Did
187 var err error
188
189 l := i.Logger.With("handler", "ingestArtifact")
190 l = l.With("nsid", e.Commit.Collection)
191
192 switch e.Commit.Operation {
193 case models.CommitOperationCreate, models.CommitOperationUpdate:
194 raw := json.RawMessage(e.Commit.Record)
195 record := tangled.RepoArtifact{}
196 err = json.Unmarshal(raw, &record)
197 if err != nil {
198 l.Error("invalid record", "err", err)
199 return err
200 }
201
202 repoAt, err := syntax.ParseATURI(record.Repo)
203 if err != nil {
204 return err
205 }
206
207 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
208 if err != nil {
209 return err
210 }
211
212 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
213 if err != nil || !ok {
214 return err
215 }
216
217 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
218 if err != nil {
219 createdAt = time.Now()
220 }
221
222 artifact := db.Artifact{
223 Did: did,
224 Rkey: e.Commit.RKey,
225 RepoAt: repoAt,
226 Tag: plumbing.Hash(record.Tag),
227 CreatedAt: createdAt,
228 BlobCid: cid.Cid(record.Artifact.Ref),
229 Name: record.Name,
230 Size: uint64(record.Artifact.Size),
231 MimeType: record.Artifact.MimeType,
232 }
233
234 err = db.AddArtifact(i.Db, artifact)
235 case models.CommitOperationDelete:
236 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
237 }
238
239 if err != nil {
240 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
241 }
242
243 return nil
244}
245
246func (i *Ingester) ingestProfile(e *models.Event) error {
247 did := e.Did
248 var err error
249
250 l := i.Logger.With("handler", "ingestProfile")
251 l = l.With("nsid", e.Commit.Collection)
252
253 if e.Commit.RKey != "self" {
254 return fmt.Errorf("ingestProfile only ingests `self` record")
255 }
256
257 switch e.Commit.Operation {
258 case models.CommitOperationCreate, models.CommitOperationUpdate:
259 raw := json.RawMessage(e.Commit.Record)
260 record := tangled.ActorProfile{}
261 err = json.Unmarshal(raw, &record)
262 if err != nil {
263 l.Error("invalid record", "err", err)
264 return err
265 }
266
267 description := ""
268 if record.Description != nil {
269 description = *record.Description
270 }
271
272 includeBluesky := record.Bluesky
273
274 location := ""
275 if record.Location != nil {
276 location = *record.Location
277 }
278
279 var links [5]string
280 for i, l := range record.Links {
281 if i < 5 {
282 links[i] = l
283 }
284 }
285
286 var stats [2]db.VanityStat
287 for i, s := range record.Stats {
288 if i < 2 {
289 stats[i].Kind = db.VanityStatKind(s)
290 }
291 }
292
293 var pinned [6]syntax.ATURI
294 for i, r := range record.PinnedRepositories {
295 if i < 6 {
296 pinned[i] = syntax.ATURI(r)
297 }
298 }
299
300 profile := db.Profile{
301 Did: did,
302 Description: description,
303 IncludeBluesky: includeBluesky,
304 Location: location,
305 Links: links,
306 Stats: stats,
307 PinnedRepos: pinned,
308 }
309
310 ddb, ok := i.Db.Execer.(*db.DB)
311 if !ok {
312 return fmt.Errorf("failed to index profile record, invalid db cast")
313 }
314
315 tx, err := ddb.Begin()
316 if err != nil {
317 return fmt.Errorf("failed to start transaction")
318 }
319
320 err = db.ValidateProfile(tx, &profile)
321 if err != nil {
322 return fmt.Errorf("invalid profile record")
323 }
324
325 err = db.UpsertProfile(tx, &profile)
326 case models.CommitOperationDelete:
327 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
328 }
329
330 if err != nil {
331 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
332 }
333
334 return nil
335}
336
337func (i *Ingester) ingestSpindleMember(e *models.Event) error {
338 did := e.Did
339 var err error
340
341 l := i.Logger.With("handler", "ingestSpindleMember")
342 l = l.With("nsid", e.Commit.Collection)
343
344 switch e.Commit.Operation {
345 case models.CommitOperationCreate:
346 raw := json.RawMessage(e.Commit.Record)
347 record := tangled.SpindleMember{}
348 err = json.Unmarshal(raw, &record)
349 if err != nil {
350 l.Error("invalid record", "err", err)
351 return err
352 }
353
354 // only spindle owner can invite to spindles
355 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
356 if err != nil || !ok {
357 return fmt.Errorf("failed to enforce permissions: %w", err)
358 }
359
360 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
361 if err != nil {
362 return err
363 }
364
365 if memberId.Handle.IsInvalidHandle() {
366 return err
367 }
368
369 ddb, ok := i.Db.Execer.(*db.DB)
370 if !ok {
371 return fmt.Errorf("failed to index profile record, invalid db cast")
372 }
373
374 err = db.AddSpindleMember(ddb, db.SpindleMember{
375 Did: syntax.DID(did),
376 Rkey: e.Commit.RKey,
377 Instance: record.Instance,
378 Subject: memberId.DID,
379 })
380 if !ok {
381 return fmt.Errorf("failed to add to db: %w", err)
382 }
383
384 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
385 if err != nil {
386 return fmt.Errorf("failed to update ACLs: %w", err)
387 }
388 case models.CommitOperationDelete:
389 rkey := e.Commit.RKey
390
391 ddb, ok := i.Db.Execer.(*db.DB)
392 if !ok {
393 return fmt.Errorf("failed to index profile record, invalid db cast")
394 }
395
396 // get record from db first
397 members, err := db.GetSpindleMembers(
398 ddb,
399 db.FilterEq("did", did),
400 db.FilterEq("rkey", rkey),
401 )
402 if err != nil || len(members) != 1 {
403 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
404 }
405 member := members[0]
406
407 tx, err := ddb.Begin()
408 if err != nil {
409 return fmt.Errorf("failed to start txn: %w", err)
410 }
411
412 // remove record by rkey && update enforcer
413 if err = db.RemoveSpindleMember(
414 tx,
415 db.FilterEq("did", did),
416 db.FilterEq("rkey", rkey),
417 ); err != nil {
418 return fmt.Errorf("failed to remove from db: %w", err)
419 }
420
421 // update enforcer
422 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
423 if err != nil {
424 return fmt.Errorf("failed to update ACLs: %w", err)
425 }
426
427 if err = tx.Commit(); err != nil {
428 return fmt.Errorf("failed to commit txn: %w", err)
429 }
430
431 if err = i.Enforcer.E.SavePolicy(); err != nil {
432 return fmt.Errorf("failed to save ACLs: %w", err)
433 }
434 }
435
436 return nil
437}
438
439func (i *Ingester) ingestSpindle(e *models.Event) error {
440 did := e.Did
441 var err error
442
443 l := i.Logger.With("handler", "ingestSpindle")
444 l = l.With("nsid", e.Commit.Collection)
445
446 switch e.Commit.Operation {
447 case models.CommitOperationCreate:
448 raw := json.RawMessage(e.Commit.Record)
449 record := tangled.Spindle{}
450 err = json.Unmarshal(raw, &record)
451 if err != nil {
452 l.Error("invalid record", "err", err)
453 return err
454 }
455
456 instance := e.Commit.RKey
457
458 ddb, ok := i.Db.Execer.(*db.DB)
459 if !ok {
460 return fmt.Errorf("failed to index profile record, invalid db cast")
461 }
462
463 err := db.AddSpindle(ddb, db.Spindle{
464 Owner: syntax.DID(did),
465 Instance: instance,
466 })
467 if err != nil {
468 l.Error("failed to add spindle to db", "err", err, "instance", instance)
469 return err
470 }
471
472 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
473 if err != nil {
474 l.Error("failed to add spindle to db", "err", err, "instance", instance)
475 return err
476 }
477
478 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
479 if err != nil {
480 return fmt.Errorf("failed to mark verified: %w", err)
481 }
482
483 return nil
484
485 case models.CommitOperationDelete:
486 instance := e.Commit.RKey
487
488 ddb, ok := i.Db.Execer.(*db.DB)
489 if !ok {
490 return fmt.Errorf("failed to index profile record, invalid db cast")
491 }
492
493 // get record from db first
494 spindles, err := db.GetSpindles(
495 ddb,
496 db.FilterEq("owner", did),
497 db.FilterEq("instance", instance),
498 )
499 if err != nil || len(spindles) != 1 {
500 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
501 }
502 spindle := spindles[0]
503
504 tx, err := ddb.Begin()
505 if err != nil {
506 return err
507 }
508 defer func() {
509 tx.Rollback()
510 i.Enforcer.E.LoadPolicy()
511 }()
512
513 // remove spindle members first
514 err = db.RemoveSpindleMember(
515 tx,
516 db.FilterEq("owner", did),
517 db.FilterEq("instance", instance),
518 )
519 if err != nil {
520 return err
521 }
522
523 err = db.DeleteSpindle(
524 tx,
525 db.FilterEq("owner", did),
526 db.FilterEq("instance", instance),
527 )
528 if err != nil {
529 return err
530 }
531
532 if spindle.Verified != nil {
533 err = i.Enforcer.RemoveSpindle(instance)
534 if err != nil {
535 return err
536 }
537 }
538
539 err = tx.Commit()
540 if err != nil {
541 return err
542 }
543
544 err = i.Enforcer.E.SavePolicy()
545 if err != nil {
546 return err
547 }
548 }
549
550 return nil
551}