1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, did string, operation string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
30
31 switch operation {
32 case models.CommitOperationCreate, models.CommitOperationUpdate:
33 pk := db.PublicKey{
34 Did: did,
35 PublicKey: record,
36 }
37 if err := h.db.AddPublicKey(pk); err != nil {
38 l.Error("failed to add public key", "error", err)
39 return fmt.Errorf("failed to add public key: %w", err)
40 }
41 l.Info("added public key from firehose", "did", did)
42
43 case models.CommitOperationDelete:
44 if err := h.db.RemovePublicKey(did); err != nil {
45 l.Error("failed to remove public key", "error", err)
46 return fmt.Errorf("failed to remove public key: %w", err)
47 }
48 l.Info("removed public key (delete triggered from firehose)", "did", did)
49 }
50
51 return nil
52}
53
54func (h *Handle) processKnotMember(ctx context.Context, did string, operation string, record tangled.KnotMember) error {
55 l := log.FromContext(ctx)
56
57 switch operation {
58 case models.CommitOperationCreate, models.CommitOperationUpdate:
59 if record.Domain != h.c.Server.Hostname {
60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
62 }
63
64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
65 if err != nil || !ok {
66 l.Error("failed to add member", "did", did)
67 return fmt.Errorf("failed to enforce permissions: %w", err)
68 }
69
70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
71 l.Error("failed to add member", "error", err)
72 return fmt.Errorf("failed to add member: %w", err)
73 }
74 l.Info("added member from firehose", "member", record.Subject)
75
76 if err := h.db.AddDid(did); err != nil {
77 l.Error("failed to add did", "error", err)
78 return fmt.Errorf("failed to add did: %w", err)
79 }
80 h.jc.AddDid(did)
81
82 if err := h.fetchAndAddKeys(ctx, did); err != nil {
83 return fmt.Errorf("failed to fetch and add keys: %w", err)
84 }
85
86 case models.CommitOperationDelete:
87 if err := h.e.RemoveKnotMember(rbac.ThisServer, record.Subject); err != nil {
88 l.Error("failed to remove member", "error", err)
89 return fmt.Errorf("failed to remove member: %w", err)
90 }
91 l.Info("removed member (delete triggered from firehose)", "member", record.Subject)
92
93 if err := h.db.RemoveDid(record.Subject); err != nil {
94 l.Error("failed to remove did", "error", err)
95 return fmt.Errorf("failed to remove did: %w", err)
96 }
97 h.jc.RemoveDid(record.Subject)
98 }
99
100 return nil
101}
102
103func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
104 l := log.FromContext(ctx)
105 l = l.With("handler", "processPull")
106 l = l.With("did", did)
107 l = l.With("target_repo", record.TargetRepo)
108 l = l.With("target_branch", record.TargetBranch)
109
110 if record.Source == nil {
111 reason := "not a branch-based pull request"
112 l.Info("ignoring pull record", "reason", reason)
113 return fmt.Errorf("ignoring pull record: %s", reason)
114 }
115
116 if record.Source.Repo != nil {
117 reason := "fork based pull"
118 l.Info("ignoring pull record", "reason", reason)
119 return fmt.Errorf("ignoring pull record: %s", reason)
120 }
121
122 allDids, err := h.db.GetAllDids()
123 if err != nil {
124 return err
125 }
126
127 // presently: we only process PRs from collaborators for pipelines
128 if !slices.Contains(allDids, did) {
129 reason := "not a known did"
130 l.Info("rejecting pull record", "reason", reason)
131 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
132 }
133
134 repoAt, err := syntax.ParseATURI(record.TargetRepo)
135 if err != nil {
136 return err
137 }
138
139 // resolve this aturi to extract the repo record
140 resolver := idresolver.DefaultResolver()
141 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
142 if err != nil || ident.Handle.IsInvalidHandle() {
143 return fmt.Errorf("failed to resolve handle: %w", err)
144 }
145
146 xrpcc := xrpc.Client{
147 Host: ident.PDSEndpoint(),
148 }
149
150 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
151 if err != nil {
152 return err
153 }
154
155 repo := resp.Value.Val.(*tangled.Repo)
156
157 if repo.Knot != h.c.Server.Hostname {
158 reason := "not this knot"
159 l.Info("rejecting pull record", "reason", reason)
160 return fmt.Errorf("rejected pull record: %s", reason)
161 }
162
163 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
164 if err != nil {
165 return err
166 }
167
168 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
169 if err != nil {
170 return err
171 }
172
173 gr, err := git.Open(repoPath, record.Source.Branch)
174 if err != nil {
175 return err
176 }
177
178 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
179 if err != nil {
180 return err
181 }
182
183 var pipeline workflow.Pipeline
184 for _, e := range workflowDir {
185 if !e.IsFile {
186 continue
187 }
188
189 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
190 contents, err := gr.RawContent(fpath)
191 if err != nil {
192 continue
193 }
194
195 wf, err := workflow.FromFile(e.Name, contents)
196 if err != nil {
197 // TODO: log here, respond to client that is pushing
198 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
199 continue
200 }
201
202 pipeline = append(pipeline, wf)
203 }
204
205 trigger := tangled.Pipeline_PullRequestTriggerData{
206 Action: "create",
207 SourceBranch: record.Source.Branch,
208 SourceSha: record.Source.Sha,
209 TargetBranch: record.TargetBranch,
210 }
211
212 compiler := workflow.Compiler{
213 Trigger: tangled.Pipeline_TriggerMetadata{
214 Kind: string(workflow.TriggerKindPullRequest),
215 PullRequest: &trigger,
216 Repo: &tangled.Pipeline_TriggerRepo{
217 Did: repo.Owner,
218 Knot: repo.Knot,
219 Repo: repo.Name,
220 },
221 },
222 }
223
224 cp := compiler.Compile(pipeline)
225 eventJson, err := json.Marshal(cp)
226 if err != nil {
227 return err
228 }
229
230 // do not run empty pipelines
231 if cp.Workflows == nil {
232 return nil
233 }
234
235 event := db.Event{
236 Rkey: TID(),
237 Nsid: tangled.PipelineNSID,
238 EventJson: string(eventJson),
239 }
240
241 return h.db.InsertEvent(event, h.n)
242}
243
244// duplicated from add collaborator
245func (h *Handle) processCollaborator(ctx context.Context, did string, operation string, record tangled.RepoCollaborator) error {
246 l := log.FromContext(ctx)
247 l = l.With("handler", "processCollaborator", "did", did)
248
249 switch operation {
250 case models.CommitOperationCreate, models.CommitOperationUpdate:
251 repoAt, err := syntax.ParseATURI(record.Repo)
252 if err != nil {
253 return err
254 }
255
256 resolver := h.resolver
257
258 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
259 if err != nil || subjectId.Handle.IsInvalidHandle() {
260 return err
261 }
262
263 // TODO: fix this for good, we need to fetch the record here unfortunately
264 // resolve this aturi to extract the repo record
265 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
266 if err != nil || owner.Handle.IsInvalidHandle() {
267 return fmt.Errorf("failed to resolve handle: %w", err)
268 }
269
270 xrpcc := xrpc.Client{
271 Host: owner.PDSEndpoint(),
272 }
273
274 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
275 if err != nil {
276 return err
277 }
278
279 repo := resp.Value.Val.(*tangled.Repo)
280 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
281
282 // check perms for this user
283 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
284 return fmt.Errorf("insufficient permissions: %w", err)
285 }
286
287 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
288 return err
289 }
290 h.jc.AddDid(subjectId.DID.String())
291
292 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
293 return err
294 }
295
296 l.Info("added collaborator from firehose", "subject", record.Subject, "repo", record.Repo)
297
298 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
299
300 case models.CommitOperationDelete:
301 repoAt, err := syntax.ParseATURI(record.Repo)
302 if err != nil {
303 return err
304 }
305
306 resolver := h.resolver
307
308 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
309 if err != nil || subjectId.Handle.IsInvalidHandle() {
310 return err
311 }
312
313 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
314 if err != nil || owner.Handle.IsInvalidHandle() {
315 return fmt.Errorf("failed to resolve handle: %w", err)
316 }
317
318 xrpcc := xrpc.Client{
319 Host: owner.PDSEndpoint(),
320 }
321
322 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
323 if err != nil {
324 return err
325 }
326
327 repo := resp.Value.Val.(*tangled.Repo)
328 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
329
330 if err := h.e.RemoveCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
331 l.Error("failed to remove collaborator", "error", err)
332 return fmt.Errorf("failed to remove collaborator: %w", err)
333 }
334
335 l.Info("removed collaborator from firehose", "subject", record.Subject, "repo", record.Repo)
336 }
337
338 return nil
339}
340
341func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
342 l := log.FromContext(ctx)
343
344 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
345 if err != nil {
346 l.Error("error building endpoint url", "did", did, "error", err.Error())
347 return fmt.Errorf("error building endpoint url: %w", err)
348 }
349
350 resp, err := http.Get(keysEndpoint)
351 if err != nil {
352 l.Error("error getting keys", "did", did, "error", err)
353 return fmt.Errorf("error getting keys: %w", err)
354 }
355 defer resp.Body.Close()
356
357 if resp.StatusCode == http.StatusNotFound {
358 l.Info("no keys found for did", "did", did)
359 return nil
360 }
361
362 plaintext, err := io.ReadAll(resp.Body)
363 if err != nil {
364 l.Error("error reading response body", "error", err)
365 return fmt.Errorf("error reading response body: %w", err)
366 }
367
368 for _, key := range strings.Split(string(plaintext), "\n") {
369 if key == "" {
370 continue
371 }
372 pk := db.PublicKey{
373 Did: did,
374 }
375 pk.Key = key
376 if err := h.db.AddPublicKey(pk); err != nil {
377 l.Error("failed to add public key", "error", err)
378 return fmt.Errorf("failed to add public key: %w", err)
379 }
380 }
381 return nil
382}
383
384func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
385 did := event.Did
386 if event.Kind != models.EventKindCommit {
387 return nil
388 }
389
390 var err error
391 defer func() {
392 eventTime := event.TimeUS
393 lastTimeUs := eventTime + 1
394 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
395 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
396 }
397 }()
398
399 raw := json.RawMessage(event.Commit.Record)
400
401 switch event.Commit.Collection {
402 case tangled.PublicKeyNSID:
403 var record tangled.PublicKey
404 if err := json.Unmarshal(raw, &record); err != nil {
405 return fmt.Errorf("failed to unmarshal record: %w", err)
406 }
407 if err := h.processPublicKey(ctx, did, event.Commit.Operation, record); err != nil {
408 return fmt.Errorf("failed to process public key: %w", err)
409 }
410
411 case tangled.KnotMemberNSID:
412 var record tangled.KnotMember
413 if err := json.Unmarshal(raw, &record); err != nil {
414 return fmt.Errorf("failed to unmarshal record: %w", err)
415 }
416 if err := h.processKnotMember(ctx, did, event.Commit.Operation, record); err != nil {
417 return fmt.Errorf("failed to process knot member: %w", err)
418 }
419
420 case tangled.RepoPullNSID:
421 var record tangled.RepoPull
422 if err := json.Unmarshal(raw, &record); err != nil {
423 return fmt.Errorf("failed to unmarshal record: %w", err)
424 }
425 if err := h.processPull(ctx, did, record); err != nil {
426 return fmt.Errorf("failed to process knot member: %w", err)
427 }
428
429 case tangled.RepoCollaboratorNSID:
430 var record tangled.RepoCollaborator
431 if err := json.Unmarshal(raw, &record); err != nil {
432 return fmt.Errorf("failed to unmarshal record: %w", err)
433 }
434 if err := h.processCollaborator(ctx, did, event.Commit.Operation, record); err != nil {
435 return fmt.Errorf("failed to process collaborator: %w", err)
436 }
437 }
438
439 return err
440}