forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "github.com/bluesky-social/jetstream/pkg/models"
9 "tangled.sh/tangled.sh/core/api/tangled"
10)
11
12type Ingester func(ctx context.Context, e *models.Event) error
13
14func (s *Spindle) ingest() Ingester {
15 return func(ctx context.Context, e *models.Event) error {
16 var err error
17 defer func() {
18 eventTime := e.TimeUS
19 lastTimeUs := eventTime + 1
20 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
21 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
22 }
23 }()
24
25 if e.Kind != models.EventKindCommit {
26 return nil
27 }
28
29 switch e.Commit.Collection {
30 case tangled.SpindleMemberNSID:
31 s.ingestMember(ctx, e)
32 }
33
34 return err
35 }
36}
37
38func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
39 did := e.Did
40 var err error
41
42 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
43
44 switch e.Commit.Operation {
45 case models.CommitOperationCreate, models.CommitOperationUpdate:
46 raw := e.Commit.Record
47 record := tangled.SpindleMember{}
48 err = json.Unmarshal(raw, &record)
49 if err != nil {
50 l.Error("invalid record", "error", err)
51 return err
52 }
53
54 domain := s.cfg.Server.Hostname
55 if s.cfg.Server.Dev {
56 domain = s.cfg.Server.ListenAddr
57 }
58 recordInstance := *record.Instance
59
60 if recordInstance != domain {
61 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
62 return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain)
63 }
64
65 ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite")
66 if err != nil || !ok {
67 l.Error("failed to add member", "did", did)
68 return fmt.Errorf("failed to enforce permissions: %w", err)
69 }
70
71 if err := s.e.AddMember(rbacDomain, record.Subject); err != nil {
72 l.Error("failed to add member", "error", err)
73 return fmt.Errorf("failed to add member: %w", err)
74 }
75 l.Info("added member from firehose", "member", record.Subject)
76
77 if err := s.db.AddDid(did); err != nil {
78 l.Error("failed to add did", "error", err)
79 return fmt.Errorf("failed to add did: %w", err)
80 }
81 s.jc.AddDid(did)
82
83 return nil
84
85 }
86 return nil
87}