1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
30
31 allDids, err := h.db.GetAllDids()
32 if err != nil {
33 return err
34 }
35
36 // only process public keys from known DIDs
37 if !slices.Contains(allDids, did) {
38 reason := "not a known did"
39 l.Debug("rejecting public key record", "reason", reason, "did", did)
40 return nil
41 }
42
43 pk := db.PublicKey{
44 Did: did,
45 PublicKey: record,
46 }
47 if err := h.db.AddPublicKey(pk); err != nil {
48 l.Error("failed to add public key", "error", err)
49 return fmt.Errorf("failed to add public key: %w", err)
50 }
51 l.Info("added public key from firehose", "did", did)
52 return nil
53}
54
55func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
56 l := log.FromContext(ctx)
57
58 if record.Domain != h.c.Server.Hostname {
59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
61 }
62
63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
64 if err != nil || !ok {
65 l.Error("failed to add member", "did", did)
66 return fmt.Errorf("failed to enforce permissions: %w", err)
67 }
68
69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
70 l.Error("failed to add member", "error", err)
71 return fmt.Errorf("failed to add member: %w", err)
72 }
73 l.Info("added member from firehose", "member", record.Subject)
74
75 if err := h.db.AddDid(did); err != nil {
76 l.Error("failed to add did", "error", err)
77 return fmt.Errorf("failed to add did: %w", err)
78 }
79 h.jc.AddDid(did)
80
81 if err := h.fetchAndAddKeys(ctx, did); err != nil {
82 return fmt.Errorf("failed to fetch and add keys: %w", err)
83 }
84
85 return nil
86}
87
88func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
89 l := log.FromContext(ctx)
90 l = l.With("handler", "processPull")
91 l = l.With("did", did)
92 l = l.With("target_repo", record.TargetRepo)
93 l = l.With("target_branch", record.TargetBranch)
94
95 if record.Source == nil {
96 reason := "not a branch-based pull request"
97 l.Info("ignoring pull record", "reason", reason)
98 return fmt.Errorf("ignoring pull record: %s", reason)
99 }
100
101 if record.Source.Repo != nil {
102 reason := "fork based pull"
103 l.Info("ignoring pull record", "reason", reason)
104 return fmt.Errorf("ignoring pull record: %s", reason)
105 }
106
107 allDids, err := h.db.GetAllDids()
108 if err != nil {
109 return err
110 }
111
112 // presently: we only process PRs from collaborators for pipelines
113 if !slices.Contains(allDids, did) {
114 reason := "not a known did"
115 l.Debug("rejecting pull record", "reason", reason)
116 return nil
117 }
118
119 repoAt, err := syntax.ParseATURI(record.TargetRepo)
120 if err != nil {
121 return err
122 }
123
124 // resolve this aturi to extract the repo record
125 resolver := idresolver.DefaultResolver()
126 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
127 if err != nil || ident.Handle.IsInvalidHandle() {
128 return fmt.Errorf("failed to resolve handle: %w", err)
129 }
130
131 xrpcc := xrpc.Client{
132 Host: ident.PDSEndpoint(),
133 }
134
135 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
136 if err != nil {
137 return err
138 }
139
140 repo := resp.Value.Val.(*tangled.Repo)
141
142 if repo.Knot != h.c.Server.Hostname {
143 reason := "not this knot"
144 l.Debug("rejecting pull record", "reason", reason)
145 return nil
146 }
147
148 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
149 if err != nil {
150 return err
151 }
152
153 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
154 if err != nil {
155 return err
156 }
157
158 gr, err := git.Open(repoPath, record.Source.Branch)
159 if err != nil {
160 return err
161 }
162
163 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
164 if err != nil {
165 return err
166 }
167
168 var pipeline workflow.Pipeline
169 for _, e := range workflowDir {
170 if !e.IsFile {
171 continue
172 }
173
174 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
175 contents, err := gr.RawContent(fpath)
176 if err != nil {
177 continue
178 }
179
180 wf, err := workflow.FromFile(e.Name, contents)
181 if err != nil {
182 // TODO: log here, respond to client that is pushing
183 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
184 continue
185 }
186
187 pipeline = append(pipeline, wf)
188 }
189
190 trigger := tangled.Pipeline_PullRequestTriggerData{
191 Action: "create",
192 SourceBranch: record.Source.Branch,
193 SourceSha: record.Source.Sha,
194 TargetBranch: record.TargetBranch,
195 }
196
197 compiler := workflow.Compiler{
198 Trigger: tangled.Pipeline_TriggerMetadata{
199 Kind: string(workflow.TriggerKindPullRequest),
200 PullRequest: &trigger,
201 Repo: &tangled.Pipeline_TriggerRepo{
202 Did: repo.Owner,
203 Knot: repo.Knot,
204 Repo: repo.Name,
205 },
206 },
207 }
208
209 cp := compiler.Compile(pipeline)
210 eventJson, err := json.Marshal(cp)
211 if err != nil {
212 return err
213 }
214
215 // do not run empty pipelines
216 if cp.Workflows == nil {
217 return nil
218 }
219
220 event := db.Event{
221 Rkey: TID(),
222 Nsid: tangled.PipelineNSID,
223 EventJson: string(eventJson),
224 }
225
226 return h.db.InsertEvent(event, h.n)
227}
228
229// duplicated from add collaborator
230func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
231 repoAt, err := syntax.ParseATURI(record.Repo)
232 if err != nil {
233 return err
234 }
235
236 resolver := idresolver.DefaultResolver()
237
238 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
239 if err != nil || subjectId.Handle.IsInvalidHandle() {
240 return err
241 }
242
243 // TODO: fix this for good, we need to fetch the record here unfortunately
244 // resolve this aturi to extract the repo record
245 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
246 if err != nil || owner.Handle.IsInvalidHandle() {
247 return fmt.Errorf("failed to resolve handle: %w", err)
248 }
249
250 xrpcc := xrpc.Client{
251 Host: owner.PDSEndpoint(),
252 }
253
254 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
255 if err != nil {
256 return err
257 }
258
259 repo := resp.Value.Val.(*tangled.Repo)
260 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
261
262 // check perms for this user
263 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
264 return fmt.Errorf("insufficient permissions: %w", err)
265 }
266
267 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
268 return err
269 }
270 h.jc.AddDid(subjectId.DID.String())
271
272 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
273 return err
274 }
275
276 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
277}
278
279func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
280 l := log.FromContext(ctx)
281
282 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
283 if err != nil {
284 l.Error("error building endpoint url", "did", did, "error", err.Error())
285 return fmt.Errorf("error building endpoint url: %w", err)
286 }
287
288 resp, err := http.Get(keysEndpoint)
289 if err != nil {
290 l.Error("error getting keys", "did", did, "error", err)
291 return fmt.Errorf("error getting keys: %w", err)
292 }
293 defer resp.Body.Close()
294
295 if resp.StatusCode == http.StatusNotFound {
296 l.Info("no keys found for did", "did", did)
297 return nil
298 }
299
300 plaintext, err := io.ReadAll(resp.Body)
301 if err != nil {
302 l.Error("error reading response body", "error", err)
303 return fmt.Errorf("error reading response body: %w", err)
304 }
305
306 for _, key := range strings.Split(string(plaintext), "\n") {
307 if key == "" {
308 continue
309 }
310 pk := db.PublicKey{
311 Did: did,
312 }
313 pk.Key = key
314 if err := h.db.AddPublicKey(pk); err != nil {
315 l.Error("failed to add public key", "error", err)
316 return fmt.Errorf("failed to add public key: %w", err)
317 }
318 }
319 return nil
320}
321
322func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
323 did := event.Did
324 if event.Kind != models.EventKindCommit {
325 return nil
326 }
327
328 var err error
329 defer func() {
330 eventTime := event.TimeUS
331 lastTimeUs := eventTime + 1
332 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
333 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
334 }
335 }()
336
337 raw := json.RawMessage(event.Commit.Record)
338
339 switch event.Commit.Collection {
340 case tangled.PublicKeyNSID:
341 var record tangled.PublicKey
342 if err := json.Unmarshal(raw, &record); err != nil {
343 return fmt.Errorf("failed to unmarshal record: %w", err)
344 }
345 if err := h.processPublicKey(ctx, did, record); err != nil {
346 return fmt.Errorf("failed to process public key: %w", err)
347 }
348
349 case tangled.KnotMemberNSID:
350 var record tangled.KnotMember
351 if err := json.Unmarshal(raw, &record); err != nil {
352 return fmt.Errorf("failed to unmarshal record: %w", err)
353 }
354 if err := h.processKnotMember(ctx, did, record); err != nil {
355 return fmt.Errorf("failed to process knot member: %w", err)
356 }
357
358 case tangled.RepoPullNSID:
359 var record tangled.RepoPull
360 if err := json.Unmarshal(raw, &record); err != nil {
361 return fmt.Errorf("failed to unmarshal record: %w", err)
362 }
363 if err := h.processPull(ctx, did, record); err != nil {
364 return fmt.Errorf("failed to process knot member: %w", err)
365 }
366
367 case tangled.RepoCollaboratorNSID:
368 var record tangled.RepoCollaborator
369 if err := json.Unmarshal(raw, &record); err != nil {
370 return fmt.Errorf("failed to unmarshal record: %w", err)
371 }
372 if err := h.processCollaborator(ctx, did, record); err != nil {
373 return fmt.Errorf("failed to process knot member: %w", err)
374 }
375
376 }
377
378 return err
379}