1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "path/filepath"
8
9 "tangled.sh/tangled.sh/core/api/tangled"
10 "tangled.sh/tangled.sh/core/eventconsumer"
11 "tangled.sh/tangled.sh/core/rbac"
12
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
// Ingester is the signature of a handler that processes one jetstream event
// and reports failure through its error result.
type Ingester func(ctx context.Context, e *models.Event) error
17
18func (s *Spindle) ingest() Ingester {
19 return func(ctx context.Context, e *models.Event) error {
20 var err error
21 defer func() {
22 eventTime := e.TimeUS
23 lastTimeUs := eventTime + 1
24 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
25 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
26 }
27 }()
28
29 if e.Kind != models.EventKindCommit {
30 return nil
31 }
32
33 switch e.Commit.Collection {
34 case tangled.SpindleMemberNSID:
35 s.ingestMember(ctx, e)
36 case tangled.RepoNSID:
37 s.ingestRepo(ctx, e)
38 }
39
40 return err
41 }
42}
43
44func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
45 did := e.Did
46 var err error
47
48 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
49
50 switch e.Commit.Operation {
51 case models.CommitOperationCreate, models.CommitOperationUpdate:
52 raw := e.Commit.Record
53 record := tangled.SpindleMember{}
54 err = json.Unmarshal(raw, &record)
55 if err != nil {
56 l.Error("invalid record", "error", err)
57 return err
58 }
59
60 domain := s.cfg.Server.Hostname
61 if s.cfg.Server.Dev {
62 domain = s.cfg.Server.ListenAddr
63 }
64 recordInstance := record.Instance
65
66 if recordInstance != domain {
67 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
68 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
69 }
70
71 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
72 if err != nil || !ok {
73 l.Error("failed to add member", "did", did, "error", err)
74 return fmt.Errorf("failed to enforce permissions: %w", err)
75 }
76
77 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
78 l.Error("failed to add member", "error", err)
79 return fmt.Errorf("failed to add member: %w", err)
80 }
81 l.Info("added member from firehose", "member", record.Subject)
82
83 if err := s.db.AddDid(record.Subject); err != nil {
84 l.Error("failed to add did", "error", err)
85 return fmt.Errorf("failed to add did: %w", err)
86 }
87 s.jc.AddDid(record.Subject)
88
89 return nil
90
91 }
92 return nil
93}
94
95func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error {
96 var err error
97
98 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
99
100 l.Info("ingesting repo record")
101
102 switch e.Commit.Operation {
103 case models.CommitOperationCreate, models.CommitOperationUpdate:
104 raw := e.Commit.Record
105 record := tangled.Repo{}
106 err = json.Unmarshal(raw, &record)
107 if err != nil {
108 l.Error("invalid record", "error", err)
109 return err
110 }
111
112 domain := s.cfg.Server.Hostname
113
114 // no spindle configured for this repo
115 if record.Spindle == nil {
116 l.Info("no spindle configured", "did", record.Owner, "name", record.Name)
117 return nil
118 }
119
120 // this repo did not want this spindle
121 if *record.Spindle != domain {
122 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain)
123 return nil
124 }
125
126 // add this repo to the watch list
127 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
128 l.Error("failed to add repo", "error", err)
129 return fmt.Errorf("failed to add repo: %w", err)
130 }
131
132 // add repo to rbac
133 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, filepath.Join(record.Owner, record.Name)); err != nil {
134 l.Error("failed to add repo to enforcer", "error", err)
135 return fmt.Errorf("failed to add repo: %w", err)
136 }
137
138 // add this knot to the event consumer
139 src := eventconsumer.NewKnotSource(record.Knot)
140 s.ks.AddSource(context.Background(), src)
141
142 return nil
143
144 }
145 return nil
146}