Forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "strings" 12 "sync" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" 17 "github.com/docker/docker/api/types/network" 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/notifier" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26) 27 28const ( 29 workspaceDir = "/tangled/workspace" 30) 31 32type cleanupFunc func(context.Context) error 33 34type Engine struct { 35 docker client.APIClient 36 l *slog.Logger 37 db *db.DB 38 n *notifier.Notifier 39 40 chanMu sync.RWMutex 41 stdoutChans map[string]chan string 42 stderrChans map[string]chan string 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 50 if err != nil { 51 return nil, err 52 } 53 54 l := log.FromContext(ctx).With("component", "spindle") 55 56 e := &Engine{ 57 docker: dcli, 58 l: l, 59 db: db, 60 n: n, 61 } 62 63 e.stdoutChans = make(map[string]chan string, 100) 64 e.stderrChans = make(map[string]chan string, 100) 65 66 e.cleanup = make(map[string][]cleanupFunc) 67 68 return e, nil 69} 70 71func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { 72 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 73 74 wg := sync.WaitGroup{} 75 for _, w := range pipeline.Workflows { 76 wg.Add(1) 77 go func() error { 78 defer wg.Done() 79 wid := models.WorkflowId{ 80 PipelineId: pipelineId, 81 Name: w.Name, 82 } 83 84 err := 
e.db.StatusRunning(wid, e.n) 85 if err != nil { 86 return err 87 } 88 89 err = e.SetupWorkflow(ctx, wid) 90 if err != nil { 91 e.l.Error("setting up worklow", "wid", wid, "err", err) 92 return err 93 } 94 defer e.DestroyWorkflow(ctx, wid) 95 96 // TODO: actual checks for image/registry etc. 97 var deps string 98 for _, d := range w.Dependencies { 99 if d.Registry == "nixpkgs" { 100 deps = path.Join(d.Packages...) 101 } 102 } 103 104 // load defaults from somewhere else 105 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 106 107 cimg := path.Join("nixery.dev", deps) 108 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 109 if err != nil { 110 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 111 112 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 113 if err != nil { 114 return err 115 } 116 117 return fmt.Errorf("pulling image: %w", err) 118 } 119 defer reader.Close() 120 io.Copy(os.Stdout, reader) 121 122 err = e.StartSteps(ctx, w.Steps, wid, cimg) 123 if err != nil { 124 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 125 126 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 127 if err != nil { 128 return err 129 } 130 } 131 132 err = e.db.StatusSuccess(wid, e.n) 133 if err != nil { 134 return err 135 } 136 137 return nil 138 }() 139 } 140 141 wg.Wait() 142} 143 144// SetupWorkflow sets up a new network for the workflow and volumes for 145// the workspace and Nix store. These are persisted across steps and are 146// destroyed at the end of the workflow. 
147func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 148 e.l.Info("setting up workflow", "workflow", wid) 149 150 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 151 Name: workspaceVolume(wid), 152 Driver: "local", 153 }) 154 if err != nil { 155 return err 156 } 157 e.registerCleanup(wid, func(ctx context.Context) error { 158 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 159 }) 160 161 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 162 Name: nixVolume(wid), 163 Driver: "local", 164 }) 165 if err != nil { 166 return err 167 } 168 e.registerCleanup(wid, func(ctx context.Context) error { 169 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 170 }) 171 172 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 173 Driver: "bridge", 174 }) 175 if err != nil { 176 return err 177 } 178 e.registerCleanup(wid, func(ctx context.Context) error { 179 return e.docker.NetworkRemove(ctx, networkName(wid)) 180 }) 181 182 return nil 183} 184 185// StartSteps starts all steps sequentially with the same base image. 186// ONLY marks pipeline as failed if container's exit code is non-zero. 187// All other errors are bubbled up. 
188func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { 189 // set up logging channels 190 e.chanMu.Lock() 191 if _, exists := e.stdoutChans[wid.String()]; !exists { 192 e.stdoutChans[wid.String()] = make(chan string, 100) 193 } 194 if _, exists := e.stderrChans[wid.String()]; !exists { 195 e.stderrChans[wid.String()] = make(chan string, 100) 196 } 197 e.chanMu.Unlock() 198 199 // close channels after all steps are complete 200 defer func() { 201 close(e.stdoutChans[wid.String()]) 202 close(e.stderrChans[wid.String()]) 203 }() 204 205 for _, step := range steps { 206 envs := ConstructEnvs(step.Environment) 207 envs.AddEnv("HOME", workspaceDir) 208 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 209 210 hostConfig := hostConfig(wid) 211 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 212 Image: image, 213 Cmd: []string{"bash", "-c", step.Command}, 214 WorkingDir: workspaceDir, 215 Tty: false, 216 Hostname: "spindle", 217 Env: envs.Slice(), 218 }, hostConfig, nil, nil, "") 219 if err != nil { 220 return fmt.Errorf("creating container: %w", err) 221 } 222 223 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 224 if err != nil { 225 return fmt.Errorf("connecting network: %w", err) 226 } 227 228 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 229 if err != nil { 230 return err 231 } 232 e.l.Info("started container", "name", resp.ID, "step", step.Name) 233 234 wg := sync.WaitGroup{} 235 236 wg.Add(1) 237 go func() { 238 defer wg.Done() 239 err := e.TailStep(ctx, resp.ID, wid) 240 if err != nil { 241 e.l.Error("failed to tail container", "container", resp.ID) 242 return 243 } 244 }() 245 246 // wait until all logs are piped 247 wg.Wait() 248 249 state, err := e.WaitStep(ctx, resp.ID) 250 if err != nil { 251 return err 252 } 253 254 err = e.DestroyStep(ctx, resp.ID) 255 if err != nil { 256 return err 257 } 258 259 if 
state.ExitCode != 0 { 260 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) 261 // return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 262 } 263 } 264 265 return nil 266 267} 268 269func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 270 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 271 select { 272 case err := <-errCh: 273 if err != nil { 274 return nil, err 275 } 276 case <-wait: 277 } 278 279 e.l.Info("waited for container", "name", containerID) 280 281 info, err := e.docker.ContainerInspect(ctx, containerID) 282 if err != nil { 283 return nil, err 284 } 285 286 return info.State, nil 287} 288 289func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { 290 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 291 Follow: true, 292 ShowStdout: true, 293 ShowStderr: true, 294 Details: false, 295 Timestamps: false, 296 }) 297 if err != nil { 298 return err 299 } 300 301 // using StdCopy we demux logs and stream stdout and stderr to different 302 // channels. 303 // 304 // stdout w||r stdoutCh 305 // stderr w||r stderrCh 306 // 307 308 rpipeOut, wpipeOut := io.Pipe() 309 rpipeErr, wpipeErr := io.Pipe() 310 311 go func() { 312 defer wpipeOut.Close() 313 defer wpipeErr.Close() 314 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 315 if err != nil && err != io.EOF { 316 e.l.Error("failed to copy logs", "error", err) 317 } 318 }() 319 320 // read from stdout and send to stdout pipe 321 // NOTE: the stdoutCh channnel is closed further up in StartSteps 322 // once all steps are done. 
323 go func() { 324 e.chanMu.RLock() 325 stdoutCh := e.stdoutChans[wid.String()] 326 e.chanMu.RUnlock() 327 328 scanner := bufio.NewScanner(rpipeOut) 329 for scanner.Scan() { 330 stdoutCh <- scanner.Text() 331 } 332 if err := scanner.Err(); err != nil { 333 e.l.Error("failed to scan stdout", "error", err) 334 } 335 }() 336 337 // read from stderr and send to stderr pipe 338 // NOTE: the stderrCh channnel is closed further up in StartSteps 339 // once all steps are done. 340 go func() { 341 e.chanMu.RLock() 342 stderrCh := e.stderrChans[wid.String()] 343 e.chanMu.RUnlock() 344 345 scanner := bufio.NewScanner(rpipeErr) 346 for scanner.Scan() { 347 stderrCh <- scanner.Text() 348 } 349 if err := scanner.Err(); err != nil { 350 e.l.Error("failed to scan stderr", "error", err) 351 } 352 }() 353 354 return nil 355} 356 357func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 358 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 359 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 360 return err 361 } 362 363 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 364 RemoveVolumes: true, 365 RemoveLinks: false, 366 Force: false, 367 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 368 return err 369 } 370 371 return nil 372} 373 374func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 375 e.cleanupMu.Lock() 376 key := wid.String() 377 378 fns := e.cleanup[key] 379 delete(e.cleanup, key) 380 e.cleanupMu.Unlock() 381 382 for _, fn := range fns { 383 if err := fn(ctx); err != nil { 384 e.l.Error("failed to cleanup workflow resource", "workflowId", wid) 385 } 386 } 387 return nil 388} 389 390func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) { 391 e.chanMu.RLock() 392 defer e.chanMu.RUnlock() 393 394 stdoutCh, ok1 := e.stdoutChans[wid.String()] 395 stderrCh, ok2 := e.stderrChans[wid.String()] 396 397 if !ok1 
|| !ok2 { 398 return nil, nil, false 399 } 400 return stdoutCh, stderrCh, true 401} 402 403func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 404 e.cleanupMu.Lock() 405 defer e.cleanupMu.Unlock() 406 407 key := wid.String() 408 e.cleanup[key] = append(e.cleanup[key], fn) 409} 410 411func workspaceVolume(wid models.WorkflowId) string { 412 return fmt.Sprintf("workspace-%s", wid) 413} 414 415func nixVolume(wid models.WorkflowId) string { 416 return fmt.Sprintf("nix-%s", wid) 417} 418 419func networkName(wid models.WorkflowId) string { 420 return fmt.Sprintf("workflow-network-%s", wid) 421} 422 423func hostConfig(wid models.WorkflowId) *container.HostConfig { 424 hostConfig := &container.HostConfig{ 425 Mounts: []mount.Mount{ 426 { 427 Type: mount.TypeVolume, 428 Source: workspaceVolume(wid), 429 Target: workspaceDir, 430 }, 431 { 432 Type: mount.TypeVolume, 433 Source: nixVolume(wid), 434 Target: "/nix", 435 }, 436 }, 437 ReadonlyRootfs: true, 438 CapDrop: []string{"ALL"}, 439 SecurityOpt: []string{"no-new-privileges"}, 440 } 441 442 return hostConfig 443} 444 445// thanks woodpecker 446func isErrContainerNotFoundOrNotRunning(err error) bool { 447 // Error response from daemon: Cannot kill container: ...: No such container: ... 448 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 449 // Error response from podman daemon: can only kill running containers. ... is in state exited 450 // Error: No such container: ... 451 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 452}