1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
29 l := log.FromContext(ctx)
30 raw := json.RawMessage(event.Commit.Record)
31 did := event.Did
32
33 var record tangled.PublicKey
34 if err := json.Unmarshal(raw, &record); err != nil {
35 return fmt.Errorf("failed to unmarshal record: %w", err)
36 }
37
38 pk := db.PublicKey{
39 Did: did,
40 PublicKey: record,
41 }
42 if err := h.db.AddPublicKey(pk); err != nil {
43 l.Error("failed to add public key", "error", err)
44 return fmt.Errorf("failed to add public key: %w", err)
45 }
46 l.Info("added public key from firehose", "did", did)
47 return nil
48}
49
50func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
51 l := log.FromContext(ctx)
52 raw := json.RawMessage(event.Commit.Record)
53 did := event.Did
54
55 var record tangled.KnotMember
56 if err := json.Unmarshal(raw, &record); err != nil {
57 return fmt.Errorf("failed to unmarshal record: %w", err)
58 }
59
60 if record.Domain != h.c.Server.Hostname {
61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
63 }
64
65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
66 if err != nil || !ok {
67 l.Error("failed to add member", "did", did)
68 return fmt.Errorf("failed to enforce permissions: %w", err)
69 }
70
71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
72 l.Error("failed to add member", "error", err)
73 return fmt.Errorf("failed to add member: %w", err)
74 }
75 l.Info("added member from firehose", "member", record.Subject)
76
77 if err := h.db.AddDid(did); err != nil {
78 l.Error("failed to add did", "error", err)
79 return fmt.Errorf("failed to add did: %w", err)
80 }
81 h.jc.AddDid(did)
82
83 if err := h.fetchAndAddKeys(ctx, did); err != nil {
84 return fmt.Errorf("failed to fetch and add keys: %w", err)
85 }
86
87 return nil
88}
89
90func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
91 raw := json.RawMessage(event.Commit.Record)
92 did := event.Did
93
94 var record tangled.RepoPull
95 if err := json.Unmarshal(raw, &record); err != nil {
96 return fmt.Errorf("failed to unmarshal record: %w", err)
97 }
98
99 l := log.FromContext(ctx)
100 l = l.With("handler", "processPull")
101 l = l.With("did", did)
102 l = l.With("target_repo", record.TargetRepo)
103 l = l.With("target_branch", record.TargetBranch)
104
105 if record.Source == nil {
106 reason := "not a branch-based pull request"
107 l.Info("ignoring pull record", "reason", reason)
108 return fmt.Errorf("ignoring pull record: %s", reason)
109 }
110
111 if record.Source.Repo != nil {
112 reason := "fork based pull"
113 l.Info("ignoring pull record", "reason", reason)
114 return fmt.Errorf("ignoring pull record: %s", reason)
115 }
116
117 allDids, err := h.db.GetAllDids()
118 if err != nil {
119 return err
120 }
121
122 // presently: we only process PRs from collaborators for pipelines
123 if !slices.Contains(allDids, did) {
124 reason := "not a known did"
125 l.Info("rejecting pull record", "reason", reason)
126 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
127 }
128
129 repoAt, err := syntax.ParseATURI(record.TargetRepo)
130 if err != nil {
131 return err
132 }
133
134 // resolve this aturi to extract the repo record
135 resolver := idresolver.DefaultResolver()
136 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
137 if err != nil || ident.Handle.IsInvalidHandle() {
138 return fmt.Errorf("failed to resolve handle: %w", err)
139 }
140
141 xrpcc := xrpc.Client{
142 Host: ident.PDSEndpoint(),
143 }
144
145 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
146 if err != nil {
147 return err
148 }
149
150 repo := resp.Value.Val.(*tangled.Repo)
151
152 if repo.Knot != h.c.Server.Hostname {
153 reason := "not this knot"
154 l.Info("rejecting pull record", "reason", reason)
155 return fmt.Errorf("rejected pull record: %s", reason)
156 }
157
158 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
159 if err != nil {
160 return err
161 }
162
163 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
164 if err != nil {
165 return err
166 }
167
168 gr, err := git.Open(repoPath, record.Source.Branch)
169 if err != nil {
170 return err
171 }
172
173 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
174 if err != nil {
175 return err
176 }
177
178 var pipeline workflow.RawPipeline
179 for _, e := range workflowDir {
180 if !e.IsFile {
181 continue
182 }
183
184 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
185 contents, err := gr.RawContent(fpath)
186 if err != nil {
187 continue
188 }
189
190 pipeline = append(pipeline, workflow.RawWorkflow{
191 Name: e.Name,
192 Contents: contents,
193 })
194 }
195
196 trigger := tangled.Pipeline_PullRequestTriggerData{
197 Action: "create",
198 SourceBranch: record.Source.Branch,
199 SourceSha: record.Source.Sha,
200 TargetBranch: record.TargetBranch,
201 }
202
203 compiler := workflow.Compiler{
204 Trigger: tangled.Pipeline_TriggerMetadata{
205 Kind: string(workflow.TriggerKindPullRequest),
206 PullRequest: &trigger,
207 Repo: &tangled.Pipeline_TriggerRepo{
208 Did: repo.Owner,
209 Knot: repo.Knot,
210 Repo: repo.Name,
211 },
212 },
213 }
214
215 cp := compiler.Compile(compiler.Parse(pipeline))
216 eventJson, err := json.Marshal(cp)
217 if err != nil {
218 return err
219 }
220
221 // do not run empty pipelines
222 if cp.Workflows == nil {
223 return nil
224 }
225
226 ev := db.Event{
227 Rkey: TID(),
228 Nsid: tangled.PipelineNSID,
229 EventJson: string(eventJson),
230 }
231
232 return h.db.InsertEvent(ev, h.n)
233}
234
235// duplicated from add collaborator
236func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
237 raw := json.RawMessage(event.Commit.Record)
238 did := event.Did
239
240 var record tangled.RepoCollaborator
241 if err := json.Unmarshal(raw, &record); err != nil {
242 return fmt.Errorf("failed to unmarshal record: %w", err)
243 }
244
245 repoAt, err := syntax.ParseATURI(record.Repo)
246 if err != nil {
247 return err
248 }
249
250 resolver := idresolver.DefaultResolver()
251
252 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
253 if err != nil || subjectId.Handle.IsInvalidHandle() {
254 return err
255 }
256
257 // TODO: fix this for good, we need to fetch the record here unfortunately
258 // resolve this aturi to extract the repo record
259 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
260 if err != nil || owner.Handle.IsInvalidHandle() {
261 return fmt.Errorf("failed to resolve handle: %w", err)
262 }
263
264 xrpcc := xrpc.Client{
265 Host: owner.PDSEndpoint(),
266 }
267
268 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
269 if err != nil {
270 return err
271 }
272
273 repo := resp.Value.Val.(*tangled.Repo)
274 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
275
276 // check perms for this user
277 if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
278 return fmt.Errorf("insufficient permissions: %w", err)
279 }
280
281 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
282 return err
283 }
284 h.jc.AddDid(subjectId.DID.String())
285
286 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
287 return err
288 }
289
290 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
291}
292
293func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
294 l := log.FromContext(ctx)
295
296 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
297 if err != nil {
298 l.Error("error building endpoint url", "did", did, "error", err.Error())
299 return fmt.Errorf("error building endpoint url: %w", err)
300 }
301
302 resp, err := http.Get(keysEndpoint)
303 if err != nil {
304 l.Error("error getting keys", "did", did, "error", err)
305 return fmt.Errorf("error getting keys: %w", err)
306 }
307 defer resp.Body.Close()
308
309 if resp.StatusCode == http.StatusNotFound {
310 l.Info("no keys found for did", "did", did)
311 return nil
312 }
313
314 plaintext, err := io.ReadAll(resp.Body)
315 if err != nil {
316 l.Error("error reading response body", "error", err)
317 return fmt.Errorf("error reading response body: %w", err)
318 }
319
320 for _, key := range strings.Split(string(plaintext), "\n") {
321 if key == "" {
322 continue
323 }
324 pk := db.PublicKey{
325 Did: did,
326 }
327 pk.Key = key
328 if err := h.db.AddPublicKey(pk); err != nil {
329 l.Error("failed to add public key", "error", err)
330 return fmt.Errorf("failed to add public key: %w", err)
331 }
332 }
333 return nil
334}
335
336func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
337 if event.Kind != models.EventKindCommit {
338 return nil
339 }
340
341 var err error
342 defer func() {
343 eventTime := event.TimeUS
344 lastTimeUs := eventTime + 1
345 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
346 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
347 }
348 }()
349
350 switch event.Commit.Collection {
351 case tangled.PublicKeyNSID:
352 err = h.processPublicKey(ctx, event)
353 case tangled.KnotMemberNSID:
354 err = h.processKnotMember(ctx, event)
355 case tangled.RepoPullNSID:
356 err = h.processPull(ctx, event)
357 case tangled.RepoCollaboratorNSID:
358 err = h.processCollaborator(ctx, event)
359 }
360
361 if err != nil {
362 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
363 }
364
365 return nil
366}