package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
	homeDir      = "/tangled/home"
)

type cleanupFunc func(context.Context) error

// Engine runs workflows inside Docker containers whose images are assembled
// on demand by a Nixery instance.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

// Step is a single command executed inside the workflow container.
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are prepended to a workflow's Steps.
type setupSteps []models.Step

// addStep appends a step to the setup steps; the whole list is later
// prepended to the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

type addlFields struct {
	image     string
	container string
	env       map[string]string
}
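
// A workflow definition arrives as raw YAML (twf.Raw) and is decoded by
// InitWorkflow below into the anonymous struct it declares. Purely for
// illustration, a document matching those struct tags could look like this
// (the step names, commands and values are made up, not taken from a real
// pipeline):
//
//	steps:
//	  - name: build
//	    command: go build ./...
//	    environment:
//	      CGO_ENABLED: "0"
//	dependencies:
//	  nixpkgs:
//	    - go
//	    - nodejs
//	environment:
//	  CI: "true"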

func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// this step could be empty
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// append setup steps in order to the start of workflow steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}
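
// As a concrete, hypothetical example of the mapping above: with
// dependencies {"nixpkgs": ["go", "nodejs"]} and a Nixery instance at
// "nixery.dev" (the real host comes from cfg.NixeryPipelines.Nixery),
// workflowImage returns "nixery.dev/go/nodejs/bash/git/coreutils/nix";
// on an arm64 host the "arm64/" prefix is inserted, giving
// "nixery.dev/arm64/go/nodejs/bash/git/coreutils/nix". Nixery builds an
// image containing the packages named in the path segments.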

func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// drain the pull output so the pull runs to completion; progress goes to stdout
	_, _ = io.Copy(os.Stdout, reader)

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, with the sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
		if err != nil {
			return err
		}

		return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
	})

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently the best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	addl.container = resp.ID
	wf.Data = addl

	return nil
}
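
// For orientation, the container created in SetupWorkflow corresponds roughly
// to the following docker CLI invocation (an approximation for reading
// purposes, not something this code generates; <image> and <id> stand for the
// Nixery image and workflow id chosen above):
//
//	docker run -i \
//	  --hostname spindle \
//	  --workdir /tangled/workspace \
//	  --label sh.tangled.pipeline/workflow_id=<id> \
//	  --tmpfs /tmp:exec,mode=1777 \
//	  --cap-drop ALL --cap-add DAC_OVERRIDE \
//	  --security-opt no-new-privileges \
//	  --add-host host.docker.internal:host-gateway \
//	  <image> cat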

func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	addl := w.Data.(addlFields)
	workflowEnvs := ConstructEnvs(addl.env)
	// TODO(winter): should SetupWorkflow also have secret access?
	// IMO yes, but probably worth thinking on.
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx].(Step)

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(EnvVars(nil), workflowEnvs...)
	for k, v := range step.environment {
		envs.AddEnv(k, v)
	}
	envs.AddEnv("HOME", homeDir)

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.command},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
	}()

	select {
	case <-tailDone:

	case <-ctx.Done():
		// cleanup will be handled by DestroyWorkflow, since
		// Docker doesn't provide an API to kill an exec run
		// (sure, we could grab the PID and kill it ourselves,
		// but that's wasted effort)
		e.l.Warn("step timed out", "step", step.Name())

		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
		if err != nil {
			return err
		}

		e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

		if inspectResp.State.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	// This actually *starts* the command. Thanks, Docker!
	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs.Reader,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}
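
// Sketch of how a caller might drive this engine, based only on the exported
// methods in this file (the real runner lives elsewhere in spindle; wid, cfg,
// secrets and logger are stand-ins):
//
//	eng, err := New(ctx, cfg)
//	if err != nil {
//		return err
//	}
//	wf, err := eng.InitWorkflow(twf, tpl)
//	if err != nil {
//		return err
//	}
//	wctx, cancel := context.WithTimeout(ctx, eng.WorkflowTimeout())
//	defer cancel()
//	defer eng.DestroyWorkflow(context.Background(), wid)
//	if err := eng.SetupWorkflow(wctx, wid, wf); err != nil {
//		return err
//	}
//	for i := range wf.Steps {
//		if err := eng.RunStep(wctx, wid, wf, i, secrets, logger); err != nil {
//			return err
//		}
//	}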