1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
30 pk := db.PublicKey{
31 Did: did,
32 PublicKey: record,
33 }
34 if err := h.db.AddPublicKey(pk); err != nil {
35 l.Error("failed to add public key", "error", err)
36 return fmt.Errorf("failed to add public key: %w", err)
37 }
38 l.Info("added public key from firehose", "did", did)
39 return nil
40}
41
42func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
43 l := log.FromContext(ctx)
44
45 if record.Domain != h.c.Server.Hostname {
46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
47 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
48 }
49
50 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
51 if err != nil || !ok {
52 l.Error("failed to add member", "did", did)
53 return fmt.Errorf("failed to enforce permissions: %w", err)
54 }
55
56 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
57 l.Error("failed to add member", "error", err)
58 return fmt.Errorf("failed to add member: %w", err)
59 }
60 l.Info("added member from firehose", "member", record.Subject)
61
62 if err := h.db.AddDid(did); err != nil {
63 l.Error("failed to add did", "error", err)
64 return fmt.Errorf("failed to add did: %w", err)
65 }
66 h.jc.AddDid(did)
67
68 if err := h.fetchAndAddKeys(ctx, did); err != nil {
69 return fmt.Errorf("failed to fetch and add keys: %w", err)
70 }
71
72 return nil
73}
74
75func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
76 l := log.FromContext(ctx)
77 l = l.With("handler", "processPull")
78 l = l.With("did", did)
79 l = l.With("target_repo", record.TargetRepo)
80 l = l.With("target_branch", record.TargetBranch)
81
82 if record.Source == nil {
83 reason := "not a branch-based pull request"
84 l.Info("ignoring pull record", "reason", reason)
85 return fmt.Errorf("ignoring pull record: %s", reason)
86 }
87
88 if record.Source.Repo != nil {
89 reason := "fork based pull"
90 l.Info("ignoring pull record", "reason", reason)
91 return fmt.Errorf("ignoring pull record: %s", reason)
92 }
93
94 allDids, err := h.db.GetAllDids()
95 if err != nil {
96 return err
97 }
98
99 // presently: we only process PRs from collaborators for pipelines
100 if !slices.Contains(allDids, did) {
101 reason := "not a known did"
102 l.Info("rejecting pull record", "reason", reason)
103 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
104 }
105
106 repoAt, err := syntax.ParseATURI(record.TargetRepo)
107 if err != nil {
108 return err
109 }
110
111 // resolve this aturi to extract the repo record
112 resolver := idresolver.DefaultResolver()
113 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
114 if err != nil || ident.Handle.IsInvalidHandle() {
115 return fmt.Errorf("failed to resolve handle: %w", err)
116 }
117
118 xrpcc := xrpc.Client{
119 Host: ident.PDSEndpoint(),
120 }
121
122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
123 if err != nil {
124 return err
125 }
126
127 repo := resp.Value.Val.(*tangled.Repo)
128
129 if repo.Knot != h.c.Server.Hostname {
130 reason := "not this knot"
131 l.Info("rejecting pull record", "reason", reason)
132 return fmt.Errorf("rejected pull record: %s", reason)
133 }
134
135 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
136 if err != nil {
137 return err
138 }
139
140 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
141 if err != nil {
142 return err
143 }
144
145 gr, err := git.Open(repoPath, record.Source.Branch)
146 if err != nil {
147 return err
148 }
149
150 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
151 if err != nil {
152 return err
153 }
154
155 var pipeline workflow.Pipeline
156 for _, e := range workflowDir {
157 if !e.IsFile {
158 continue
159 }
160
161 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
162 contents, err := gr.RawContent(fpath)
163 if err != nil {
164 continue
165 }
166
167 wf, err := workflow.FromFile(e.Name, contents)
168 if err != nil {
169 // TODO: log here, respond to client that is pushing
170 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
171 continue
172 }
173
174 pipeline = append(pipeline, wf)
175 }
176
177 trigger := tangled.Pipeline_PullRequestTriggerData{
178 Action: "create",
179 SourceBranch: record.Source.Branch,
180 SourceSha: record.Source.Sha,
181 TargetBranch: record.TargetBranch,
182 }
183
184 compiler := workflow.Compiler{
185 Trigger: tangled.Pipeline_TriggerMetadata{
186 Kind: string(workflow.TriggerKindPullRequest),
187 PullRequest: &trigger,
188 Repo: &tangled.Pipeline_TriggerRepo{
189 Did: repo.Owner,
190 Knot: repo.Knot,
191 Repo: repo.Name,
192 },
193 },
194 }
195
196 cp := compiler.Compile(pipeline)
197 eventJson, err := json.Marshal(cp)
198 if err != nil {
199 return err
200 }
201
202 // do not run empty pipelines
203 if cp.Workflows == nil {
204 return nil
205 }
206
207 event := db.Event{
208 Rkey: TID(),
209 Nsid: tangled.PipelineNSID,
210 EventJson: string(eventJson),
211 }
212
213 return h.db.InsertEvent(event, h.n)
214}
215
216func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
217 l := log.FromContext(ctx)
218
219 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
220 if err != nil {
221 l.Error("error building endpoint url", "did", did, "error", err.Error())
222 return fmt.Errorf("error building endpoint url: %w", err)
223 }
224
225 resp, err := http.Get(keysEndpoint)
226 if err != nil {
227 l.Error("error getting keys", "did", did, "error", err)
228 return fmt.Errorf("error getting keys: %w", err)
229 }
230 defer resp.Body.Close()
231
232 if resp.StatusCode == http.StatusNotFound {
233 l.Info("no keys found for did", "did", did)
234 return nil
235 }
236
237 plaintext, err := io.ReadAll(resp.Body)
238 if err != nil {
239 l.Error("error reading response body", "error", err)
240 return fmt.Errorf("error reading response body: %w", err)
241 }
242
243 for _, key := range strings.Split(string(plaintext), "\n") {
244 if key == "" {
245 continue
246 }
247 pk := db.PublicKey{
248 Did: did,
249 }
250 pk.Key = key
251 if err := h.db.AddPublicKey(pk); err != nil {
252 l.Error("failed to add public key", "error", err)
253 return fmt.Errorf("failed to add public key: %w", err)
254 }
255 }
256 return nil
257}
258
259func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
260 did := event.Did
261 if event.Kind != models.EventKindCommit {
262 return nil
263 }
264
265 var err error
266 defer func() {
267 eventTime := event.TimeUS
268 lastTimeUs := eventTime + 1
269 fmt.Println("lastTimeUs", lastTimeUs)
270 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
271 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
272 }
273 }()
274
275 raw := json.RawMessage(event.Commit.Record)
276
277 switch event.Commit.Collection {
278 case tangled.PublicKeyNSID:
279 var record tangled.PublicKey
280 if err := json.Unmarshal(raw, &record); err != nil {
281 return fmt.Errorf("failed to unmarshal record: %w", err)
282 }
283 if err := h.processPublicKey(ctx, did, record); err != nil {
284 return fmt.Errorf("failed to process public key: %w", err)
285 }
286
287 case tangled.KnotMemberNSID:
288 var record tangled.KnotMember
289 if err := json.Unmarshal(raw, &record); err != nil {
290 return fmt.Errorf("failed to unmarshal record: %w", err)
291 }
292 if err := h.processKnotMember(ctx, did, record); err != nil {
293 return fmt.Errorf("failed to process knot member: %w", err)
294 }
295 case tangled.RepoPullNSID:
296 var record tangled.RepoPull
297 if err := json.Unmarshal(raw, &record); err != nil {
298 return fmt.Errorf("failed to unmarshal record: %w", err)
299 }
300 if err := h.processPull(ctx, did, record); err != nil {
301 return fmt.Errorf("failed to process knot member: %w", err)
302 }
303 }
304
305 return err
306}