1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/appview/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/workflow"
25)
26
27func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
28 l := log.FromContext(ctx)
29 pk := db.PublicKey{
30 Did: did,
31 PublicKey: record,
32 }
33 if err := h.db.AddPublicKey(pk); err != nil {
34 l.Error("failed to add public key", "error", err)
35 return fmt.Errorf("failed to add public key: %w", err)
36 }
37 l.Info("added public key from firehose", "did", did)
38 return nil
39}
40
41func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
42 l := log.FromContext(ctx)
43
44 if record.Domain != h.c.Server.Hostname {
45 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
46 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
47 }
48
49 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
50 if err != nil || !ok {
51 l.Error("failed to add member", "did", did)
52 return fmt.Errorf("failed to enforce permissions: %w", err)
53 }
54
55 if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil {
56 l.Error("failed to add member", "error", err)
57 return fmt.Errorf("failed to add member: %w", err)
58 }
59 l.Info("added member from firehose", "member", record.Subject)
60
61 if err := h.db.AddDid(did); err != nil {
62 l.Error("failed to add did", "error", err)
63 return fmt.Errorf("failed to add did: %w", err)
64 }
65 h.jc.AddDid(did)
66
67 if err := h.fetchAndAddKeys(ctx, did); err != nil {
68 return fmt.Errorf("failed to fetch and add keys: %w", err)
69 }
70
71 return nil
72}
73
74func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
75 l := log.FromContext(ctx)
76 l = l.With("handler", "processPull")
77 l = l.With("did", did)
78 l = l.With("target_repo", record.TargetRepo)
79 l = l.With("target_branch", record.TargetBranch)
80
81 if record.Source == nil {
82 reason := "not a branch-based pull request"
83 l.Info("ignoring pull record", "reason", reason)
84 return fmt.Errorf("ignoring pull record: %s", reason)
85 }
86
87 if record.Source.Repo != nil {
88 reason := "fork based pull"
89 l.Info("ignoring pull record", "reason", reason)
90 return fmt.Errorf("ignoring pull record: %s", reason)
91 }
92
93 allDids, err := h.db.GetAllDids()
94 if err != nil {
95 return err
96 }
97
98 // presently: we only process PRs from collaborators for pipelines
99 if !slices.Contains(allDids, did) {
100 reason := "not a known did"
101 l.Info("rejecting pull record", "reason", reason)
102 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
103 }
104
105 repoAt, err := syntax.ParseATURI(record.TargetRepo)
106 if err != nil {
107 return err
108 }
109
110 // resolve this aturi to extract the repo record
111 resolver := idresolver.DefaultResolver()
112 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
113 if err != nil || ident.Handle.IsInvalidHandle() {
114 return fmt.Errorf("failed to resolve handle: %w", err)
115 }
116
117 xrpcc := xrpc.Client{
118 Host: ident.PDSEndpoint(),
119 }
120
121 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
122 if err != nil {
123 return err
124 }
125
126 repo := resp.Value.Val.(*tangled.Repo)
127
128 if repo.Knot != h.c.Server.Hostname {
129 reason := "not this knot"
130 l.Info("rejecting pull record", "reason", reason)
131 return fmt.Errorf("rejected pull record: %s", reason)
132 }
133
134 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
135 if err != nil {
136 return err
137 }
138
139 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
140 if err != nil {
141 return err
142 }
143
144 gr, err := git.Open(repoPath, record.Source.Branch)
145 if err != nil {
146 return err
147 }
148
149 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
150 if err != nil {
151 return err
152 }
153
154 var pipeline workflow.Pipeline
155 for _, e := range workflowDir {
156 if !e.IsFile {
157 continue
158 }
159
160 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
161 contents, err := gr.RawContent(fpath)
162 if err != nil {
163 continue
164 }
165
166 wf, err := workflow.FromFile(e.Name, contents)
167 if err != nil {
168 // TODO: log here, respond to client that is pushing
169 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
170 continue
171 }
172
173 pipeline = append(pipeline, wf)
174 }
175
176 trigger := tangled.Pipeline_PullRequestTriggerData{
177 Action: "create",
178 SourceBranch: record.Source.Branch,
179 SourceSha: record.Source.Sha,
180 TargetBranch: record.TargetBranch,
181 }
182
183 compiler := workflow.Compiler{
184 Trigger: tangled.Pipeline_TriggerMetadata{
185 Kind: string(workflow.TriggerKindPullRequest),
186 PullRequest: &trigger,
187 Repo: &tangled.Pipeline_TriggerRepo{
188 Did: repo.Owner,
189 Knot: repo.Knot,
190 Repo: repo.Name,
191 },
192 },
193 }
194
195 cp := compiler.Compile(pipeline)
196 eventJson, err := json.Marshal(cp)
197 if err != nil {
198 return err
199 }
200
201 // do not run empty pipelines
202 if cp.Workflows == nil {
203 return nil
204 }
205
206 event := db.Event{
207 Rkey: TID(),
208 Nsid: tangled.PipelineNSID,
209 EventJson: string(eventJson),
210 }
211
212 return h.db.InsertEvent(event, h.n)
213}
214
215func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
216 l := log.FromContext(ctx)
217
218 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
219 if err != nil {
220 l.Error("error building endpoint url", "did", did, "error", err.Error())
221 return fmt.Errorf("error building endpoint url: %w", err)
222 }
223
224 resp, err := http.Get(keysEndpoint)
225 if err != nil {
226 l.Error("error getting keys", "did", did, "error", err)
227 return fmt.Errorf("error getting keys: %w", err)
228 }
229 defer resp.Body.Close()
230
231 if resp.StatusCode == http.StatusNotFound {
232 l.Info("no keys found for did", "did", did)
233 return nil
234 }
235
236 plaintext, err := io.ReadAll(resp.Body)
237 if err != nil {
238 l.Error("error reading response body", "error", err)
239 return fmt.Errorf("error reading response body: %w", err)
240 }
241
242 for _, key := range strings.Split(string(plaintext), "\n") {
243 if key == "" {
244 continue
245 }
246 pk := db.PublicKey{
247 Did: did,
248 }
249 pk.Key = key
250 if err := h.db.AddPublicKey(pk); err != nil {
251 l.Error("failed to add public key", "error", err)
252 return fmt.Errorf("failed to add public key: %w", err)
253 }
254 }
255 return nil
256}
257
258func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
259 did := event.Did
260 if event.Kind != models.EventKindCommit {
261 return nil
262 }
263
264 var err error
265 defer func() {
266 eventTime := event.TimeUS
267 lastTimeUs := eventTime + 1
268 fmt.Println("lastTimeUs", lastTimeUs)
269 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
270 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
271 }
272 }()
273
274 raw := json.RawMessage(event.Commit.Record)
275
276 switch event.Commit.Collection {
277 case tangled.PublicKeyNSID:
278 var record tangled.PublicKey
279 if err := json.Unmarshal(raw, &record); err != nil {
280 return fmt.Errorf("failed to unmarshal record: %w", err)
281 }
282 if err := h.processPublicKey(ctx, did, record); err != nil {
283 return fmt.Errorf("failed to process public key: %w", err)
284 }
285
286 case tangled.KnotMemberNSID:
287 var record tangled.KnotMember
288 if err := json.Unmarshal(raw, &record); err != nil {
289 return fmt.Errorf("failed to unmarshal record: %w", err)
290 }
291 if err := h.processKnotMember(ctx, did, record); err != nil {
292 return fmt.Errorf("failed to process knot member: %w", err)
293 }
294 case tangled.RepoPullNSID:
295 var record tangled.RepoPull
296 if err := json.Unmarshal(raw, &record); err != nil {
297 return fmt.Errorf("failed to unmarshal record: %w", err)
298 }
299 if err := h.processPull(ctx, did, record); err != nil {
300 return fmt.Errorf("failed to process knot member: %w", err)
301 }
302 }
303
304 return err
305}