forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "strings" 12 "sync" 13 "syscall" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/api/types/volume" 20 "github.com/docker/docker/client" 21 "github.com/docker/docker/pkg/stdcopy" 22 "golang.org/x/sync/errgroup" 23 "tangled.sh/tangled.sh/core/api/tangled" 24 "tangled.sh/tangled.sh/core/log" 25 "tangled.sh/tangled.sh/core/notifier" 26 "tangled.sh/tangled.sh/core/spindle/db" 27) 28 29const ( 30 workspaceDir = "/tangled/workspace" 31) 32 33type cleanupFunc func(context.Context) error 34 35type Engine struct { 36 docker client.APIClient 37 l *slog.Logger 38 db *db.DB 39 n *notifier.Notifier 40 41 chanMu sync.RWMutex 42 stdoutChans map[string]chan string 43 stderrChans map[string]chan string 44 45 cleanupMu sync.Mutex 46 cleanup map[string][]cleanupFunc 47} 48 49func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 50 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 51 if err != nil { 52 return nil, err 53 } 54 55 l := log.FromContext(ctx).With("component", "spindle") 56 57 e := &Engine{ 58 docker: dcli, 59 l: l, 60 db: db, 61 n: n, 62 } 63 64 e.stdoutChans = make(map[string]chan string, 100) 65 e.stderrChans = make(map[string]chan string, 100) 66 67 e.cleanup = make(map[string][]cleanupFunc) 68 69 return e, nil 70} 71 72func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 73 e.l.Info("starting all workflows in parallel", "pipeline", id) 74 75 err := e.db.MarkPipelineRunning(id, e.n) 76 if err != nil { 77 return err 78 } 79 80 g := errgroup.Group{} 81 for _, w := range pipeline.Workflows { 82 g.Go(func() error { 83 err := e.SetupWorkflow(ctx, id, w.Name) 84 if err != nil { 85 return err 86 } 87 88 defer e.DestroyWorkflow(ctx, id, w.Name) 89 90 // TODO: actual checks for image/registry etc. 91 var deps string 92 for _, d := range w.Dependencies { 93 if d.Registry == "nixpkgs" { 94 deps = path.Join(d.Packages...) 95 } 96 } 97 98 // load defaults from somewhere else 99 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 100 101 cimg := path.Join("nixery.dev", deps) 102 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 103 if err != nil { 104 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 105 err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 106 if err != nil { 107 return err 108 } 109 return fmt.Errorf("pulling image: %w", err) 110 } 111 defer reader.Close() 112 io.Copy(os.Stdout, reader) 113 114 err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg) 115 if err != nil { 116 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 117 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 118 } 119 120 return nil 121 }) 122 } 123 124 err = g.Wait() 125 if err != nil { 126 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 127 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 128 } 129 130 e.l.Info("pipeline success!", "id", id) 131 return e.db.MarkPipelineSuccess(id, e.n) 132} 133 134// SetupWorkflow sets up a new network for the workflow and volumes for 135// the workspace and Nix store. These are persisted across steps and are 136// destroyed at the end of the workflow. 137func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error { 138 e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName) 139 140 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 141 Name: workspaceVolume(id, workflowName), 142 Driver: "local", 143 }) 144 if err != nil { 145 return err 146 } 147 e.registerCleanup(id, workflowName, func(ctx context.Context) error { 148 return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true) 149 }) 150 151 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 152 Name: nixVolume(id, workflowName), 153 Driver: "local", 154 }) 155 if err != nil { 156 return err 157 } 158 e.registerCleanup(id, workflowName, func(ctx context.Context) error { 159 return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true) 160 }) 161 162 _, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{ 163 Driver: "bridge", 164 }) 165 if err != nil { 166 return err 167 } 168 e.registerCleanup(id, workflowName, func(ctx context.Context) error { 169 return e.docker.NetworkRemove(ctx, networkName(id, workflowName)) 170 }) 171 172 return nil 173} 174 175// StartSteps starts all steps sequentially with the same base image. 176// ONLY marks pipeline as failed if container's exit code is non-zero. 177// All other errors are bubbled up. 178func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error { 179 // set up logging channels 180 e.chanMu.Lock() 181 if _, exists := e.stdoutChans[id]; !exists { 182 e.stdoutChans[id] = make(chan string, 100) 183 } 184 if _, exists := e.stderrChans[id]; !exists { 185 e.stderrChans[id] = make(chan string, 100) 186 } 187 e.chanMu.Unlock() 188 189 // close channels after all steps are complete 190 defer func() { 191 close(e.stdoutChans[id]) 192 close(e.stderrChans[id]) 193 }() 194 195 for _, step := range steps { 196 hostConfig := hostConfig(id, workflowName) 197 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 198 Image: image, 199 Cmd: []string{"bash", "-c", step.Command}, 200 WorkingDir: workspaceDir, 201 Tty: false, 202 Hostname: "spindle", 203 Env: []string{"HOME=" + workspaceDir}, 204 }, hostConfig, nil, nil, "") 205 if err != nil { 206 return fmt.Errorf("creating container: %w", err) 207 } 208 209 err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil) 210 if err != nil { 211 return fmt.Errorf("connecting network: %w", err) 212 } 213 214 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 215 if err != nil { 216 return err 217 } 218 e.l.Info("started container", "name", resp.ID, "step", step.Name) 219 220 wg := sync.WaitGroup{} 221 222 wg.Add(1) 223 go func() { 224 defer wg.Done() 225 err := e.TailStep(ctx, resp.ID, id) 226 if err != nil { 227 e.l.Error("failed to tail container", "container", resp.ID) 228 return 229 } 230 }() 231 232 // wait until all logs are piped 233 wg.Wait() 234 235 state, err := e.WaitStep(ctx, resp.ID) 236 if err != nil { 237 return err 238 } 239 240 err = e.DestroyStep(ctx, resp.ID, id) 241 if err != nil { 242 return err 243 } 244 245 if state.ExitCode != 0 { 246 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 247 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 248 } 249 } 250 251 return nil 252 253} 254 255func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 256 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 257 select { 258 case err := <-errCh: 259 if err != nil { 260 return nil, err 261 } 262 case <-wait: 263 } 264 265 e.l.Info("waited for container", "name", containerID) 266 267 info, err := e.docker.ContainerInspect(ctx, containerID) 268 if err != nil { 269 return nil, err 270 } 271 272 return info.State, nil 273} 274 275func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error { 276 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 277 Follow: true, 278 ShowStdout: true, 279 ShowStderr: true, 280 Details: false, 281 Timestamps: false, 282 }) 283 if err != nil { 284 return err 285 } 286 287 // using StdCopy we demux logs and stream stdout and stderr to different 288 // channels. 289 // 290 // stdout w||r stdoutCh 291 // stderr w||r stderrCh 292 // 293 294 rpipeOut, wpipeOut := io.Pipe() 295 rpipeErr, wpipeErr := io.Pipe() 296 297 go func() { 298 defer wpipeOut.Close() 299 defer wpipeErr.Close() 300 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 301 if err != nil && err != io.EOF { 302 e.l.Error("failed to copy logs", "error", err) 303 } 304 }() 305 306 // read from stdout and send to stdout pipe 307 // NOTE: the stdoutCh channnel is closed further up in StartSteps 308 // once all steps are done. 309 go func() { 310 e.chanMu.RLock() 311 stdoutCh := e.stdoutChans[pipelineID] 312 e.chanMu.RUnlock() 313 314 scanner := bufio.NewScanner(rpipeOut) 315 for scanner.Scan() { 316 stdoutCh <- scanner.Text() 317 } 318 if err := scanner.Err(); err != nil { 319 e.l.Error("failed to scan stdout", "error", err) 320 } 321 }() 322 323 // read from stderr and send to stderr pipe 324 // NOTE: the stderrCh channnel is closed further up in StartSteps 325 // once all steps are done. 326 go func() { 327 e.chanMu.RLock() 328 stderrCh := e.stderrChans[pipelineID] 329 e.chanMu.RUnlock() 330 331 scanner := bufio.NewScanner(rpipeErr) 332 for scanner.Scan() { 333 stderrCh <- scanner.Text() 334 } 335 if err := scanner.Err(); err != nil { 336 e.l.Error("failed to scan stderr", "error", err) 337 } 338 }() 339 340 return nil 341} 342 343func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error { 344 err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String()) 345 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 346 return err 347 } 348 349 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 350 RemoveVolumes: true, 351 RemoveLinks: false, 352 Force: false, 353 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 354 return err 355 } 356 357 return nil 358} 359 360func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error { 361 e.cleanupMu.Lock() 362 key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 363 364 fns := e.cleanup[key] 365 delete(e.cleanup, key) 366 e.cleanupMu.Unlock() 367 368 for _, fn := range fns { 369 if err := fn(ctx); err != nil { 370 e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err) 371 } 372 } 373 return nil 374} 375 376func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { 377 e.chanMu.RLock() 378 defer e.chanMu.RUnlock() 379 380 stdoutCh, ok1 := e.stdoutChans[pipelineID] 381 stderrCh, ok2 := e.stderrChans[pipelineID] 382 383 if !ok1 || !ok2 { 384 return nil, nil, false 385 } 386 return stdoutCh, stderrCh, true 387} 388 389func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) { 390 e.cleanupMu.Lock() 391 defer e.cleanupMu.Unlock() 392 393 key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 394 e.cleanup[key] = append(e.cleanup[key], fn) 395} 396 397func workspaceVolume(id, name string) string { 398 return fmt.Sprintf("workspace-%s-%s", id, name) 399} 400 401func nixVolume(id, name string) string { 402 return fmt.Sprintf("nix-%s-%s", id, name) 403} 404 405func networkName(id, name string) string { 406 return fmt.Sprintf("workflow-network-%s-%s", id, name) 407} 408 409func hostConfig(id, name string) *container.HostConfig { 410 hostConfig := &container.HostConfig{ 411 Mounts: []mount.Mount{ 412 { 413 Type: mount.TypeVolume, 414 Source: workspaceVolume(id, name), 415 Target: workspaceDir, 416 }, 417 { 418 Type: mount.TypeVolume, 419 Source: nixVolume(id, name), 420 Target: "/nix", 421 }, 422 }, 423 ReadonlyRootfs: true, 424 CapDrop: []string{"ALL"}, 425 SecurityOpt: []string{"no-new-privileges"}, 426 } 427 428 return hostConfig 429} 430 431// thanks woodpecker 432func isErrContainerNotFoundOrNotRunning(err error) bool { 433 // Error response from daemon: Cannot kill container: ...: No such container: ... 434 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 435 // Error response from podman daemon: can only kill running containers. ... is in state exited 436 // Error: No such container: ... 437 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 438}