forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "log/slog" 8 "os" 9 "path" 10 "sync" 11 12 "github.com/docker/docker/api/types/container" 13 "github.com/docker/docker/api/types/image" 14 "github.com/docker/docker/api/types/mount" 15 "github.com/docker/docker/api/types/network" 16 "github.com/docker/docker/api/types/volume" 17 "github.com/docker/docker/client" 18 "github.com/docker/docker/pkg/stdcopy" 19 "golang.org/x/sync/errgroup" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/log" 22 "tangled.sh/tangled.sh/core/spindle/db" 23) 24 25const ( 26 workspaceDir = "/tangled/workspace" 27) 28 29type Engine struct { 30 docker client.APIClient 31 l *slog.Logger 32 db *db.DB 33} 34 35func New(ctx context.Context, db *db.DB) (*Engine, error) { 36 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 37 if err != nil { 38 return nil, err 39 } 40 41 l := log.FromContext(ctx).With("component", "spindle") 42 43 return &Engine{docker: dcli, l: l, db: db}, nil 44} 45 46// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. 47// in the future. In here also goes other setup steps. 48func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 49 e.l.Info("setting up pipeline", "pipeline", id) 50 51 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 52 Name: workspaceVolume(id), 53 Driver: "local", 54 }) 55 if err != nil { 56 return err 57 } 58 59 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 60 Name: nixVolume(id), 61 Driver: "local", 62 }) 63 if err != nil { 64 return err 65 } 66 67 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{ 68 Driver: "bridge", 69 }) 70 if err != nil { 71 return err 72 } 73 74 err = e.db.CreatePipeline(id) 75 return err 76} 77 78func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 79 e.l.Info("starting all workflows in parallel", "pipeline", id) 80 81 err := e.db.MarkPipelineRunning(id) 82 if err != nil { 83 return err 84 } 85 86 g := errgroup.Group{} 87 for _, w := range pipeline.Workflows { 88 g.Go(func() error { 89 // TODO: actual checks for image/registry etc. 90 var deps string 91 for _, d := range w.Dependencies { 92 if d.Registry == "nixpkgs" { 93 deps = path.Join(d.Packages...) 94 } 95 } 96 97 // load defaults from somewhere else 98 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 99 100 cimg := path.Join("nixery.dev", deps) 101 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 102 if err != nil { 103 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 104 err := e.db.MarkPipelineFailed(id, -1, err.Error()) 105 if err != nil { 106 return err 107 } 108 return fmt.Errorf("pulling image: %w", err) 109 } 110 defer reader.Close() 111 io.Copy(os.Stdout, reader) 112 113 err = e.StartSteps(ctx, w.Steps, id, cimg) 114 if err != nil { 115 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 116 return e.db.MarkPipelineFailed(id, -1, err.Error()) 117 } 118 119 return nil 120 }) 121 } 122 123 err = g.Wait() 124 if err != nil { 125 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 126 return e.db.MarkPipelineFailed(id, -1, err.Error()) 127 } 128 129 e.l.Info("pipeline success!", "id", id) 130 return e.db.MarkPipelineSuccess(id) 131} 132 133// StartSteps starts all steps sequentially with the same base image. 134// ONLY marks pipeline as failed if container's exit code is non-zero. 135// All other errors are bubbled up. 136func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 137 for _, step := range steps { 138 hostConfig := hostConfig(id) 139 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 140 Image: image, 141 Cmd: []string{"bash", "-c", step.Command}, 142 WorkingDir: workspaceDir, 143 Tty: false, 144 Hostname: "spindle", 145 Env: []string{"HOME=" + workspaceDir}, 146 }, hostConfig, nil, nil, "") 147 if err != nil { 148 return fmt.Errorf("creating container: %w", err) 149 } 150 151 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil) 152 if err != nil { 153 return fmt.Errorf("connecting network: %w", err) 154 } 155 156 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 157 if err != nil { 158 return err 159 } 160 e.l.Info("started container", "name", resp.ID, "step", step.Name) 161 162 wg := sync.WaitGroup{} 163 164 wg.Add(1) 165 go func() { 166 defer wg.Done() 167 err := e.TailStep(ctx, resp.ID) 168 if err != nil { 169 e.l.Error("failed to tail container", "container", resp.ID) 170 return 171 } 172 }() 173 174 // wait until all logs are piped 175 wg.Wait() 176 177 state, err := e.WaitStep(ctx, resp.ID) 178 if err != nil { 179 return err 180 } 181 182 if state.ExitCode != 0 { 183 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 184 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error) 185 } 186 } 187 188 return nil 189 190} 191 192func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 193 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 194 select { 195 case err := <-errCh: 196 if err != nil { 197 return nil, err 198 } 199 case <-wait: 200 } 201 202 e.l.Info("waited for container", "name", containerID) 203 204 info, err := e.docker.ContainerInspect(ctx, containerID) 205 if err != nil { 206 return nil, err 207 } 208 209 return info.State, nil 210} 211 212func (e *Engine) TailStep(ctx context.Context, containerID string) error { 213 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 214 Follow: true, 215 ShowStdout: true, 216 ShowStderr: true, 217 Details: false, 218 Timestamps: false, 219 }) 220 if err != nil { 221 return err 222 } 223 224 go func() { 225 _, _ = stdcopy.StdCopy(os.Stdout, os.Stdout, logs) 226 _ = logs.Close() 227 }() 228 return nil 229} 230 231func workspaceVolume(id string) string { 232 return "workspace-" + id 233} 234 235func nixVolume(id string) string { 236 return "nix-" + id 237} 238 239func pipelineName(id string) string { 240 return "pipeline-" + id 241} 242 243func hostConfig(id string) *container.HostConfig { 244 hostConfig := &container.HostConfig{ 245 Mounts: []mount.Mount{ 246 { 247 Type: mount.TypeVolume, 248 Source: workspaceVolume(id), 249 Target: workspaceDir, 250 }, 251 { 252 Type: mount.TypeVolume, 253 Source: nixVolume(id), 254 Target: "/nix", 255 }, 256 }, 257 ReadonlyRootfs: true, 258 CapDrop: []string{"ALL"}, 259 SecurityOpt: []string{"no-new-privileges"}, 260 } 261 262 return hostConfig 263}