1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21 "tangled.org/core/log"
22 "tangled.org/core/rbac"
23 "tangled.org/core/workflow"
24)
25
26func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
27 l := log.FromContext(ctx)
28 raw := json.RawMessage(event.Commit.Record)
29 did := event.Did
30
31 var record tangled.PublicKey
32 if err := json.Unmarshal(raw, &record); err != nil {
33 return fmt.Errorf("failed to unmarshal record: %w", err)
34 }
35
36 pk := db.PublicKey{
37 Did: did,
38 PublicKey: record,
39 }
40 if err := h.db.AddPublicKey(pk); err != nil {
41 l.Error("failed to add public key", "error", err)
42 return fmt.Errorf("failed to add public key: %w", err)
43 }
44 l.Info("added public key from firehose", "did", did)
45 return nil
46}
47
48func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
49 l := log.FromContext(ctx)
50 raw := json.RawMessage(event.Commit.Record)
51 did := event.Did
52
53 var record tangled.KnotMember
54 if err := json.Unmarshal(raw, &record); err != nil {
55 return fmt.Errorf("failed to unmarshal record: %w", err)
56 }
57
58 if record.Domain != h.c.Server.Hostname {
59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
61 }
62
63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
64 if err != nil || !ok {
65 l.Error("failed to add member", "did", did)
66 return fmt.Errorf("failed to enforce permissions: %w", err)
67 }
68
69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
70 l.Error("failed to add member", "error", err)
71 return fmt.Errorf("failed to add member: %w", err)
72 }
73 l.Info("added member from firehose", "member", record.Subject)
74
75 if err := h.db.AddDid(record.Subject); err != nil {
76 l.Error("failed to add did", "error", err)
77 return fmt.Errorf("failed to add did: %w", err)
78 }
79 h.jc.AddDid(record.Subject)
80
81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
82 return fmt.Errorf("failed to fetch and add keys: %w", err)
83 }
84
85 return nil
86}
87
88func (h *Knot) processPull(ctx context.Context, event *models.Event) error {
89 raw := json.RawMessage(event.Commit.Record)
90 did := event.Did
91
92 var record tangled.RepoPull
93 if err := json.Unmarshal(raw, &record); err != nil {
94 return fmt.Errorf("failed to unmarshal record: %w", err)
95 }
96
97 l := log.FromContext(ctx)
98 l = l.With("handler", "processPull")
99 l = l.With("did", did)
100
101 if record.Target == nil {
102 return fmt.Errorf("ignoring pull record: target repo is nil")
103 }
104
105 l = l.With("target_repo", record.Target.Repo)
106 l = l.With("target_branch", record.Target.Branch)
107
108 if record.Source == nil {
109 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
110 }
111
112 if record.Source.Repo != nil {
113 return fmt.Errorf("ignoring pull record: fork based pull")
114 }
115
116 repoAt, err := syntax.ParseATURI(record.Target.Repo)
117 if err != nil {
118 return fmt.Errorf("failed to parse ATURI: %w", err)
119 }
120
121 // resolve this aturi to extract the repo record
122 ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
123 if err != nil || ident.Handle.IsInvalidHandle() {
124 return fmt.Errorf("failed to resolve handle: %w", err)
125 }
126
127 xrpcc := xrpc.Client{
128 Host: ident.PDSEndpoint(),
129 }
130
131 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
132 if err != nil {
133 return fmt.Errorf("failed to resolver repo: %w", err)
134 }
135
136 repo := resp.Value.Val.(*tangled.Repo)
137
138 if repo.Knot != h.c.Server.Hostname {
139 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
140 }
141
142 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name)
143 if err != nil {
144 return fmt.Errorf("failed to construct relative repo path: %w", err)
145 }
146
147 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
148 if err != nil {
149 return fmt.Errorf("failed to construct absolute repo path: %w", err)
150 }
151
152 gr, err := git.Open(repoPath, record.Source.Sha)
153 if err != nil {
154 return fmt.Errorf("failed to open git repository: %w", err)
155 }
156
157 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
158 if err != nil {
159 return fmt.Errorf("failed to open workflow directory: %w", err)
160 }
161
162 var pipeline workflow.RawPipeline
163 for _, e := range workflowDir {
164 if !e.IsFile() {
165 continue
166 }
167
168 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
169 contents, err := gr.RawContent(fpath)
170 if err != nil {
171 continue
172 }
173
174 pipeline = append(pipeline, workflow.RawWorkflow{
175 Name: e.Name,
176 Contents: contents,
177 })
178 }
179
180 trigger := tangled.Pipeline_PullRequestTriggerData{
181 Action: "create",
182 SourceBranch: record.Source.Branch,
183 SourceSha: record.Source.Sha,
184 TargetBranch: record.Target.Branch,
185 }
186
187 compiler := workflow.Compiler{
188 Trigger: tangled.Pipeline_TriggerMetadata{
189 Kind: string(workflow.TriggerKindPullRequest),
190 PullRequest: &trigger,
191 Repo: &tangled.Pipeline_TriggerRepo{
192 Did: ident.DID.String(),
193 Knot: repo.Knot,
194 Repo: repo.Name,
195 },
196 },
197 }
198
199 cp := compiler.Compile(compiler.Parse(pipeline))
200 eventJson, err := json.Marshal(cp)
201 if err != nil {
202 return fmt.Errorf("failed to marshal pipeline event: %w", err)
203 }
204
205 // do not run empty pipelines
206 if cp.Workflows == nil {
207 return nil
208 }
209
210 ev := db.Event{
211 Rkey: TID(),
212 Nsid: tangled.PipelineNSID,
213 EventJson: string(eventJson),
214 }
215
216 return h.db.InsertEvent(ev, h.n)
217}
218
219// duplicated from add collaborator
220func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
221 raw := json.RawMessage(event.Commit.Record)
222 did := event.Did
223
224 var record tangled.RepoCollaborator
225 if err := json.Unmarshal(raw, &record); err != nil {
226 return fmt.Errorf("failed to unmarshal record: %w", err)
227 }
228
229 repoAt, err := syntax.ParseATURI(record.Repo)
230 if err != nil {
231 return err
232 }
233
234 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
235 if err != nil || subjectId.Handle.IsInvalidHandle() {
236 return err
237 }
238
239 // TODO: fix this for good, we need to fetch the record here unfortunately
240 // resolve this aturi to extract the repo record
241 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
242 if err != nil || owner.Handle.IsInvalidHandle() {
243 return fmt.Errorf("failed to resolve handle: %w", err)
244 }
245
246 xrpcc := xrpc.Client{
247 Host: owner.PDSEndpoint(),
248 }
249
250 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
251 if err != nil {
252 return err
253 }
254
255 repo := resp.Value.Val.(*tangled.Repo)
256 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
257
258 // check perms for this user
259 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
260 if err != nil {
261 return fmt.Errorf("failed to check permissions: %w", err)
262 }
263 if !ok {
264 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
265 }
266
267 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
268 return err
269 }
270 h.jc.AddDid(subjectId.DID.String())
271
272 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
273 return err
274 }
275
276 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
277}
278
279func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
280 l := log.FromContext(ctx)
281
282 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
283 if err != nil {
284 l.Error("error building endpoint url", "did", did, "error", err.Error())
285 return fmt.Errorf("error building endpoint url: %w", err)
286 }
287
288 resp, err := http.Get(keysEndpoint)
289 if err != nil {
290 l.Error("error getting keys", "did", did, "error", err)
291 return fmt.Errorf("error getting keys: %w", err)
292 }
293 defer resp.Body.Close()
294
295 if resp.StatusCode == http.StatusNotFound {
296 l.Info("no keys found for did", "did", did)
297 return nil
298 }
299
300 plaintext, err := io.ReadAll(resp.Body)
301 if err != nil {
302 l.Error("error reading response body", "error", err)
303 return fmt.Errorf("error reading response body: %w", err)
304 }
305
306 for key := range strings.SplitSeq(string(plaintext), "\n") {
307 if key == "" {
308 continue
309 }
310 pk := db.PublicKey{
311 Did: did,
312 }
313 pk.Key = key
314 if err := h.db.AddPublicKey(pk); err != nil {
315 l.Error("failed to add public key", "error", err)
316 return fmt.Errorf("failed to add public key: %w", err)
317 }
318 }
319 return nil
320}
321
322func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
323 if event.Kind != models.EventKindCommit {
324 return nil
325 }
326
327 var err error
328 defer func() {
329 eventTime := event.TimeUS
330 lastTimeUs := eventTime + 1
331 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
332 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
333 }
334 }()
335
336 switch event.Commit.Collection {
337 case tangled.PublicKeyNSID:
338 err = h.processPublicKey(ctx, event)
339 case tangled.KnotMemberNSID:
340 err = h.processKnotMember(ctx, event)
341 case tangled.RepoPullNSID:
342 err = h.processPull(ctx, event)
343 case tangled.RepoCollaboratorNSID:
344 err = h.processCollaborator(ctx, event)
345 }
346
347 if err != nil {
348 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
349 }
350
351 return nil
352}