package engine

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		db:     db,
		n:      n,
		cfg:    cfg,
	}

	e.stdoutChans = make(map[string]chan string, 100)
	e.stderrChans = make(map[string]chan string, 100)

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

// StartWorkflows runs every workflow in the pipeline concurrently and blocks
// until all of them have finished. Failures are recorded in the database and
// logged; they are not returned to the caller.
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pulling image failed", "wid", wid, "image", w.Image, "err", err)
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// stream pull progress to stdout
			_, _ = io.Copy(os.Stdout, reader)

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				e.l.Error("workflow failed!", "wid", wid.String(), "err", err)
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}
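// A minimal caller sketch, assuming a config, db handle, and notifier are
// already constructed (hypothetical call site; the real one lives in the
// spindle server, not in this file):
//
//	eng, err := engine.New(ctx, cfg, database, notif)
//	if err != nil {
//		return err
//	}
//	// blocks until every workflow in the pipeline has completed
//	eng.StartWorkflows(ctx, pipeline, pipelineId)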
// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially with the same base image. It marks
// the workflow as failed only when a container exits non-zero; all other
// errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout, falling back to 5m", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	e.chanMu.Lock()
	if _, exists := e.stdoutChans[wid.String()]; !exists {
		e.stdoutChans[wid.String()] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[wid.String()]; !exists {
		e.stderrChans[wid.String()] = make(chan string, 100)
	}
	e.chanMu.Unlock()

	// close channels after all steps are complete; look them up under the
	// read lock since other workflows may be mutating the maps concurrently
	defer func() {
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()
		close(stdoutCh)
		close(stderrCh)
	}()

	for _, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return fmt.Errorf("starting container: %w", err)
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()
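		// at this point two goroutines race the step timeout: one tails the
		// container logs, the other waits for the container to stop. The
		// select below joins whichever side finishes first; cancelling
		// stepCtx tears down both the log stream and the wait call.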
		select {
		case <-waitDone:
			// container finished normally
			stepCancel()

			// wait for tailing to complete
			<-tailDone

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return fmt.Errorf("step timed out after %v", stepTimeout)
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
			if err != nil {
				return err
			}
			return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
		}
	}

	return nil
}

// WaitStep blocks until the container is no longer running and returns its
// final state.
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

// TailStep follows the container's log stream and forwards each line to the
// workflow's stdout and stderr channels until the stream closes.
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    true,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	var devOutput io.Writer = io.Discard
	if e.cfg.Server.Dev {
		devOutput = &ansiStrippingWriter{underlying: os.Stdout}
	}

	tee := io.TeeReader(logs, devOutput)

	// using StdCopy we demux logs and stream stdout and stderr to different
	// channels:
	//
	//	stdout -> w||r -> stdoutCh
	//	stderr -> w||r -> stderrCh
	//

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
		// context cancellation and deadline expiry are expected when a step
		// finishes or times out, so only log unexpected copy failures
		if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	// read from stdout and send to the stdout channel
	// NOTE: the stdoutCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()
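	// NOTE: bufio.Scanner caps tokens at 64KiB by default; a longer log line
	// would stop the scan with bufio.ErrTooLong. Raise the cap with
	// scanner.Buffer if that ever becomes a problem.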
	// read from stderr and send to the stderr channel
	// NOTE: the stderrCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	wg.Wait()

	return nil
}

// DestroyStep kills and removes a single step container. "No such container"
// and "not running" errors are ignored so teardown is idempotent.
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

// DestroyWorkflow runs all cleanup functions registered for the workflow in
// registration order, logging (but not returning) individual failures.
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

// LogChannels returns the workflow's stdout and stderr channels, or ok=false
// if no channels are registered for the workflow.
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[wid.String()]
	stderrCh, ok2 := e.stderrChans[wid.String()]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}

func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:   mount.TypeTmpfs,
				Target: "/tmp",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		SecurityOpt:    []string{"seccomp=unconfined"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}
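// A consumer sketch for LogChannels, assuming a caller that wants to stream a
// workflow's logs (hypothetical; the real consumer lives elsewhere in
// spindle). Both channels are closed by StartSteps once the workflow
// finishes, so plain range loops terminate on their own:
//
//	stdout, stderr, ok := eng.LogChannels(wid)
//	if !ok {
//		return // no channels registered for this workflow
//	}
//	go func() {
//		for line := range stderr {
//			fmt.Fprintln(os.Stderr, line)
//		}
//	}()
//	for line := range stdout {
//		fmt.Println(line)
//	}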