package engine

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

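// New constructs an Engine backed by a Docker client configured from the
// environment (DOCKER_HOST and friends), with API version negotiation
// enabled.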
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:      dcli,
		l:           l,
		db:          db,
		n:           n,
		cfg:         cfg,
		stdoutChans: make(map[string]chan string),
		stderrChans: make(map[string]chan string),
		cleanup:     make(map[string][]cleanupFunc),
	}

	return e, nil
}

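// StartWorkflows runs every workflow in the pipeline in its own goroutine and
// blocks until all of them have finished. Each workflow gets its own network
// and volumes (see SetupWorkflow), which are torn down when it completes.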
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// drain the pull progress stream; the pull is not complete until EOF
			if _, err := io.Copy(os.Stdout, reader); err != nil {
				e.l.Error("reading image pull progress", "wid", wid, "err", err)
			}

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially in containers created from the same
// base image. The workflow is marked as failed ONLY when a container exits
// with a non-zero code; all other errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout; falling back to 5m", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	e.chanMu.Lock()
	if _, exists := e.stdoutChans[wid.String()]; !exists {
		e.stdoutChans[wid.String()] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[wid.String()]; !exists {
		e.stderrChans[wid.String()] = make(chan string, 100)
	}
	e.chanMu.Unlock()

	// close the channels after all steps are complete; the map lookups must
	// be guarded since other workflows may be mutating the maps concurrently
	defer func() {
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()
		close(stdoutCh)
		close(stderrCh)
	}()

	for _, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in the background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()

		select {
		case <-waitDone:
			// container finished normally
			stepCancel()

			// wait for tailing to complete
			<-tailDone

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return fmt.Errorf("step timed out after %v", stepTimeout)
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
			if err != nil {
				return err
			}
			return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
		}
	}

	return nil
}

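// WaitStep blocks until the container is no longer running, then inspects it
// and returns its final state.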
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

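// TailStep follows a container's log stream until it is closed, demuxing the
// stdout and stderr substreams onto the workflow's log channels. In dev mode
// the raw stream is also echoed to os.Stdout with ANSI escapes stripped.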
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    true,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	var devOutput io.Writer = io.Discard
	if e.cfg.Server.Dev {
		devOutput = &ansiStrippingWriter{underlying: os.Stdout}
	}

	tee := io.TeeReader(logs, devOutput)

	// StdCopy demuxes the multiplexed log stream and fans stdout and stderr
	// out to separate pipes, which feed the per-workflow channels:
	//
	//	stdout w||r stdoutCh
	//	stderr w||r stderrCh

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
		if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	// read lines from the stdout pipe and forward them to the stdout channel.
	// NOTE: the stdoutCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read lines from the stderr pipe and forward them to the stderr channel.
	// NOTE: the stderrCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	wg.Wait()

	return nil
}

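// DestroyStep force-kills a step container and removes it along with its
// anonymous volumes. "Not found" and "not running" errors are ignored, so the
// call is safe to repeat.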
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

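// DestroyWorkflow runs the cleanup functions registered for a workflow
// (volume and network removal) in registration order. Failures are logged
// rather than returned so that one failed cleanup does not block the rest.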
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

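// LogChannels returns the live stdout and stderr channels for a workflow, or
// ok=false if streaming has not started. Both channels are closed by
// StartSteps once all steps finish, so callers can simply range over them.
// A minimal consumer sketch (hypothetical caller):
//
//	stdout, stderr, ok := eng.LogChannels(wid)
//	if !ok {
//		return
//	}
//	go func() {
//		for line := range stderr {
//			fmt.Println("err:", line)
//		}
//	}()
//	for line := range stdout {
//		fmt.Println("out:", line)
//	}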
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[wid.String()]
	stderrCh, ok2 := e.stderrChans[wid.String()]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}

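// registerCleanup queues fn to be run when DestroyWorkflow is called for wid.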
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

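// workspaceVolume, nixVolume, and networkName derive per-workflow resource
// names from the workflow id so that everything a workflow creates can be
// found and destroyed together.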
func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

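// hostConfig builds the host configuration shared by every step container:
// the workspace and Nix store volumes, a tmpfs for /tmp, all capabilities
// dropped, and the seccomp profile disabled.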
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:   mount.TypeTmpfs,
				Target: "/tmp",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		SecurityOpt:    []string{"seccomp=unconfined"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}