1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
30 pk := db.PublicKey{
31 Did: did,
32 PublicKey: record,
33 }
34 if err := h.db.AddPublicKey(pk); err != nil {
35 l.Error("failed to add public key", "error", err)
36 return fmt.Errorf("failed to add public key: %w", err)
37 }
38 l.Info("added public key from firehose", "did", did)
39 return nil
40}
41
42func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
43 l := log.FromContext(ctx)
44
45 if record.Domain != h.c.Server.Hostname {
46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
47 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
48 }
49
50 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
51 if err != nil || !ok {
52 l.Error("failed to add member", "did", did)
53 return fmt.Errorf("failed to enforce permissions: %w", err)
54 }
55
56 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
57 l.Error("failed to add member", "error", err)
58 return fmt.Errorf("failed to add member: %w", err)
59 }
60 l.Info("added member from firehose", "member", record.Subject)
61
62 if err := h.db.AddDid(did); err != nil {
63 l.Error("failed to add did", "error", err)
64 return fmt.Errorf("failed to add did: %w", err)
65 }
66 h.jc.AddDid(did)
67
68 if err := h.fetchAndAddKeys(ctx, did); err != nil {
69 return fmt.Errorf("failed to fetch and add keys: %w", err)
70 }
71
72 return nil
73}
74
75func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
76 l := log.FromContext(ctx)
77 l = l.With("handler", "processPull")
78 l = l.With("did", did)
79 l = l.With("target_repo", record.TargetRepo)
80 l = l.With("target_branch", record.TargetBranch)
81
82 if record.Source == nil {
83 reason := "not a branch-based pull request"
84 l.Info("ignoring pull record", "reason", reason)
85 return fmt.Errorf("ignoring pull record: %s", reason)
86 }
87
88 if record.Source.Repo != nil {
89 reason := "fork based pull"
90 l.Info("ignoring pull record", "reason", reason)
91 return fmt.Errorf("ignoring pull record: %s", reason)
92 }
93
94 allDids, err := h.db.GetAllDids()
95 if err != nil {
96 return err
97 }
98
99 // presently: we only process PRs from collaborators for pipelines
100 if !slices.Contains(allDids, did) {
101 reason := "not a known did"
102 l.Info("rejecting pull record", "reason", reason)
103 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
104 }
105
106 repoAt, err := syntax.ParseATURI(record.TargetRepo)
107 if err != nil {
108 return err
109 }
110
111 // resolve this aturi to extract the repo record
112 resolver := idresolver.DefaultResolver()
113 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
114 if err != nil || ident.Handle.IsInvalidHandle() {
115 return fmt.Errorf("failed to resolve handle: %w", err)
116 }
117
118 xrpcc := xrpc.Client{
119 Host: ident.PDSEndpoint(),
120 }
121
122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
123 if err != nil {
124 return err
125 }
126
127 repo := resp.Value.Val.(*tangled.Repo)
128
129 if repo.Knot != h.c.Server.Hostname {
130 reason := "not this knot"
131 l.Info("rejecting pull record", "reason", reason)
132 return fmt.Errorf("rejected pull record: %s", reason)
133 }
134
135 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
136 if err != nil {
137 return err
138 }
139
140 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
141 if err != nil {
142 return err
143 }
144
145 gr, err := git.Open(repoPath, record.Source.Branch)
146 if err != nil {
147 return err
148 }
149
150 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
151 if err != nil {
152 return err
153 }
154
155 var pipeline workflow.Pipeline
156 for _, e := range workflowDir {
157 if !e.IsFile {
158 continue
159 }
160
161 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
162 contents, err := gr.RawContent(fpath)
163 if err != nil {
164 continue
165 }
166
167 wf, err := workflow.FromFile(e.Name, contents)
168 if err != nil {
169 // TODO: log here, respond to client that is pushing
170 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
171 continue
172 }
173
174 pipeline = append(pipeline, wf)
175 }
176
177 trigger := tangled.Pipeline_PullRequestTriggerData{
178 Action: "create",
179 SourceBranch: record.Source.Branch,
180 SourceSha: record.Source.Sha,
181 TargetBranch: record.TargetBranch,
182 }
183
184 compiler := workflow.Compiler{
185 Trigger: tangled.Pipeline_TriggerMetadata{
186 Kind: string(workflow.TriggerKindPullRequest),
187 PullRequest: &trigger,
188 Repo: &tangled.Pipeline_TriggerRepo{
189 Did: repo.Owner,
190 Knot: repo.Knot,
191 Repo: repo.Name,
192 },
193 },
194 }
195
196 cp := compiler.Compile(pipeline)
197 eventJson, err := json.Marshal(cp)
198 if err != nil {
199 return err
200 }
201
202 // do not run empty pipelines
203 if cp.Workflows == nil {
204 return nil
205 }
206
207 event := db.Event{
208 Rkey: TID(),
209 Nsid: tangled.PipelineNSID,
210 EventJson: string(eventJson),
211 }
212
213 return h.db.InsertEvent(event, h.n)
214}
215
216// duplicated from add collaborator
217func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
218 repoAt, err := syntax.ParseATURI(record.Repo)
219 if err != nil {
220 return err
221 }
222
223 resolver := idresolver.DefaultResolver()
224
225 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
226 if err != nil || subjectId.Handle.IsInvalidHandle() {
227 return err
228 }
229
230 // TODO: fix this for good, we need to fetch the record here unfortunately
231 // resolve this aturi to extract the repo record
232 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
233 if err != nil || owner.Handle.IsInvalidHandle() {
234 return fmt.Errorf("failed to resolve handle: %w", err)
235 }
236
237 xrpcc := xrpc.Client{
238 Host: owner.PDSEndpoint(),
239 }
240
241 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
242 if err != nil {
243 return err
244 }
245
246 repo := resp.Value.Val.(*tangled.Repo)
247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248
249 // check perms for this user
250 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
251 return fmt.Errorf("insufficient permissions: %w", err)
252 }
253
254 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
255 return err
256 }
257 h.jc.AddDid(subjectId.DID.String())
258
259 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
260 return err
261 }
262
263 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
264}
265
266func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
267 l := log.FromContext(ctx)
268
269 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
270 if err != nil {
271 l.Error("error building endpoint url", "did", did, "error", err.Error())
272 return fmt.Errorf("error building endpoint url: %w", err)
273 }
274
275 resp, err := http.Get(keysEndpoint)
276 if err != nil {
277 l.Error("error getting keys", "did", did, "error", err)
278 return fmt.Errorf("error getting keys: %w", err)
279 }
280 defer resp.Body.Close()
281
282 if resp.StatusCode == http.StatusNotFound {
283 l.Info("no keys found for did", "did", did)
284 return nil
285 }
286
287 plaintext, err := io.ReadAll(resp.Body)
288 if err != nil {
289 l.Error("error reading response body", "error", err)
290 return fmt.Errorf("error reading response body: %w", err)
291 }
292
293 for _, key := range strings.Split(string(plaintext), "\n") {
294 if key == "" {
295 continue
296 }
297 pk := db.PublicKey{
298 Did: did,
299 }
300 pk.Key = key
301 if err := h.db.AddPublicKey(pk); err != nil {
302 l.Error("failed to add public key", "error", err)
303 return fmt.Errorf("failed to add public key: %w", err)
304 }
305 }
306 return nil
307}
308
309func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
310 did := event.Did
311 if event.Kind != models.EventKindCommit {
312 return nil
313 }
314
315 var err error
316 defer func() {
317 eventTime := event.TimeUS
318 lastTimeUs := eventTime + 1
319 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
320 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
321 }
322 }()
323
324 raw := json.RawMessage(event.Commit.Record)
325
326 switch event.Commit.Collection {
327 case tangled.PublicKeyNSID:
328 var record tangled.PublicKey
329 if err := json.Unmarshal(raw, &record); err != nil {
330 return fmt.Errorf("failed to unmarshal record: %w", err)
331 }
332 if err := h.processPublicKey(ctx, did, record); err != nil {
333 return fmt.Errorf("failed to process public key: %w", err)
334 }
335
336 case tangled.KnotMemberNSID:
337 var record tangled.KnotMember
338 if err := json.Unmarshal(raw, &record); err != nil {
339 return fmt.Errorf("failed to unmarshal record: %w", err)
340 }
341 if err := h.processKnotMember(ctx, did, record); err != nil {
342 return fmt.Errorf("failed to process knot member: %w", err)
343 }
344
345 case tangled.RepoPullNSID:
346 var record tangled.RepoPull
347 if err := json.Unmarshal(raw, &record); err != nil {
348 return fmt.Errorf("failed to unmarshal record: %w", err)
349 }
350 if err := h.processPull(ctx, did, record); err != nil {
351 return fmt.Errorf("failed to process knot member: %w", err)
352 }
353
354 case tangled.RepoCollaboratorNSID:
355 var record tangled.RepoCollaborator
356 if err := json.Unmarshal(raw, &record); err != nil {
357 return fmt.Errorf("failed to unmarshal record: %w", err)
358 }
359 if err := h.processCollaborator(ctx, did, record); err != nil {
360 return fmt.Errorf("failed to process knot member: %w", err)
361 }
362
363 }
364
365 return err
366}