forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8
9 "tangled.sh/tangled.sh/core/api/tangled"
10 "tangled.sh/tangled.sh/core/eventconsumer"
11 "tangled.sh/tangled.sh/core/idresolver"
12 "tangled.sh/tangled.sh/core/rbac"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/identity"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/bluesky-social/indigo/xrpc"
18 "github.com/bluesky-social/jetstream/pkg/models"
19 securejoin "github.com/cyphar/filepath-securejoin"
20)
21
22type Ingester func(ctx context.Context, e *models.Event) error
23
24func (s *Spindle) ingest() Ingester {
25 return func(ctx context.Context, e *models.Event) error {
26 var err error
27 defer func() {
28 eventTime := e.TimeUS
29 lastTimeUs := eventTime + 1
30 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
31 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
32 }
33 }()
34
35 if e.Kind != models.EventKindCommit {
36 return nil
37 }
38
39 switch e.Commit.Collection {
40 case tangled.SpindleMemberNSID:
41 s.ingestMember(ctx, e)
42 case tangled.RepoNSID:
43 s.ingestRepo(ctx, e)
44 case tangled.RepoCollaboratorNSID:
45 s.ingestCollaborator(ctx, e)
46 }
47
48 return err
49 }
50}
51
52func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
53 did := e.Did
54 var err error
55
56 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
57
58 switch e.Commit.Operation {
59 case models.CommitOperationCreate, models.CommitOperationUpdate:
60 raw := e.Commit.Record
61 record := tangled.SpindleMember{}
62 err = json.Unmarshal(raw, &record)
63 if err != nil {
64 l.Error("invalid record", "error", err)
65 return err
66 }
67
68 domain := s.cfg.Server.Hostname
69 if s.cfg.Server.Dev {
70 domain = s.cfg.Server.ListenAddr
71 }
72 recordInstance := record.Instance
73
74 if recordInstance != domain {
75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
77 }
78
79 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
80 if err != nil || !ok {
81 l.Error("failed to add member", "did", did, "error", err)
82 return fmt.Errorf("failed to enforce permissions: %w", err)
83 }
84
85 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
86 l.Error("failed to add member", "error", err)
87 return fmt.Errorf("failed to add member: %w", err)
88 }
89 l.Info("added member from firehose", "member", record.Subject)
90
91 if err := s.db.AddDid(record.Subject); err != nil {
92 l.Error("failed to add did", "error", err)
93 return fmt.Errorf("failed to add did: %w", err)
94 }
95 s.jc.AddDid(record.Subject)
96
97 return nil
98
99 }
100 return nil
101}
102
103func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
104 var err error
105 did := e.Did
106 resolver := idresolver.DefaultResolver()
107
108 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
109
110 l.Info("ingesting repo record")
111
112 switch e.Commit.Operation {
113 case models.CommitOperationCreate, models.CommitOperationUpdate:
114 raw := e.Commit.Record
115 record := tangled.Repo{}
116 err = json.Unmarshal(raw, &record)
117 if err != nil {
118 l.Error("invalid record", "error", err)
119 return err
120 }
121
122 domain := s.cfg.Server.Hostname
123
124 // no spindle configured for this repo
125 if record.Spindle == nil {
126 l.Info("no spindle configured", "did", record.Owner, "name", record.Name)
127 return nil
128 }
129
130 // this repo did not want this spindle
131 if *record.Spindle != domain {
132 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain)
133 return nil
134 }
135
136 // add this repo to the watch list
137 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
138 l.Error("failed to add repo", "error", err)
139 return fmt.Errorf("failed to add repo: %w", err)
140 }
141
142 didSlashRepo, err := securejoin.SecureJoin(record.Owner, record.Name)
143 if err != nil {
144 return err
145 }
146
147 // add repo to rbac
148 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, didSlashRepo); err != nil {
149 l.Error("failed to add repo to enforcer", "error", err)
150 return fmt.Errorf("failed to add repo: %w", err)
151 }
152
153 // add collaborators to rbac
154 owner, err := resolver.ResolveIdent(ctx, did)
155 if err != nil || owner.Handle.IsInvalidHandle() {
156 return err
157 }
158 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
159 return err
160 }
161
162 // add this knot to the event consumer
163 src := eventconsumer.NewKnotSource(record.Knot)
164 s.ks.AddSource(context.Background(), src)
165
166 return nil
167
168 }
169 return nil
170}
171
172func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
173 var err error
174
175 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
176
177 l.Info("ingesting collaborator record")
178
179 switch e.Commit.Operation {
180 case models.CommitOperationCreate, models.CommitOperationUpdate:
181 raw := e.Commit.Record
182 record := tangled.RepoCollaborator{}
183 err = json.Unmarshal(raw, &record)
184 if err != nil {
185 l.Error("invalid record", "error", err)
186 return err
187 }
188
189 resolver := idresolver.DefaultResolver()
190
191 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
192 if err != nil || subjectId.Handle.IsInvalidHandle() {
193 return err
194 }
195
196 repoAt, err := syntax.ParseATURI(record.Repo)
197 if err != nil {
198 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
199 return nil
200 }
201
202 // TODO: get rid of this entirely
203 // resolve this aturi to extract the repo record
204 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
205 if err != nil || owner.Handle.IsInvalidHandle() {
206 return fmt.Errorf("failed to resolve handle: %w", err)
207 }
208
209 xrpcc := xrpc.Client{
210 Host: owner.PDSEndpoint(),
211 }
212
213 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
214 if err != nil {
215 return err
216 }
217
218 repo := resp.Value.Val.(*tangled.Repo)
219 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
220
221 // check perms for this user
222 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
223 return fmt.Errorf("insufficient permissions: %w", err)
224 }
225
226 // add collaborator to rbac
227 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
228 l.Error("failed to add repo to enforcer", "error", err)
229 return fmt.Errorf("failed to add repo: %w", err)
230 }
231
232 return nil
233 }
234 return nil
235}
236
237func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
238 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
239
240 l.Info("fetching and adding existing collaborators")
241
242 xrpcc := xrpc.Client{
243 Host: owner.PDSEndpoint(),
244 }
245
246 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
247 if err != nil {
248 return err
249 }
250
251 var errs error
252 for _, r := range resp.Records {
253 if r == nil {
254 continue
255 }
256 record := r.Value.Val.(*tangled.RepoCollaborator)
257
258 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
259 l.Error("failed to add repo to enforcer", "error", err)
260 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
261 }
262 }
263
264 return errs
265}