forked from tangled.sh/core
this repo has no description
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "strings" 12 "sync" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" 17 "github.com/docker/docker/api/types/network" 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/notifier" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26) 27 28const ( 29 workspaceDir = "/tangled/workspace" 30) 31 32type cleanupFunc func(context.Context) error 33 34type Engine struct { 35 docker client.APIClient 36 l *slog.Logger 37 db *db.DB 38 n *notifier.Notifier 39 40 chanMu sync.RWMutex 41 stdoutChans map[string]chan string 42 stderrChans map[string]chan string 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 50 if err != nil { 51 return nil, err 52 } 53 54 l := log.FromContext(ctx).With("component", "spindle") 55 56 e := &Engine{ 57 docker: dcli, 58 l: l, 59 db: db, 60 n: n, 61 } 62 63 e.stdoutChans = make(map[string]chan string, 100) 64 e.stderrChans = make(map[string]chan string, 100) 65 66 e.cleanup = make(map[string][]cleanupFunc) 67 68 return e, nil 69} 70 71func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { 72 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 73 74 wg := sync.WaitGroup{} 75 for _, w := range pipeline.Workflows { 76 wg.Add(1) 77 go func() error { 78 defer wg.Done() 79 wid := models.WorkflowId{ 80 PipelineId: pipelineId, 81 Name: w.Name, 82 } 83 84 err := 
e.db.StatusRunning(wid, e.n) 85 if err != nil { 86 return err 87 } 88 89 err = e.SetupWorkflow(ctx, wid) 90 if err != nil { 91 e.l.Error("setting up worklow", "wid", wid, "err", err) 92 return err 93 } 94 defer e.DestroyWorkflow(ctx, wid) 95 96 // TODO: actual checks for image/registry etc. 97 var deps string 98 for _, d := range w.Dependencies { 99 if d.Registry == "nixpkgs" { 100 deps = path.Join(d.Packages...) 101 } 102 } 103 104 // load defaults from somewhere else 105 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 106 107 cimg := path.Join("nixery.dev", deps) 108 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 109 if err != nil { 110 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 111 112 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 113 if err != nil { 114 return err 115 } 116 117 return fmt.Errorf("pulling image: %w", err) 118 } 119 defer reader.Close() 120 io.Copy(os.Stdout, reader) 121 122 err = e.StartSteps(ctx, w.Steps, wid, cimg) 123 if err != nil { 124 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 125 126 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 127 if err != nil { 128 return err 129 } 130 } 131 132 err = e.db.StatusSuccess(wid, e.n) 133 if err != nil { 134 return err 135 } 136 137 return nil 138 }() 139 } 140 141 wg.Wait() 142} 143 144// SetupWorkflow sets up a new network for the workflow and volumes for 145// the workspace and Nix store. These are persisted across steps and are 146// destroyed at the end of the workflow. 
147func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 148 e.l.Info("setting up workflow", "workflow", wid) 149 150 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 151 Name: workspaceVolume(wid), 152 Driver: "local", 153 }) 154 if err != nil { 155 return err 156 } 157 e.registerCleanup(wid, func(ctx context.Context) error { 158 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 159 }) 160 161 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 162 Name: nixVolume(wid), 163 Driver: "local", 164 }) 165 if err != nil { 166 return err 167 } 168 e.registerCleanup(wid, func(ctx context.Context) error { 169 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 170 }) 171 172 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 173 Driver: "bridge", 174 }) 175 if err != nil { 176 return err 177 } 178 e.registerCleanup(wid, func(ctx context.Context) error { 179 return e.docker.NetworkRemove(ctx, networkName(wid)) 180 }) 181 182 return nil 183} 184 185// StartSteps starts all steps sequentially with the same base image. 186// ONLY marks pipeline as failed if container's exit code is non-zero. 187// All other errors are bubbled up. 
188func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { 189 // set up logging channels 190 e.chanMu.Lock() 191 if _, exists := e.stdoutChans[wid.String()]; !exists { 192 e.stdoutChans[wid.String()] = make(chan string, 100) 193 } 194 if _, exists := e.stderrChans[wid.String()]; !exists { 195 e.stderrChans[wid.String()] = make(chan string, 100) 196 } 197 e.chanMu.Unlock() 198 199 // close channels after all steps are complete 200 defer func() { 201 close(e.stdoutChans[wid.String()]) 202 close(e.stderrChans[wid.String()]) 203 }() 204 205 for _, step := range steps { 206 hostConfig := hostConfig(wid) 207 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 208 Image: image, 209 Cmd: []string{"bash", "-c", step.Command}, 210 WorkingDir: workspaceDir, 211 Tty: false, 212 Hostname: "spindle", 213 Env: []string{"HOME=" + workspaceDir}, 214 }, hostConfig, nil, nil, "") 215 if err != nil { 216 return fmt.Errorf("creating container: %w", err) 217 } 218 219 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 220 if err != nil { 221 return fmt.Errorf("connecting network: %w", err) 222 } 223 224 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 225 if err != nil { 226 return err 227 } 228 e.l.Info("started container", "name", resp.ID, "step", step.Name) 229 230 wg := sync.WaitGroup{} 231 232 wg.Add(1) 233 go func() { 234 defer wg.Done() 235 err := e.TailStep(ctx, resp.ID, wid) 236 if err != nil { 237 e.l.Error("failed to tail container", "container", resp.ID) 238 return 239 } 240 }() 241 242 // wait until all logs are piped 243 wg.Wait() 244 245 state, err := e.WaitStep(ctx, resp.ID) 246 if err != nil { 247 return err 248 } 249 250 err = e.DestroyStep(ctx, resp.ID) 251 if err != nil { 252 return err 253 } 254 255 if state.ExitCode != 0 { 256 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) 257 // return 
e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 258 } 259 } 260 261 return nil 262 263} 264 265func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 266 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 267 select { 268 case err := <-errCh: 269 if err != nil { 270 return nil, err 271 } 272 case <-wait: 273 } 274 275 e.l.Info("waited for container", "name", containerID) 276 277 info, err := e.docker.ContainerInspect(ctx, containerID) 278 if err != nil { 279 return nil, err 280 } 281 282 return info.State, nil 283} 284 285func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { 286 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 287 Follow: true, 288 ShowStdout: true, 289 ShowStderr: true, 290 Details: false, 291 Timestamps: false, 292 }) 293 if err != nil { 294 return err 295 } 296 297 // using StdCopy we demux logs and stream stdout and stderr to different 298 // channels. 299 // 300 // stdout w||r stdoutCh 301 // stderr w||r stderrCh 302 // 303 304 rpipeOut, wpipeOut := io.Pipe() 305 rpipeErr, wpipeErr := io.Pipe() 306 307 go func() { 308 defer wpipeOut.Close() 309 defer wpipeErr.Close() 310 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 311 if err != nil && err != io.EOF { 312 e.l.Error("failed to copy logs", "error", err) 313 } 314 }() 315 316 // read from stdout and send to stdout pipe 317 // NOTE: the stdoutCh channnel is closed further up in StartSteps 318 // once all steps are done. 
319 go func() { 320 e.chanMu.RLock() 321 stdoutCh := e.stdoutChans[wid.String()] 322 e.chanMu.RUnlock() 323 324 scanner := bufio.NewScanner(rpipeOut) 325 for scanner.Scan() { 326 stdoutCh <- scanner.Text() 327 } 328 if err := scanner.Err(); err != nil { 329 e.l.Error("failed to scan stdout", "error", err) 330 } 331 }() 332 333 // read from stderr and send to stderr pipe 334 // NOTE: the stderrCh channnel is closed further up in StartSteps 335 // once all steps are done. 336 go func() { 337 e.chanMu.RLock() 338 stderrCh := e.stderrChans[wid.String()] 339 e.chanMu.RUnlock() 340 341 scanner := bufio.NewScanner(rpipeErr) 342 for scanner.Scan() { 343 stderrCh <- scanner.Text() 344 } 345 if err := scanner.Err(); err != nil { 346 e.l.Error("failed to scan stderr", "error", err) 347 } 348 }() 349 350 return nil 351} 352 353func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 354 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 355 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 356 return err 357 } 358 359 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 360 RemoveVolumes: true, 361 RemoveLinks: false, 362 Force: false, 363 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 364 return err 365 } 366 367 return nil 368} 369 370func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 371 e.cleanupMu.Lock() 372 key := wid.String() 373 374 fns := e.cleanup[key] 375 delete(e.cleanup, key) 376 e.cleanupMu.Unlock() 377 378 for _, fn := range fns { 379 if err := fn(ctx); err != nil { 380 e.l.Error("failed to cleanup workflow resource", "workflowId", wid) 381 } 382 } 383 return nil 384} 385 386func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) { 387 e.chanMu.RLock() 388 defer e.chanMu.RUnlock() 389 390 stdoutCh, ok1 := e.stdoutChans[wid.String()] 391 stderrCh, ok2 := e.stderrChans[wid.String()] 392 393 if !ok1 
|| !ok2 { 394 return nil, nil, false 395 } 396 return stdoutCh, stderrCh, true 397} 398 399func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 400 e.cleanupMu.Lock() 401 defer e.cleanupMu.Unlock() 402 403 key := wid.String() 404 e.cleanup[key] = append(e.cleanup[key], fn) 405} 406 407func workspaceVolume(wid models.WorkflowId) string { 408 return fmt.Sprintf("workspace-%s", wid) 409} 410 411func nixVolume(wid models.WorkflowId) string { 412 return fmt.Sprintf("nix-%s", wid) 413} 414 415func networkName(wid models.WorkflowId) string { 416 return fmt.Sprintf("workflow-network-%s", wid) 417} 418 419func hostConfig(wid models.WorkflowId) *container.HostConfig { 420 hostConfig := &container.HostConfig{ 421 Mounts: []mount.Mount{ 422 { 423 Type: mount.TypeVolume, 424 Source: workspaceVolume(wid), 425 Target: workspaceDir, 426 }, 427 { 428 Type: mount.TypeVolume, 429 Source: nixVolume(wid), 430 Target: "/nix", 431 }, 432 }, 433 ReadonlyRootfs: true, 434 CapDrop: []string{"ALL"}, 435 SecurityOpt: []string{"no-new-privileges"}, 436 } 437 438 return hostConfig 439} 440 441// thanks woodpecker 442func isErrContainerNotFoundOrNotRunning(err error) bool { 443 // Error response from daemon: Cannot kill container: ...: No such container: ... 444 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 445 // Error response from podman daemon: can only kill running containers. ... is in state exited 446 // Error: No such container: ... 447 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 448}