package engine

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

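// workspaceDir is the path inside every step container where the shared
// workspace volume is mounted; it is also used as the working directory
// and HOME for each step.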
const (
	workspaceDir = "/tangled/workspace"
)

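// cleanupFunc releases a single Docker resource (volume, network, ...)
// created for a workflow.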
type cleanupFunc func(context.Context) error

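// Engine drives pipeline execution against a Docker-compatible daemon. It
// owns the per-workflow log channels and the cleanup functions registered
// for each workflow's volumes and network.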
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

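// New builds an Engine from the environment's Docker client configuration
// (DOCKER_HOST and friends) with API version negotiation enabled.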
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		db:     db,
		n:      n,
		cfg:    cfg,
	}

	e.stdoutChans = make(map[string]chan string, 100)
	e.stderrChans = make(map[string]chan string, 100)

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

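// StartWorkflows runs every workflow in the pipeline concurrently and blocks
// until all of them have finished. Each workflow gets its own volumes and
// network via SetupWorkflow, which are torn down when the workflow ends.
//
// A typical caller (sketch only; surrounding names are illustrative):
//
//	eng, err := engine.New(ctx, cfg, db, n)
//	if err != nil {
//		// handle error
//	}
//	eng.StartWorkflows(ctx, pipeline, pipelineId)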
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() error {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			err := e.db.StatusRunning(wid, e.n)
			if err != nil {
				e.l.Error("failed to mark workflow as running", "wid", wid, "err", err)
				return err
			}

			err = e.SetupWorkflow(ctx, wid)
			if err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return err
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("workflow failed!", "workflowId", wid, "error", err.Error())

				dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
				if dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("pulling image: %w", err)
			}
			defer reader.Close()
			_, _ = io.Copy(os.Stdout, reader)

			err = e.StartSteps(ctx, w.Steps, wid, w.Image)
			if err != nil {
				e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())

				dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
				if dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("starting steps: %w", err)
			}

			err = e.db.StatusSuccess(wid, e.n)
			if err != nil {
				e.l.Error("failed to mark workflow as successful", "wid", wid, "err", err)
				return err
			}

			return nil
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially in the same base image. It marks
// the workflow as failed only when a container exits with a non-zero code;
// all other errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout; falling back to 5m", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	e.chanMu.Lock()
	if _, exists := e.stdoutChans[wid.String()]; !exists {
		e.stdoutChans[wid.String()] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[wid.String()]; !exists {
		e.stderrChans[wid.String()] = make(chan string, 100)
	}
	e.chanMu.Unlock()

	// close channels after all steps are complete; look them up under the
	// read lock to avoid racing with concurrent map writes
	defer func() {
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()
		close(stdoutCh)
		close(stderrCh)
	}()

	for stepIdx, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()

		select {
		case <-waitDone:
			// container finished normally
			stepCancel()

			// wait for tailing to complete
			<-tailDone

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return fmt.Errorf("step timed out after %v", stepTimeout)
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
			if err != nil {
				return err
			}
			return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
		}
	}

	return nil
}

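// WaitStep blocks until the container is no longer running and returns its
// final state as reported by ContainerInspect.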
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

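// TailStep follows the container's log stream until it ends, demuxes it into
// stdout and stderr, persists both via the step logger and forwards each line
// to the workflow's log channels.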
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    true,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx)
	if err != nil {
		e.l.Warn("failed to set up step logger; logs will not be persisted", "error", err)
	}

	var logOutput io.Writer = io.Discard

	if e.cfg.Server.Dev {
		logOutput = &ansiStrippingWriter{underlying: os.Stdout}
	}

	tee := io.TeeReader(logs, logOutput)

	// stdcopy.StdCopy demuxes the combined log stream and writes stdout and
	// stderr into separate pipes, which are scanned line by line and
	// forwarded to the per-workflow channels:
	//
	//	stdout -> wpipeOut || rpipeOut -> stdoutCh
	//	stderr -> wpipeErr || rpipeErr -> stderrCh

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	// each stream goes through an io.MultiWriter so it reaches both its pipe
	// and the file-based logger; fall back to io.Discard when the step logger
	// could not be created.
	var fileOut, fileErr io.Writer = io.Discard, io.Discard
	if stepLogger != nil {
		fileOut = stepLogger.Stdout()
		fileErr = stepLogger.Stderr()
	}
	multiOut := io.MultiWriter(wpipeOut, fileOut)
	multiErr := io.MultiWriter(wpipeErr, fileErr)

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		if stepLogger != nil {
			defer stepLogger.Close()
		}
		_, err := stdcopy.StdCopy(multiOut, multiErr, tee)
		if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	// read from the stdout pipe and send each line to the stdout channel
	// NOTE: the stdoutCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read from the stderr pipe and send each line to the stderr channel
	// NOTE: the stderrCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	wg.Wait()

	return nil
}

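// DestroyStep force-kills the step container and removes it along with its
// anonymous volumes. "Not found" and "not running" errors are ignored.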
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

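// DestroyWorkflow runs the cleanup functions registered for the workflow
// (volumes and network); individual failures are logged rather than returned.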
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

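// LogChannels returns the live stdout and stderr channels for a workflow, or
// ok=false when no channels have been created for it yet. The channels are
// closed by StartSteps once all steps complete.
//
// A consumer might drain them like this (sketch only; caller names are
// illustrative):
//
//	stdout, stderr, ok := eng.LogChannels(wid)
//	if ok {
//		go func() {
//			for line := range stdout {
//				fmt.Println(line)
//			}
//		}()
//		for line := range stderr {
//			fmt.Println(line)
//		}
//	}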
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[wid.String()]
	stderrCh, ok2 := e.stderrChans[wid.String()]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}

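// registerCleanup records a cleanup function to be run by DestroyWorkflow
// when the workflow finishes.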
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

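// workspaceVolume, nixVolume and networkName derive per-workflow Docker
// resource names from the workflow id, so concurrent workflows never collide.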
func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

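// hostConfig mounts the shared workspace and Nix store volumes plus a tmpfs
// /tmp into the step container, drops all capabilities and disables the
// default seccomp profile.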
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:   mount.TypeTmpfs,
				Target: "/tmp",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		SecurityOpt:    []string{"seccomp=unconfined"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}