package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	securejoin "github.com/cyphar/filepath-securejoin"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"golang.org/x/sync/errgroup"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

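// Engine executes pipeline workflows as Docker containers. It records
// workflow status transitions in the database, emits them through the
// notifier, and tracks per-workflow cleanup functions for the volumes and
// networks it creates.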
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config
	vault  secrets.Manager

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

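// New constructs an Engine backed by a Docker client configured from the
// environment, with API version negotiation enabled.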
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		db:     db,
		n:      n,
		cfg:    cfg,
		vault:  vault,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

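// StartWorkflows runs every workflow in the pipeline concurrently, one
// errgroup goroutine per workflow. Repository secrets are unlocked once up
// front and injected into each workflow's environment, and each workflow run
// is bounded by the configured workflow timeout (falling back to 5 minutes if
// the configured value cannot be parsed).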
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	// extract secrets
	var allSecrets []secrets.UnlockedSecret
	if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
		if res, err := e.vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
			allSecrets = res
		}
	}

	workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}
	e.l.Info("using workflow timeout", "timeout", workflowTimeout)

	eg, ctx := errgroup.WithContext(ctx)
	for _, w := range pipeline.Workflows {
		eg.Go(func() error {
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			err := e.db.StatusRunning(wid, e.n)
			if err != nil {
				return err
			}

			err = e.SetupWorkflow(ctx, wid)
			if err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return err
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline image pull failed!", "image", w.Image, "workflowId", wid, "error", err.Error())

				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("pulling image: %w", err)
			}
			defer reader.Close()
			_, _ = io.Copy(os.Stdout, reader)

			ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
			defer cancel()

			err = e.StartSteps(ctx, wid, w, allSecrets)
			if err != nil {
				if errors.Is(err, ErrTimedOut) {
					dbErr := e.db.StatusTimeout(wid, e.n)
					if dbErr != nil {
						return dbErr
					}
				} else {
					dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
					if dbErr != nil {
						return dbErr
					}
				}

				return fmt.Errorf("starting steps image: %w", err)
			}

			err = e.db.StatusSuccess(wid, e.n)
			if err != nil {
				return err
			}

			return nil
		})
	}

	if err = eg.Wait(); err != nil {
		e.l.Error("failed to run one or more workflows", "err", err)
	} else {
		e.l.Info("successfully ran full pipeline")
	}
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps of a workflow sequentially in the same base image.
// A non-zero exit code from a step container is reported as ErrWorkflowFailed
// (or ErrOOMKilled); all other errors are bubbled up unchanged.
func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error {
	workflowEnvs := ConstructEnvs(w.Environment)
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	for stepIdx, step := range w.Steps {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		envs := append(EnvVars(nil), workflowEnvs...)
		for k, v := range step.Environment {
			envs.AddEnv(k, v)
		}
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      w.Image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}
		defer e.DestroyStep(ctx, resp.ID)

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(ctx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone

		case <-ctx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
			err = e.DestroyStep(context.Background(), resp.ID)
			if err != nil {
				e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
			}

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			return ErrTimedOut
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}

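// WaitStep blocks until the container is no longer running and returns its
// final state as reported by ContainerInspect.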
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

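// TailStep follows the container's stdout and stderr and persists them through
// the workflow logger, writing a control record for the step first.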
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	ctl := wfLogger.ControlWriter(stepIdx, step)
	ctl.Write([]byte(step.Name))

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

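// DestroyStep kills the step container with SIGKILL and removes it along with
// its anonymous volumes. Containers that no longer exist or are not running
// are not treated as errors.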
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

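// DestroyWorkflow runs and clears every cleanup function registered for the
// workflow. Individual cleanup failures are logged and do not stop the
// remaining cleanups.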
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

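// registerCleanup records a cleanup function to be run when the workflow is
// destroyed.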
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

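// workspaceVolume, nixVolume and networkName derive the names of the
// per-workflow Docker resources from the workflow id.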
func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

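// hostConfig builds the host configuration for step containers: volume mounts
// for the workspace, the Nix store and /etc/nix, an exec-enabled tmpfs at
// /tmp, all capabilities dropped except CAP_DAC_OVERRIDE, no-new-privileges,
// and host.docker.internal mapped to the host gateway.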
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable with sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}