forked from tangled.org/core
this repo has no description
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "strings" 12 "sync" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" 17 "github.com/docker/docker/api/types/network" 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/notifier" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26) 27 28const ( 29 workspaceDir = "/tangled/workspace" 30) 31 32type cleanupFunc func(context.Context) error 33 34type Engine struct { 35 docker client.APIClient 36 l *slog.Logger 37 db *db.DB 38 n *notifier.Notifier 39 40 chanMu sync.RWMutex 41 stdoutChans map[string]chan string 42 stderrChans map[string]chan string 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 50 if err != nil { 51 return nil, err 52 } 53 54 l := log.FromContext(ctx).With("component", "spindle") 55 56 e := &Engine{ 57 docker: dcli, 58 l: l, 59 db: db, 60 n: n, 61 } 62 63 e.stdoutChans = make(map[string]chan string, 100) 64 e.stderrChans = make(map[string]chan string, 100) 65 66 e.cleanup = make(map[string][]cleanupFunc) 67 68 return e, nil 69} 70 71func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { 72 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 73 74 wg := sync.WaitGroup{} 75 for _, w := range pipeline.Workflows { 76 wg.Add(1) 77 go func() error { 78 defer wg.Done() 79 wid := models.WorkflowId{ 80 PipelineId: pipelineId, 81 Name: w.Name, 82 } 83 84 err := 
e.db.StatusRunning(wid, e.n) 85 if err != nil { 86 return err 87 } 88 89 err = e.SetupWorkflow(ctx, wid) 90 if err != nil { 91 e.l.Error("setting up worklow", "wid", wid, "err", err) 92 return err 93 } 94 defer e.DestroyWorkflow(ctx, wid) 95 96 // TODO: actual checks for image/registry etc. 97 var deps string 98 for _, d := range w.Dependencies { 99 if d.Registry == "nixpkgs" { 100 deps = path.Join(d.Packages...) 101 } 102 } 103 104 // load defaults from somewhere else 105 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 106 107 cimg := path.Join("nixery.dev", deps) 108 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 109 if err != nil { 110 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 111 112 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 113 if err != nil { 114 return err 115 } 116 117 return fmt.Errorf("pulling image: %w", err) 118 } 119 defer reader.Close() 120 io.Copy(os.Stdout, reader) 121 122 err = e.StartSteps(ctx, w.Steps, wid, cimg) 123 if err != nil { 124 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 125 126 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 127 if err != nil { 128 return err 129 } 130 131 return fmt.Errorf("starting steps image: %w", err) 132 } 133 134 err = e.db.StatusSuccess(wid, e.n) 135 if err != nil { 136 return err 137 } 138 139 return nil 140 }() 141 } 142 143 wg.Wait() 144} 145 146// SetupWorkflow sets up a new network for the workflow and volumes for 147// the workspace and Nix store. These are persisted across steps and are 148// destroyed at the end of the workflow. 
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	// Workspace volume first, then the Nix store volume. Each removal is
	// registered as soon as its volume exists, so the cleanup list only ever
	// holds resources that were actually created.
	for _, name := range []string{workspaceVolume(wid), nixVolume(wid)} {
		volName := name
		_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
			Name:   volName,
			Driver: "local",
		})
		if err != nil {
			return err
		}
		e.registerCleanup(wid, func(ctx context.Context) error {
			return e.docker.VolumeRemove(ctx, volName, true)
		})
	}

	// Per-workflow bridge network that the step containers are connected to.
	netName := networkName(wid)
	if _, err := e.docker.NetworkCreate(ctx, netName, network.CreateOptions{Driver: "bridge"}); err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, netName)
	})

	return nil
}

// StartSteps runs the given steps one after another, each in its own
// container created from the same image. If a step's container exits with
// a non-zero code, StartSteps returns an error. Every other failure along
// the way is also returned to the caller.
190func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { 191 // set up logging channels 192 e.chanMu.Lock() 193 if _, exists := e.stdoutChans[wid.String()]; !exists { 194 e.stdoutChans[wid.String()] = make(chan string, 100) 195 } 196 if _, exists := e.stderrChans[wid.String()]; !exists { 197 e.stderrChans[wid.String()] = make(chan string, 100) 198 } 199 e.chanMu.Unlock() 200 201 // close channels after all steps are complete 202 defer func() { 203 close(e.stdoutChans[wid.String()]) 204 close(e.stderrChans[wid.String()]) 205 }() 206 207 for _, step := range steps { 208 envs := ConstructEnvs(step.Environment) 209 envs.AddEnv("HOME", workspaceDir) 210 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 211 212 hostConfig := hostConfig(wid) 213 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 214 Image: image, 215 Cmd: []string{"bash", "-c", step.Command}, 216 WorkingDir: workspaceDir, 217 Tty: false, 218 Hostname: "spindle", 219 Env: envs.Slice(), 220 }, hostConfig, nil, nil, "") 221 if err != nil { 222 return fmt.Errorf("creating container: %w", err) 223 } 224 225 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 226 if err != nil { 227 return fmt.Errorf("connecting network: %w", err) 228 } 229 230 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 231 if err != nil { 232 return err 233 } 234 e.l.Info("started container", "name", resp.ID, "step", step.Name) 235 236 wg := sync.WaitGroup{} 237 238 wg.Add(1) 239 go func() { 240 defer wg.Done() 241 err := e.TailStep(ctx, resp.ID, wid) 242 if err != nil { 243 e.l.Error("failed to tail container", "container", resp.ID) 244 return 245 } 246 }() 247 248 // wait until all logs are piped 249 wg.Wait() 250 251 state, err := e.WaitStep(ctx, resp.ID) 252 if err != nil { 253 return err 254 } 255 256 err = e.DestroyStep(ctx, resp.ID) 257 if err != nil { 258 return err 259 } 260 261 if 
state.ExitCode != 0 { 262 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) 263 return fmt.Errorf("%s", state.Error) 264 } 265 } 266 267 return nil 268 269} 270 271func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 272 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 273 select { 274 case err := <-errCh: 275 if err != nil { 276 return nil, err 277 } 278 case <-wait: 279 } 280 281 e.l.Info("waited for container", "name", containerID) 282 283 info, err := e.docker.ContainerInspect(ctx, containerID) 284 if err != nil { 285 return nil, err 286 } 287 288 return info.State, nil 289} 290 291func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { 292 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 293 Follow: true, 294 ShowStdout: true, 295 ShowStderr: true, 296 Details: false, 297 Timestamps: false, 298 }) 299 if err != nil { 300 return err 301 } 302 303 // using StdCopy we demux logs and stream stdout and stderr to different 304 // channels. 305 // 306 // stdout w||r stdoutCh 307 // stderr w||r stderrCh 308 // 309 310 rpipeOut, wpipeOut := io.Pipe() 311 rpipeErr, wpipeErr := io.Pipe() 312 313 go func() { 314 defer wpipeOut.Close() 315 defer wpipeErr.Close() 316 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 317 if err != nil && err != io.EOF { 318 e.l.Error("failed to copy logs", "error", err) 319 } 320 }() 321 322 // read from stdout and send to stdout pipe 323 // NOTE: the stdoutCh channnel is closed further up in StartSteps 324 // once all steps are done. 
325 go func() { 326 e.chanMu.RLock() 327 stdoutCh := e.stdoutChans[wid.String()] 328 e.chanMu.RUnlock() 329 330 scanner := bufio.NewScanner(rpipeOut) 331 for scanner.Scan() { 332 stdoutCh <- scanner.Text() 333 } 334 if err := scanner.Err(); err != nil { 335 e.l.Error("failed to scan stdout", "error", err) 336 } 337 }() 338 339 // read from stderr and send to stderr pipe 340 // NOTE: the stderrCh channnel is closed further up in StartSteps 341 // once all steps are done. 342 go func() { 343 e.chanMu.RLock() 344 stderrCh := e.stderrChans[wid.String()] 345 e.chanMu.RUnlock() 346 347 scanner := bufio.NewScanner(rpipeErr) 348 for scanner.Scan() { 349 stderrCh <- scanner.Text() 350 } 351 if err := scanner.Err(); err != nil { 352 e.l.Error("failed to scan stderr", "error", err) 353 } 354 }() 355 356 return nil 357} 358 359func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 360 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 361 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 362 return err 363 } 364 365 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 366 RemoveVolumes: true, 367 RemoveLinks: false, 368 Force: false, 369 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 370 return err 371 } 372 373 return nil 374} 375 376func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 377 e.cleanupMu.Lock() 378 key := wid.String() 379 380 fns := e.cleanup[key] 381 delete(e.cleanup, key) 382 e.cleanupMu.Unlock() 383 384 for _, fn := range fns { 385 if err := fn(ctx); err != nil { 386 e.l.Error("failed to cleanup workflow resource", "workflowId", wid) 387 } 388 } 389 return nil 390} 391 392func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) { 393 e.chanMu.RLock() 394 defer e.chanMu.RUnlock() 395 396 stdoutCh, ok1 := e.stdoutChans[wid.String()] 397 stderrCh, ok2 := e.stderrChans[wid.String()] 398 399 if !ok1 
|| !ok2 { 400 return nil, nil, false 401 } 402 return stdoutCh, stderrCh, true 403} 404 405func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 406 e.cleanupMu.Lock() 407 defer e.cleanupMu.Unlock() 408 409 key := wid.String() 410 e.cleanup[key] = append(e.cleanup[key], fn) 411} 412 413func workspaceVolume(wid models.WorkflowId) string { 414 return fmt.Sprintf("workspace-%s", wid) 415} 416 417func nixVolume(wid models.WorkflowId) string { 418 return fmt.Sprintf("nix-%s", wid) 419} 420 421func networkName(wid models.WorkflowId) string { 422 return fmt.Sprintf("workflow-network-%s", wid) 423} 424 425func hostConfig(wid models.WorkflowId) *container.HostConfig { 426 hostConfig := &container.HostConfig{ 427 Mounts: []mount.Mount{ 428 { 429 Type: mount.TypeVolume, 430 Source: workspaceVolume(wid), 431 Target: workspaceDir, 432 }, 433 { 434 Type: mount.TypeVolume, 435 Source: nixVolume(wid), 436 Target: "/nix", 437 }, 438 }, 439 ReadonlyRootfs: true, 440 CapDrop: []string{"ALL"}, 441 SecurityOpt: []string{"no-new-privileges"}, 442 } 443 444 return hostConfig 445} 446 447// thanks woodpecker 448func isErrContainerNotFoundOrNotRunning(err error) bool { 449 // Error response from daemon: Cannot kill container: ...: No such container: ... 450 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 451 // Error response from podman daemon: can only kill running containers. ... is in state exited 452 // Error: No such container: ... 453 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 454}