package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

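// cleanupFunc tears down a single Docker resource (volume, network,
// ...) that was created for a workflow.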
type cleanupFunc func(context.Context) error

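// Engine runs pipelines against a Docker-compatible daemon. It owns the
// API client, records workflow status in the database, and keeps track
// of per-workflow cleanup functions so resources can be released once a
// workflow finishes.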
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

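// New creates an Engine using the Docker client configuration from the
// environment (DOCKER_HOST and friends), with API version negotiation
// enabled.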
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:  dcli,
		l:       l,
		db:      db,
		n:       n,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}

	return e, nil
}

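// StartWorkflows runs every workflow in the pipeline concurrently and
// blocks until all of them have finished. Each workflow gets its own
// network, volumes and timeout; failures are recorded in the database
// and logged rather than returned.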
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pulling image failed", "workflowId", wid, "error", err.Error())

				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// drain the pull progress stream; the pull is only complete at EOF
			if _, err := io.Copy(os.Stdout, reader); err != nil {
				e.l.Warn("reading image pull output", "wid", wid, "err", err)
			}

			workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
			workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
			if err != nil {
				e.l.Error("failed to parse workflow timeout; falling back to 5m", "error", err, "timeout", workflowTimeoutStr)
				workflowTimeout = 5 * time.Minute
			}
			e.l.Info("using workflow timeout", "timeout", workflowTimeout)
			ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
			defer cancel()

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				e.l.Error("workflow failed", "workflowId", wid, "error", err.Error())
				if errors.Is(err, ErrTimedOut) {
					if dbErr := e.db.StatusTimeout(wid, e.n); dbErr != nil {
						e.l.Error("marking workflow as timed out", "wid", wid, "err", dbErr)
					}
				} else {
					if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
						e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
					}
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially, each in a fresh container
// built from the same base image. A non-zero container exit code is
// reported as ErrWorkflowFailed (or ErrOOMKilled); all other errors are
// bubbled up as-is.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	for stepIdx, step := range steps {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}
		// safety net for the error paths below; the happy path destroys
		// the container explicitly once it has exited
		defer e.DestroyStep(ctx, resp.ID)

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in the background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(ctx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone

		case <-ctx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
			err = e.DestroyStep(context.Background(), resp.ID)
			if err != nil {
				e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
			}

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			return ErrTimedOut
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}

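// WaitStep blocks until the container is no longer running and returns
// its final state as reported by the daemon.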
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

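// TailStep follows the container's log stream and demultiplexes it into
// the workflow's stdout/stderr log writers until the stream ends or the
// context is cancelled.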
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to set up step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	ctl := wfLogger.ControlWriter(stepIdx, step)
	ctl.Write([]byte(step.Name))

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

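// DestroyStep kills the step's container with SIGKILL and removes it
// together with its anonymous volumes. "Not found" and "not running"
// errors are ignored, so it is safe to call more than once.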
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

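// DestroyWorkflow runs and clears every cleanup function registered for
// the workflow. Individual failures are logged and do not stop the
// remaining cleanups.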
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

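// registerCleanup records a function to be run when the workflow is
// destroyed.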
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

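// Names for the per-workflow Docker resources created in SetupWorkflow.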
func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

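// hostConfig mounts the shared workspace and Nix store volumes into the
// step container and locks it down: all capabilities dropped except
// CAP_DAC_OVERRIDE, no-new-privileges, and a private tmpfs /tmp.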
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writeable, sticky bit
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil &&
		(strings.Contains(err.Error(), "No such container") ||
			strings.Contains(err.Error(), "is not running") ||
			strings.Contains(err.Error(), "can only kill running containers"))
}