1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
30 pk := db.PublicKey{
31 Did: did,
32 PublicKey: record,
33 }
34 if err := h.db.AddPublicKey(pk); err != nil {
35 l.Error("failed to add public key", "error", err)
36 return fmt.Errorf("failed to add public key: %w", err)
37 }
38 l.Info("added public key from firehose", "did", did)
39 return nil
40}
41
42func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
43 l := log.FromContext(ctx)
44
45 if record.Domain != h.c.Server.Hostname {
46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
47 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
48 }
49
50 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
51 if err != nil || !ok {
52 l.Error("failed to add member", "did", did)
53 return fmt.Errorf("failed to enforce permissions: %w", err)
54 }
55
56 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
57 l.Error("failed to add member", "error", err)
58 return fmt.Errorf("failed to add member: %w", err)
59 }
60 l.Info("added member from firehose", "member", record.Subject)
61
62 if err := h.db.AddDid(did); err != nil {
63 l.Error("failed to add did", "error", err)
64 return fmt.Errorf("failed to add did: %w", err)
65 }
66 h.jc.AddDid(did)
67
68 if err := h.fetchAndAddKeys(ctx, did); err != nil {
69 return fmt.Errorf("failed to fetch and add keys: %w", err)
70 }
71
72 return nil
73}
74
75func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
76 l := log.FromContext(ctx)
77 l = l.With("handler", "processPull")
78 l = l.With("did", did)
79 l = l.With("target_repo", record.TargetRepo)
80 l = l.With("target_branch", record.TargetBranch)
81
82 if record.Source == nil {
83 reason := "not a branch-based pull request"
84 l.Info("ignoring pull record", "reason", reason)
85 return fmt.Errorf("ignoring pull record: %s", reason)
86 }
87
88 if record.Source.Repo != nil {
89 reason := "fork based pull"
90 l.Info("ignoring pull record", "reason", reason)
91 return fmt.Errorf("ignoring pull record: %s", reason)
92 }
93
94 allDids, err := h.db.GetAllDids()
95 if err != nil {
96 return err
97 }
98
99 // presently: we only process PRs from collaborators for pipelines
100 if !slices.Contains(allDids, did) {
101 reason := "not a known did"
102 l.Info("rejecting pull record", "reason", reason)
103 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
104 }
105
106 repoAt, err := syntax.ParseATURI(record.TargetRepo)
107 if err != nil {
108 return err
109 }
110
111 // resolve this aturi to extract the repo record
112 resolver := idresolver.DefaultResolver()
113 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
114 if err != nil || ident.Handle.IsInvalidHandle() {
115 return fmt.Errorf("failed to resolve handle: %w", err)
116 }
117
118 xrpcc := xrpc.Client{
119 Host: ident.PDSEndpoint(),
120 }
121
122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
123 if err != nil {
124 return err
125 }
126
127 repo := resp.Value.Val.(*tangled.Repo)
128
129 if repo.Knot != h.c.Server.Hostname {
130 reason := "not this knot"
131 l.Info("rejecting pull record", "reason", reason)
132 return fmt.Errorf("rejected pull record: %s", reason)
133 }
134
135 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
136 if err != nil {
137 return err
138 }
139
140 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
141 if err != nil {
142 return err
143 }
144
145 gr, err := git.Open(repoPath, record.Source.Branch)
146 if err != nil {
147 return err
148 }
149
150 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
151 if err != nil {
152 return err
153 }
154
155 var pipeline workflow.RawPipeline
156 for _, e := range workflowDir {
157 if !e.IsFile {
158 continue
159 }
160
161 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
162 contents, err := gr.RawContent(fpath)
163 if err != nil {
164 continue
165 }
166
167 pipeline = append(pipeline, workflow.RawWorkflow{
168 Name: e.Name,
169 Contents: contents,
170 })
171 }
172
173 trigger := tangled.Pipeline_PullRequestTriggerData{
174 Action: "create",
175 SourceBranch: record.Source.Branch,
176 SourceSha: record.Source.Sha,
177 TargetBranch: record.TargetBranch,
178 }
179
180 compiler := workflow.Compiler{
181 Trigger: tangled.Pipeline_TriggerMetadata{
182 Kind: string(workflow.TriggerKindPullRequest),
183 PullRequest: &trigger,
184 Repo: &tangled.Pipeline_TriggerRepo{
185 Did: repo.Owner,
186 Knot: repo.Knot,
187 Repo: repo.Name,
188 },
189 },
190 }
191
192 cp := compiler.Compile(compiler.Parse(pipeline))
193 eventJson, err := json.Marshal(cp)
194 if err != nil {
195 return err
196 }
197
198 // do not run empty pipelines
199 if cp.Workflows == nil {
200 return nil
201 }
202
203 event := db.Event{
204 Rkey: TID(),
205 Nsid: tangled.PipelineNSID,
206 EventJson: string(eventJson),
207 }
208
209 return h.db.InsertEvent(event, h.n)
210}
211
212// duplicated from add collaborator
213func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
214 repoAt, err := syntax.ParseATURI(record.Repo)
215 if err != nil {
216 return err
217 }
218
219 resolver := idresolver.DefaultResolver()
220
221 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
222 if err != nil || subjectId.Handle.IsInvalidHandle() {
223 return err
224 }
225
226 // TODO: fix this for good, we need to fetch the record here unfortunately
227 // resolve this aturi to extract the repo record
228 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
229 if err != nil || owner.Handle.IsInvalidHandle() {
230 return fmt.Errorf("failed to resolve handle: %w", err)
231 }
232
233 xrpcc := xrpc.Client{
234 Host: owner.PDSEndpoint(),
235 }
236
237 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
238 if err != nil {
239 return err
240 }
241
242 repo := resp.Value.Val.(*tangled.Repo)
243 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
244
245 // check perms for this user
246 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
247 return fmt.Errorf("insufficient permissions: %w", err)
248 }
249
250 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
251 return err
252 }
253 h.jc.AddDid(subjectId.DID.String())
254
255 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
256 return err
257 }
258
259 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
260}
261
262func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
263 l := log.FromContext(ctx)
264
265 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
266 if err != nil {
267 l.Error("error building endpoint url", "did", did, "error", err.Error())
268 return fmt.Errorf("error building endpoint url: %w", err)
269 }
270
271 resp, err := http.Get(keysEndpoint)
272 if err != nil {
273 l.Error("error getting keys", "did", did, "error", err)
274 return fmt.Errorf("error getting keys: %w", err)
275 }
276 defer resp.Body.Close()
277
278 if resp.StatusCode == http.StatusNotFound {
279 l.Info("no keys found for did", "did", did)
280 return nil
281 }
282
283 plaintext, err := io.ReadAll(resp.Body)
284 if err != nil {
285 l.Error("error reading response body", "error", err)
286 return fmt.Errorf("error reading response body: %w", err)
287 }
288
289 for _, key := range strings.Split(string(plaintext), "\n") {
290 if key == "" {
291 continue
292 }
293 pk := db.PublicKey{
294 Did: did,
295 }
296 pk.Key = key
297 if err := h.db.AddPublicKey(pk); err != nil {
298 l.Error("failed to add public key", "error", err)
299 return fmt.Errorf("failed to add public key: %w", err)
300 }
301 }
302 return nil
303}
304
305func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
306 did := event.Did
307 if event.Kind != models.EventKindCommit {
308 return nil
309 }
310
311 var err error
312 defer func() {
313 eventTime := event.TimeUS
314 lastTimeUs := eventTime + 1
315 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
316 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
317 }
318 }()
319
320 raw := json.RawMessage(event.Commit.Record)
321
322 switch event.Commit.Collection {
323 case tangled.PublicKeyNSID:
324 var record tangled.PublicKey
325 if err := json.Unmarshal(raw, &record); err != nil {
326 return fmt.Errorf("failed to unmarshal record: %w", err)
327 }
328 if err := h.processPublicKey(ctx, did, record); err != nil {
329 return fmt.Errorf("failed to process public key: %w", err)
330 }
331
332 case tangled.KnotMemberNSID:
333 var record tangled.KnotMember
334 if err := json.Unmarshal(raw, &record); err != nil {
335 return fmt.Errorf("failed to unmarshal record: %w", err)
336 }
337 if err := h.processKnotMember(ctx, did, record); err != nil {
338 return fmt.Errorf("failed to process knot member: %w", err)
339 }
340
341 case tangled.RepoPullNSID:
342 var record tangled.RepoPull
343 if err := json.Unmarshal(raw, &record); err != nil {
344 return fmt.Errorf("failed to unmarshal record: %w", err)
345 }
346 if err := h.processPull(ctx, did, record); err != nil {
347 return fmt.Errorf("failed to process knot member: %w", err)
348 }
349
350 case tangled.RepoCollaboratorNSID:
351 var record tangled.RepoCollaborator
352 if err := json.Unmarshal(raw, &record); err != nil {
353 return fmt.Errorf("failed to unmarshal record: %w", err)
354 }
355 if err := h.processCollaborator(ctx, did, record); err != nil {
356 return fmt.Errorf("failed to process knot member: %w", err)
357 }
358
359 }
360
361 return err
362}