1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "tangled.sh/tangled.sh/core/api/tangled"
19 "tangled.sh/tangled.sh/core/idresolver"
20 "tangled.sh/tangled.sh/core/knotserver/db"
21 "tangled.sh/tangled.sh/core/knotserver/git"
22 "tangled.sh/tangled.sh/core/log"
23 "tangled.sh/tangled.sh/core/rbac"
24 "tangled.sh/tangled.sh/core/workflow"
25)
26
27func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
28 l := log.FromContext(ctx)
29 raw := json.RawMessage(event.Commit.Record)
30 did := event.Did
31
32 var record tangled.PublicKey
33 if err := json.Unmarshal(raw, &record); err != nil {
34 return fmt.Errorf("failed to unmarshal record: %w", err)
35 }
36
37 pk := db.PublicKey{
38 Did: did,
39 PublicKey: record,
40 }
41 if err := h.db.AddPublicKey(pk); err != nil {
42 l.Error("failed to add public key", "error", err)
43 return fmt.Errorf("failed to add public key: %w", err)
44 }
45 l.Info("added public key from firehose", "did", did)
46 return nil
47}
48
49func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
50 l := log.FromContext(ctx)
51 raw := json.RawMessage(event.Commit.Record)
52 did := event.Did
53
54 var record tangled.KnotMember
55 if err := json.Unmarshal(raw, &record); err != nil {
56 return fmt.Errorf("failed to unmarshal record: %w", err)
57 }
58
59 if record.Domain != h.c.Server.Hostname {
60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
62 }
63
64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
65 if err != nil || !ok {
66 l.Error("failed to add member", "did", did)
67 return fmt.Errorf("failed to enforce permissions: %w", err)
68 }
69
70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
71 l.Error("failed to add member", "error", err)
72 return fmt.Errorf("failed to add member: %w", err)
73 }
74 l.Info("added member from firehose", "member", record.Subject)
75
76 if err := h.db.AddDid(record.Subject); err != nil {
77 l.Error("failed to add did", "error", err)
78 return fmt.Errorf("failed to add did: %w", err)
79 }
80 h.jc.AddDid(record.Subject)
81
82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
83 return fmt.Errorf("failed to fetch and add keys: %w", err)
84 }
85
86 return nil
87}
88
89func (h *Knot) processPull(ctx context.Context, event *models.Event) error {
90 raw := json.RawMessage(event.Commit.Record)
91 did := event.Did
92
93 var record tangled.RepoPull
94 if err := json.Unmarshal(raw, &record); err != nil {
95 return fmt.Errorf("failed to unmarshal record: %w", err)
96 }
97
98 l := log.FromContext(ctx)
99 l = l.With("handler", "processPull")
100 l = l.With("did", did)
101
102 if record.Target == nil {
103 return fmt.Errorf("ignoring pull record: target repo is nil")
104 }
105
106 l = l.With("target_repo", record.Target.Repo)
107 l = l.With("target_branch", record.Target.Branch)
108
109 if record.Source == nil {
110 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
111 }
112
113 if record.Source.Repo != nil {
114 return fmt.Errorf("ignoring pull record: fork based pull")
115 }
116
117 repoAt, err := syntax.ParseATURI(record.Target.Repo)
118 if err != nil {
119 return fmt.Errorf("failed to parse ATURI: %w", err)
120 }
121
122 // resolve this aturi to extract the repo record
123 resolver := idresolver.DefaultResolver()
124 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
125 if err != nil || ident.Handle.IsInvalidHandle() {
126 return fmt.Errorf("failed to resolve handle: %w", err)
127 }
128
129 xrpcc := xrpc.Client{
130 Host: ident.PDSEndpoint(),
131 }
132
133 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
134 if err != nil {
135 return fmt.Errorf("failed to resolver repo: %w", err)
136 }
137
138 repo := resp.Value.Val.(*tangled.Repo)
139
140 if repo.Knot != h.c.Server.Hostname {
141 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
142 }
143
144 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
145 if err != nil {
146 return fmt.Errorf("failed to construct relative repo path: %w", err)
147 }
148
149 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
150 if err != nil {
151 return fmt.Errorf("failed to construct absolute repo path: %w", err)
152 }
153
154 gr, err := git.Open(repoPath, record.Source.Branch)
155 if err != nil {
156 return fmt.Errorf("failed to open git repository: %w", err)
157 }
158
159 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
160 if err != nil {
161 return fmt.Errorf("failed to open workflow directory: %w", err)
162 }
163
164 var pipeline workflow.RawPipeline
165 for _, e := range workflowDir {
166 if !e.IsFile {
167 continue
168 }
169
170 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
171 contents, err := gr.RawContent(fpath)
172 if err != nil {
173 continue
174 }
175
176 pipeline = append(pipeline, workflow.RawWorkflow{
177 Name: e.Name,
178 Contents: contents,
179 })
180 }
181
182 trigger := tangled.Pipeline_PullRequestTriggerData{
183 Action: "create",
184 SourceBranch: record.Source.Branch,
185 SourceSha: record.Source.Sha,
186 TargetBranch: record.Target.Branch,
187 }
188
189 compiler := workflow.Compiler{
190 Trigger: tangled.Pipeline_TriggerMetadata{
191 Kind: string(workflow.TriggerKindPullRequest),
192 PullRequest: &trigger,
193 Repo: &tangled.Pipeline_TriggerRepo{
194 Did: repo.Owner,
195 Knot: repo.Knot,
196 Repo: repo.Name,
197 },
198 },
199 }
200
201 cp := compiler.Compile(compiler.Parse(pipeline))
202 eventJson, err := json.Marshal(cp)
203 if err != nil {
204 return fmt.Errorf("failed to marshal pipeline event: %w", err)
205 }
206
207 // do not run empty pipelines
208 if cp.Workflows == nil {
209 return nil
210 }
211
212 ev := db.Event{
213 Rkey: TID(),
214 Nsid: tangled.PipelineNSID,
215 EventJson: string(eventJson),
216 }
217
218 return h.db.InsertEvent(ev, h.n)
219}
220
221// duplicated from add collaborator
222func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
223 raw := json.RawMessage(event.Commit.Record)
224 did := event.Did
225
226 var record tangled.RepoCollaborator
227 if err := json.Unmarshal(raw, &record); err != nil {
228 return fmt.Errorf("failed to unmarshal record: %w", err)
229 }
230
231 repoAt, err := syntax.ParseATURI(record.Repo)
232 if err != nil {
233 return err
234 }
235
236 resolver := idresolver.DefaultResolver()
237
238 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
239 if err != nil || subjectId.Handle.IsInvalidHandle() {
240 return err
241 }
242
243 // TODO: fix this for good, we need to fetch the record here unfortunately
244 // resolve this aturi to extract the repo record
245 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
246 if err != nil || owner.Handle.IsInvalidHandle() {
247 return fmt.Errorf("failed to resolve handle: %w", err)
248 }
249
250 xrpcc := xrpc.Client{
251 Host: owner.PDSEndpoint(),
252 }
253
254 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
255 if err != nil {
256 return err
257 }
258
259 repo := resp.Value.Val.(*tangled.Repo)
260 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
261
262 // check perms for this user
263 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
264 if err != nil {
265 return fmt.Errorf("failed to check permissions: %w", err)
266 }
267 if !ok {
268 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
269 }
270
271 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
272 return err
273 }
274 h.jc.AddDid(subjectId.DID.String())
275
276 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
277 return err
278 }
279
280 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
281}
282
283func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
284 l := log.FromContext(ctx)
285
286 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
287 if err != nil {
288 l.Error("error building endpoint url", "did", did, "error", err.Error())
289 return fmt.Errorf("error building endpoint url: %w", err)
290 }
291
292 resp, err := http.Get(keysEndpoint)
293 if err != nil {
294 l.Error("error getting keys", "did", did, "error", err)
295 return fmt.Errorf("error getting keys: %w", err)
296 }
297 defer resp.Body.Close()
298
299 if resp.StatusCode == http.StatusNotFound {
300 l.Info("no keys found for did", "did", did)
301 return nil
302 }
303
304 plaintext, err := io.ReadAll(resp.Body)
305 if err != nil {
306 l.Error("error reading response body", "error", err)
307 return fmt.Errorf("error reading response body: %w", err)
308 }
309
310 for key := range strings.SplitSeq(string(plaintext), "\n") {
311 if key == "" {
312 continue
313 }
314 pk := db.PublicKey{
315 Did: did,
316 }
317 pk.Key = key
318 if err := h.db.AddPublicKey(pk); err != nil {
319 l.Error("failed to add public key", "error", err)
320 return fmt.Errorf("failed to add public key: %w", err)
321 }
322 }
323 return nil
324}
325
326func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
327 if event.Kind != models.EventKindCommit {
328 return nil
329 }
330
331 var err error
332 defer func() {
333 eventTime := event.TimeUS
334 lastTimeUs := eventTime + 1
335 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
336 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
337 }
338 }()
339
340 switch event.Commit.Collection {
341 case tangled.PublicKeyNSID:
342 err = h.processPublicKey(ctx, event)
343 case tangled.KnotMemberNSID:
344 err = h.processKnotMember(ctx, event)
345 case tangled.RepoPullNSID:
346 err = h.processPull(ctx, event)
347 case tangled.RepoCollaboratorNSID:
348 err = h.processCollaborator(ctx, event)
349 }
350
351 if err != nil {
352 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
353 }
354
355 return nil
356}