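// Package engine executes spindle pipelines on Docker: it pulls each
// workflow's image, runs the workflow steps one by one in containers that
// share a per-workflow workspace volume, Nix store volume and bridge
// network, streams step logs to the workflow logger, and reports status
// transitions through the database and notifier.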
package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	securejoin "github.com/cyphar/filepath-securejoin"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"golang.org/x/sync/errgroup"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config
	vault  secrets.Manager

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		db:     db,
		n:      n,
		cfg:    cfg,
		vault:  vault,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	// extract secrets
	var allSecrets []secrets.UnlockedSecret
	if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
		if res, err := e.vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
			allSecrets = res
		}
	}
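
	// Note: securejoin.SecureJoin treats the repo owner as the root path and
	// resolves the repo name inside it, so the resulting "<owner>/<repo>" key
	// cannot be escaped via path traversal in the repo name. Illustrative
	// sketch with hypothetical values:
	//
	//	key, _ := securejoin.SecureJoin("did:plc:alice", "../other-repo")
	//	// key stays under "did:plc:alice/"; the ".." cannot climb out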

	workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}
	e.l.Info("using workflow timeout", "timeout", workflowTimeout)

	eg, ctx := errgroup.WithContext(ctx)
	for _, w := range pipeline.Workflows {
		eg.Go(func() error {
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			err := e.db.StatusRunning(wid, e.n)
			if err != nil {
				return err
			}

			err = e.SetupWorkflow(ctx, wid)
			if err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return err
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline image pull failed!", "image", w.Image, "workflowId", wid, "error", err.Error())

				dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
				if dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("pulling image: %w", err)
			}
			defer reader.Close()
			io.Copy(os.Stdout, reader)

			ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
			defer cancel()

			err = e.StartSteps(ctx, wid, w, allSecrets)
			if err != nil {
				if errors.Is(err, ErrTimedOut) {
					dbErr := e.db.StatusTimeout(wid, e.n)
					if dbErr != nil {
						return dbErr
					}
				} else {
					dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
					if dbErr != nil {
						return dbErr
					}
				}

				return fmt.Errorf("starting steps: %w", err)
			}

			err = e.db.StatusSuccess(wid, e.n)
			if err != nil {
				return err
			}

			return nil
		})
	}

	if err = eg.Wait(); err != nil {
		e.l.Error("failed to run one or more workflows", "err", err)
	} else {
		e.l.Info("successfully ran full pipeline")
	}
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}
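
// For orientation, the resources created by SetupWorkflow are roughly the
// API equivalents of (names shown schematically; the actual ones come from
// workspaceVolume, nixVolume and networkName):
//
//	docker volume create workspace-<workflow-id>
//	docker volume create nix-<workflow-id>
//	docker network create --driver bridge workflow-network-<workflow-id>
//
// and the registered cleanup callbacks mirror `docker volume rm -f` and
// `docker network rm` for the same names.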

// StartSteps starts all steps sequentially with the same base image.
// It only marks the pipeline as failed if a container's exit code is
// non-zero; all other errors are bubbled up.
func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error {
	workflowEnvs := ConstructEnvs(w.Environment)
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	for stepIdx, step := range w.Steps {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		envs := append(EnvVars(nil), workflowEnvs...)
		for k, v := range step.Environment {
			envs.AddEnv(k, v)
		}
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      w.Image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}
		defer e.DestroyStep(ctx, resp.ID)

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(ctx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone

		case <-ctx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
			err = e.DestroyStep(context.Background(), resp.ID)
			if err != nil {
				e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
			}

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			return ErrTimedOut
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}

func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	ctl := wfLogger.ControlWriter(stepIdx, step)
	ctl.Write([]byte(step.Name))

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
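
	// With Tty disabled on the step container, Docker multiplexes stdout and
	// stderr into a single stream; stdcopy.StdCopy splits it back into the
	// separate data writers below.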
	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writeable sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},                               // drop all capabilities by default
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},                  // keep only the ability to bypass file permission checks
		SecurityOpt:    []string{"no-new-privileges"},                 // block privilege escalation via setuid binaries
		ExtraHosts:     []string{"host.docker.internal:host-gateway"}, // make the host reachable from steps
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}