forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at test-ci 11 kB view raw
1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" 17 "github.com/docker/docker/api/types/network" 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/log" 22 "tangled.sh/tangled.sh/core/notifier" 23 "tangled.sh/tangled.sh/core/spindle/config" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26) 27 28const ( 29 workspaceDir = "/tangled/workspace" 30) 31 32type cleanupFunc func(context.Context) error 33 34type Engine struct { 35 docker client.APIClient 36 l *slog.Logger 37 db *db.DB 38 n *notifier.Notifier 39 cfg *config.Config 40 41 cleanupMu sync.Mutex 42 cleanup map[string][]cleanupFunc 43} 44 45func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { 46 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 47 if err != nil { 48 return nil, err 49 } 50 51 l := log.FromContext(ctx).With("component", "spindle") 52 53 e := &Engine{ 54 docker: dcli, 55 l: l, 56 db: db, 57 n: n, 58 cfg: cfg, 59 } 60 61 e.cleanup = make(map[string][]cleanupFunc) 62 63 return e, nil 64} 65 66func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 67 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 68 69 wg := sync.WaitGroup{} 70 for _, w := range pipeline.Workflows { 71 wg.Add(1) 72 go func() error { 73 defer wg.Done() 74 wid := models.WorkflowId{ 75 PipelineId: pipelineId, 76 Name: w.Name, 77 } 78 79 err := e.db.StatusRunning(wid, e.n) 80 if err != nil { 81 return err 82 } 83 84 err = e.SetupWorkflow(ctx, wid) 85 if err != nil { 86 e.l.Error("setting up worklow", "wid", wid, "err", err) 87 return err 88 } 89 defer e.DestroyWorkflow(ctx, wid) 90 91 reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) 92 if err != nil { 93 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 94 95 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 96 if err != nil { 97 return err 98 } 99 100 return fmt.Errorf("pulling image: %w", err) 101 } 102 defer reader.Close() 103 io.Copy(os.Stdout, reader) 104 105 workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout 106 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 107 if err != nil { 108 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 109 workflowTimeout = 5 * time.Minute 110 } 111 e.l.Info("using workflow timeout", "timeout", workflowTimeout) 112 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 113 defer cancel() 114 115 err = e.StartSteps(ctx, w.Steps, wid, w.Image) 116 if err != nil { 117 if errors.Is(err, ErrTimedOut) { 118 dbErr := e.db.StatusTimeout(wid, e.n) 119 if dbErr != nil { 120 return dbErr 121 } 122 } else { 123 dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) 124 if dbErr != nil { 125 return dbErr 126 } 127 } 128 129 return fmt.Errorf("starting steps image: %w", err) 130 } 131 132 err = e.db.StatusSuccess(wid, e.n) 133 if err != nil { 134 return err 135 } 136 137 return nil 138 }() 139 } 140 141 wg.Wait() 142} 143 144// SetupWorkflow sets up a new network for the workflow and volumes for 145// the workspace and Nix store. These are persisted across steps and are 146// destroyed at the end of the workflow. 147func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 148 e.l.Info("setting up workflow", "workflow", wid) 149 150 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 151 Name: workspaceVolume(wid), 152 Driver: "local", 153 }) 154 if err != nil { 155 return err 156 } 157 e.registerCleanup(wid, func(ctx context.Context) error { 158 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 159 }) 160 161 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 162 Name: nixVolume(wid), 163 Driver: "local", 164 }) 165 if err != nil { 166 return err 167 } 168 e.registerCleanup(wid, func(ctx context.Context) error { 169 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 170 }) 171 172 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 173 Driver: "bridge", 174 }) 175 if err != nil { 176 return err 177 } 178 e.registerCleanup(wid, func(ctx context.Context) error { 179 return e.docker.NetworkRemove(ctx, networkName(wid)) 180 }) 181 182 return nil 183} 184 185// StartSteps starts all steps sequentially with the same base image. 186// ONLY marks pipeline as failed if container's exit code is non-zero. 187// All other errors are bubbled up. 188// Fixed version of the step execution logic 189func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 190 191 for stepIdx, step := range steps { 192 select { 193 case <-ctx.Done(): 194 return ctx.Err() 195 default: 196 } 197 198 envs := ConstructEnvs(step.Environment) 199 envs.AddEnv("HOME", workspaceDir) 200 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 201 202 hostConfig := hostConfig(wid) 203 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 204 Image: image, 205 Cmd: []string{"bash", "-c", step.Command}, 206 WorkingDir: workspaceDir, 207 Tty: false, 208 Hostname: "spindle", 209 Env: envs.Slice(), 210 }, hostConfig, nil, nil, "") 211 defer e.DestroyStep(ctx, resp.ID) 212 if err != nil { 213 return fmt.Errorf("creating container: %w", err) 214 } 215 216 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 217 if err != nil { 218 return fmt.Errorf("connecting network: %w", err) 219 } 220 221 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 222 if err != nil { 223 return err 224 } 225 e.l.Info("started container", "name", resp.ID, "step", step.Name) 226 227 // start tailing logs in background 228 tailDone := make(chan error, 1) 229 go func() { 230 tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step) 231 }() 232 233 // wait for container completion or timeout 234 waitDone := make(chan struct{}) 235 var state *container.State 236 var waitErr error 237 238 go func() { 239 defer close(waitDone) 240 state, waitErr = e.WaitStep(ctx, resp.ID) 241 }() 242 243 select { 244 case <-waitDone: 245 246 // wait for tailing to complete 247 <-tailDone 248 249 case <-ctx.Done(): 250 e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) 251 err = e.DestroyStep(context.Background(), resp.ID) 252 if err != nil { 253 e.l.Error("failed to destroy step", "container", resp.ID, "error", err) 254 } 255 256 // wait for both goroutines to finish 257 <-waitDone 258 <-tailDone 259 260 return ErrTimedOut 261 } 262 263 select { 264 case <-ctx.Done(): 265 return ctx.Err() 266 default: 267 } 268 269 if waitErr != nil { 270 return waitErr 271 } 272 273 err = e.DestroyStep(ctx, resp.ID) 274 if err != nil { 275 return err 276 } 277 278 if state.ExitCode != 0 { 279 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 280 if state.OOMKilled { 281 return ErrOOMKilled 282 } 283 return ErrWorkflowFailed 284 } 285 } 286 287 return nil 288} 289 290func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 291 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 292 select { 293 case err := <-errCh: 294 if err != nil { 295 return nil, err 296 } 297 case <-wait: 298 } 299 300 e.l.Info("waited for container", "name", containerID) 301 302 info, err := e.docker.ContainerInspect(ctx, containerID) 303 if err != nil { 304 return nil, err 305 } 306 307 return info.State, nil 308} 309 310func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 311 wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) 312 if err != nil { 313 e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 314 return err 315 } 316 defer wfLogger.Close() 317 318 ctl := wfLogger.ControlWriter(stepIdx, step) 319 ctl.Write([]byte(step.Name)) 320 321 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 322 Follow: true, 323 ShowStdout: true, 324 ShowStderr: true, 325 Details: false, 326 Timestamps: false, 327 }) 328 if err != nil { 329 return err 330 } 331 332 _, err = stdcopy.StdCopy( 333 wfLogger.DataWriter("stdout"), 334 wfLogger.DataWriter("stderr"), 335 logs, 336 ) 337 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 338 return fmt.Errorf("failed to copy logs: %w", err) 339 } 340 341 return nil 342} 343 344func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 345 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 346 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 347 return err 348 } 349 350 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 351 RemoveVolumes: true, 352 RemoveLinks: false, 353 Force: false, 354 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 355 return err 356 } 357 358 return nil 359} 360 361func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 362 e.cleanupMu.Lock() 363 key := wid.String() 364 365 fns := e.cleanup[key] 366 delete(e.cleanup, key) 367 e.cleanupMu.Unlock() 368 369 for _, fn := range fns { 370 if err := fn(ctx); err != nil { 371 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 372 } 373 } 374 return nil 375} 376 377func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 378 e.cleanupMu.Lock() 379 defer e.cleanupMu.Unlock() 380 381 key := wid.String() 382 e.cleanup[key] = append(e.cleanup[key], fn) 383} 384 385func workspaceVolume(wid models.WorkflowId) string { 386 return fmt.Sprintf("workspace-%s", wid) 387} 388 389func nixVolume(wid models.WorkflowId) string { 390 return fmt.Sprintf("nix-%s", wid) 391} 392 393func networkName(wid models.WorkflowId) string { 394 return fmt.Sprintf("workflow-network-%s", wid) 395} 396 397func hostConfig(wid models.WorkflowId) *container.HostConfig { 398 hostConfig := &container.HostConfig{ 399 Mounts: []mount.Mount{ 400 { 401 Type: mount.TypeVolume, 402 Source: workspaceVolume(wid), 403 Target: workspaceDir, 404 }, 405 { 406 Type: mount.TypeVolume, 407 Source: nixVolume(wid), 408 Target: "/nix", 409 }, 410 { 411 Type: mount.TypeTmpfs, 412 Target: "/tmp", 413 ReadOnly: false, 414 TmpfsOptions: &mount.TmpfsOptions{ 415 Mode: 0o1777, // world-writeable sticky bit 416 Options: [][]string{ 417 {"exec"}, 418 }, 419 }, 420 }, 421 { 422 Type: mount.TypeVolume, 423 Source: "etc-nix-" + wid.String(), 424 Target: "/etc/nix", 425 }, 426 }, 427 ReadonlyRootfs: false, 428 CapDrop: []string{"ALL"}, 429 CapAdd: []string{"CAP_DAC_OVERRIDE"}, 430 SecurityOpt: []string{"no-new-privileges"}, 431 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 432 } 433 434 return hostConfig 435} 436 437// thanks woodpecker 438func isErrContainerNotFoundOrNotRunning(err error) bool { 439 // Error response from daemon: Cannot kill container: ...: No such container: ... 440 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 441 // Error response from podman daemon: can only kill running containers. ... is in state exited 442 // Error: No such container: ... 443 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 444}