forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "log/slog" 8 "os" 9 "path" 10 "sync" 11 12 "github.com/docker/docker/api/types/container" 13 "github.com/docker/docker/api/types/image" 14 "github.com/docker/docker/api/types/mount" 15 "github.com/docker/docker/api/types/network" 16 "github.com/docker/docker/api/types/volume" 17 "github.com/docker/docker/client" 18 "github.com/docker/docker/pkg/stdcopy" 19 "golang.org/x/sync/errgroup" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/knotserver/notifier" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/spindle/db" 24) 25 26const ( 27 workspaceDir = "/tangled/workspace" 28) 29 30type Engine struct { 31 docker client.APIClient 32 l *slog.Logger 33 db *db.DB 34 n *notifier.Notifier 35} 36 37func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 38 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 39 if err != nil { 40 return nil, err 41 } 42 43 l := log.FromContext(ctx).With("component", "spindle") 44 45 return &Engine{docker: dcli, l: l, db: db, n: n}, nil 46} 47 48// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. 49// in the future. In here also goes other setup steps. 50func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error { 51 e.l.Info("setting up pipeline", "pipeline", id) 52 53 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 54 Name: workspaceVolume(id), 55 Driver: "local", 56 }) 57 if err != nil { 58 return err 59 } 60 61 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 62 Name: nixVolume(id), 63 Driver: "local", 64 }) 65 if err != nil { 66 return err 67 } 68 69 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{ 70 Driver: "bridge", 71 }) 72 if err != nil { 73 return err 74 } 75 76 err = e.db.CreatePipeline(id, atUri, e.n) 77 return err 78} 79 80func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 81 e.l.Info("starting all workflows in parallel", "pipeline", id) 82 83 err := e.db.MarkPipelineRunning(id, e.n) 84 if err != nil { 85 return err 86 } 87 88 g := errgroup.Group{} 89 for _, w := range pipeline.Workflows { 90 g.Go(func() error { 91 // TODO: actual checks for image/registry etc. 92 var deps string 93 for _, d := range w.Dependencies { 94 if d.Registry == "nixpkgs" { 95 deps = path.Join(d.Packages...) 96 } 97 } 98 99 // load defaults from somewhere else 100 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 101 102 cimg := path.Join("nixery.dev", deps) 103 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 104 if err != nil { 105 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 106 err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 107 if err != nil { 108 return err 109 } 110 return fmt.Errorf("pulling image: %w", err) 111 } 112 defer reader.Close() 113 io.Copy(os.Stdout, reader) 114 115 err = e.StartSteps(ctx, w.Steps, id, cimg) 116 if err != nil { 117 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 118 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 119 } 120 121 return nil 122 }) 123 } 124 125 err = g.Wait() 126 if err != nil { 127 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 128 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 129 } 130 131 e.l.Info("pipeline success!", "id", id) 132 return e.db.MarkPipelineSuccess(id, e.n) 133} 134 135// StartSteps starts all steps sequentially with the same base image. 136// ONLY marks pipeline as failed if container's exit code is non-zero. 137// All other errors are bubbled up. 138func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 139 for _, step := range steps { 140 hostConfig := hostConfig(id) 141 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 142 Image: image, 143 Cmd: []string{"bash", "-c", step.Command}, 144 WorkingDir: workspaceDir, 145 Tty: false, 146 Hostname: "spindle", 147 Env: []string{"HOME=" + workspaceDir}, 148 }, hostConfig, nil, nil, "") 149 if err != nil { 150 return fmt.Errorf("creating container: %w", err) 151 } 152 153 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil) 154 if err != nil { 155 return fmt.Errorf("connecting network: %w", err) 156 } 157 158 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 159 if err != nil { 160 return err 161 } 162 e.l.Info("started container", "name", resp.ID, "step", step.Name) 163 164 wg := sync.WaitGroup{} 165 166 wg.Add(1) 167 go func() { 168 defer wg.Done() 169 err := e.TailStep(ctx, resp.ID) 170 if err != nil { 171 e.l.Error("failed to tail container", "container", resp.ID) 172 return 173 } 174 }() 175 176 // wait until all logs are piped 177 wg.Wait() 178 179 state, err := e.WaitStep(ctx, resp.ID) 180 if err != nil { 181 return err 182 } 183 184 if state.ExitCode != 0 { 185 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 186 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 187 } 188 } 189 190 return nil 191 192} 193 194func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 195 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 196 select { 197 case err := <-errCh: 198 if err != nil { 199 return nil, err 200 } 201 case <-wait: 202 } 203 204 e.l.Info("waited for container", "name", containerID) 205 206 info, err := e.docker.ContainerInspect(ctx, containerID) 207 if err != nil { 208 return nil, err 209 } 210 211 return info.State, nil 212} 213 214func (e *Engine) TailStep(ctx context.Context, containerID string) error { 215 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 216 Follow: true, 217 ShowStdout: true, 218 ShowStderr: true, 219 Details: false, 220 Timestamps: false, 221 }) 222 if err != nil { 223 return err 224 } 225 226 go func() { 227 _, _ = stdcopy.StdCopy(os.Stdout, os.Stdout, logs) 228 _ = logs.Close() 229 }() 230 return nil 231} 232 233func workspaceVolume(id string) string { 234 return "workspace-" + id 235} 236 237func nixVolume(id string) string { 238 return "nix-" + id 239} 240 241func pipelineName(id string) string { 242 return "pipeline-" + id 243} 244 245func hostConfig(id string) *container.HostConfig { 246 hostConfig := &container.HostConfig{ 247 Mounts: []mount.Mount{ 248 { 249 Type: mount.TypeVolume, 250 Source: workspaceVolume(id), 251 Target: workspaceDir, 252 }, 253 { 254 Type: mount.TypeVolume, 255 Source: nixVolume(id), 256 Target: "/nix", 257 }, 258 }, 259 ReadonlyRootfs: true, 260 CapDrop: []string{"ALL"}, 261 SecurityOpt: []string{"no-new-privileges"}, 262 } 263 264 return hostConfig 265}