forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/rbac"
13 "tangled.org/core/spindle/db"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/identity"
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 "github.com/bluesky-social/indigo/xrpc"
19 "github.com/bluesky-social/jetstream/pkg/models"
20 securejoin "github.com/cyphar/filepath-securejoin"
21)
22
23type Ingester func(ctx context.Context, e *models.Event) error
24
25func (s *Spindle) ingest() Ingester {
26 return func(ctx context.Context, e *models.Event) error {
27 var err error
28 defer func() {
29 eventTime := e.TimeUS
30 lastTimeUs := eventTime + 1
31 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
32 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
33 }
34 }()
35
36 if e.Kind != models.EventKindCommit {
37 return nil
38 }
39
40 switch e.Commit.Collection {
41 case tangled.SpindleMemberNSID:
42 err = s.ingestMember(ctx, e)
43 case tangled.RepoNSID:
44 err = s.ingestRepo(ctx, e)
45 case tangled.RepoCollaboratorNSID:
46 err = s.ingestCollaborator(ctx, e)
47 }
48
49 if err != nil {
50 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
51 }
52
53 return nil
54 }
55}
56
57func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
58 var err error
59 did := e.Did
60 rkey := e.Commit.RKey
61
62 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
63
64 switch e.Commit.Operation {
65 case models.CommitOperationCreate, models.CommitOperationUpdate:
66 raw := e.Commit.Record
67 record := tangled.SpindleMember{}
68 err = json.Unmarshal(raw, &record)
69 if err != nil {
70 l.Error("invalid record", "error", err)
71 return err
72 }
73
74 domain := s.cfg.Server.Hostname
75 recordInstance := record.Instance
76
77 if recordInstance != domain {
78 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
79 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
80 }
81
82 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
83 if err != nil || !ok {
84 l.Error("failed to add member", "did", did, "error", err)
85 return fmt.Errorf("failed to enforce permissions: %w", err)
86 }
87
88 if err := db.AddSpindleMember(s.db, db.SpindleMember{
89 Did: syntax.DID(did),
90 Rkey: rkey,
91 Instance: recordInstance,
92 Subject: syntax.DID(record.Subject),
93 Created: time.Now(),
94 }); err != nil {
95 l.Error("failed to add member", "error", err)
96 return fmt.Errorf("failed to add member: %w", err)
97 }
98
99 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
100 l.Error("failed to add member", "error", err)
101 return fmt.Errorf("failed to add member: %w", err)
102 }
103 l.Info("added member from firehose", "member", record.Subject)
104
105 if err := s.db.AddDid(record.Subject); err != nil {
106 l.Error("failed to add did", "error", err)
107 return fmt.Errorf("failed to add did: %w", err)
108 }
109 s.jc.AddDid(record.Subject)
110
111 return nil
112
113 case models.CommitOperationDelete:
114 record, err := db.GetSpindleMember(s.db, did, rkey)
115 if err != nil {
116 l.Error("failed to find member", "error", err)
117 return fmt.Errorf("failed to find member: %w", err)
118 }
119
120 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
121 l.Error("failed to remove member", "error", err)
122 return fmt.Errorf("failed to remove member: %w", err)
123 }
124
125 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
126 l.Error("failed to add member", "error", err)
127 return fmt.Errorf("failed to add member: %w", err)
128 }
129 l.Info("added member from firehose", "member", record.Subject)
130
131 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
132 l.Error("failed to add did", "error", err)
133 return fmt.Errorf("failed to add did: %w", err)
134 }
135 s.jc.RemoveDid(record.Subject.String())
136
137 }
138 return nil
139}
140
141func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
142 var err error
143 did := e.Did
144
145 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
146
147 l.Info("ingesting repo record", "did", did)
148
149 switch e.Commit.Operation {
150 case models.CommitOperationCreate, models.CommitOperationUpdate:
151 raw := e.Commit.Record
152 record := tangled.Repo{}
153 err = json.Unmarshal(raw, &record)
154 if err != nil {
155 l.Error("invalid record", "error", err)
156 return err
157 }
158
159 domain := s.cfg.Server.Hostname
160
161 // no spindle configured for this repo
162 if record.Spindle == nil {
163 l.Info("no spindle configured", "name", record.Name)
164 return nil
165 }
166
167 // this repo did not want this spindle
168 if *record.Spindle != domain {
169 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
170 return nil
171 }
172
173 // add this repo to the watch list
174 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil {
175 l.Error("failed to add repo", "error", err)
176 return fmt.Errorf("failed to add repo: %w", err)
177 }
178
179 didSlashRepo, err := securejoin.SecureJoin(did, record.Name)
180 if err != nil {
181 return err
182 }
183
184 // add repo to rbac
185 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
186 l.Error("failed to add repo to enforcer", "error", err)
187 return fmt.Errorf("failed to add repo: %w", err)
188 }
189
190 // add collaborators to rbac
191 owner, err := s.res.ResolveIdent(ctx, did)
192 if err != nil || owner.Handle.IsInvalidHandle() {
193 return err
194 }
195 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
196 return err
197 }
198
199 // add this knot to the event consumer
200 src := eventconsumer.NewKnotSource(record.Knot)
201 s.ks.AddSource(context.Background(), src)
202
203 return nil
204
205 }
206 return nil
207}
208
209func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
210 var err error
211
212 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
213
214 l.Info("ingesting collaborator record")
215
216 switch e.Commit.Operation {
217 case models.CommitOperationCreate, models.CommitOperationUpdate:
218 raw := e.Commit.Record
219 record := tangled.RepoCollaborator{}
220 err = json.Unmarshal(raw, &record)
221 if err != nil {
222 l.Error("invalid record", "error", err)
223 return err
224 }
225
226 subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
227 if err != nil || subjectId.Handle.IsInvalidHandle() {
228 return err
229 }
230
231 repoAt, err := syntax.ParseATURI(record.Repo)
232 if err != nil {
233 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
234 return nil
235 }
236
237 // TODO: get rid of this entirely
238 // resolve this aturi to extract the repo record
239 owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String())
240 if err != nil || owner.Handle.IsInvalidHandle() {
241 return fmt.Errorf("failed to resolve handle: %w", err)
242 }
243
244 xrpcc := xrpc.Client{
245 Host: owner.PDSEndpoint(),
246 }
247
248 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
249 if err != nil {
250 return err
251 }
252
253 repo := resp.Value.Val.(*tangled.Repo)
254 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
255
256 // check perms for this user
257 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
258 return fmt.Errorf("insufficient permissions: %w", err)
259 }
260
261 // add collaborator to rbac
262 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
263 l.Error("failed to add repo to enforcer", "error", err)
264 return fmt.Errorf("failed to add repo: %w", err)
265 }
266
267 return nil
268 }
269 return nil
270}
271
272func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
273 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
274
275 l.Info("fetching and adding existing collaborators")
276
277 xrpcc := xrpc.Client{
278 Host: owner.PDSEndpoint(),
279 }
280
281 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
282 if err != nil {
283 return err
284 }
285
286 var errs error
287 for _, r := range resp.Records {
288 if r == nil {
289 continue
290 }
291 record := r.Value.Val.(*tangled.RepoCollaborator)
292
293 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
294 l.Error("failed to add repo to enforcer", "error", err)
295 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
296 }
297 }
298
299 return errs
300}