forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log/slog"
8 "os"
9 "path"
10 "sync"
11
12 "github.com/docker/docker/api/types/container"
13 "github.com/docker/docker/api/types/image"
14 "github.com/docker/docker/api/types/mount"
15 "github.com/docker/docker/api/types/network"
16 "github.com/docker/docker/api/types/volume"
17 "github.com/docker/docker/client"
18 "github.com/docker/docker/pkg/stdcopy"
19 "golang.org/x/sync/errgroup"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/log"
22 "tangled.sh/tangled.sh/core/spindle/db"
23)
24
25const (
26 workspaceDir = "/tangled/workspace"
27)
28
29type Engine struct {
30 docker client.APIClient
31 l *slog.Logger
32 db *db.DB
33}
34
35func New(ctx context.Context, db *db.DB) (*Engine, error) {
36 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
37 if err != nil {
38 return nil, err
39 }
40
41 l := log.FromContext(ctx).With("component", "spindle")
42
43 return &Engine{docker: dcli, l: l, db: db}, nil
44}
45
46// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
47// in the future. In here also goes other setup steps.
48func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
49 e.l.Info("setting up pipeline", "pipeline", id)
50
51 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
52 Name: workspaceVolume(id),
53 Driver: "local",
54 })
55 if err != nil {
56 return err
57 }
58
59 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
60 Name: nixVolume(id),
61 Driver: "local",
62 })
63 if err != nil {
64 return err
65 }
66
67 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
68 Driver: "bridge",
69 })
70 if err != nil {
71 return err
72 }
73
74 err = e.db.CreatePipeline(id)
75 return err
76}
77
78func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
79 e.l.Info("starting all workflows in parallel", "pipeline", id)
80
81 err := e.db.MarkPipelineRunning(id)
82 if err != nil {
83 return err
84 }
85
86 g := errgroup.Group{}
87 for _, w := range pipeline.Workflows {
88 g.Go(func() error {
89 // TODO: actual checks for image/registry etc.
90 var deps string
91 for _, d := range w.Dependencies {
92 if d.Registry == "nixpkgs" {
93 deps = path.Join(d.Packages...)
94 }
95 }
96
97 // load defaults from somewhere else
98 deps = path.Join(deps, "bash", "git", "coreutils", "nix")
99
100 cimg := path.Join("nixery.dev", deps)
101 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
102 if err != nil {
103 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
104 err := e.db.MarkPipelineFailed(id, -1, err.Error())
105 if err != nil {
106 return err
107 }
108 return fmt.Errorf("pulling image: %w", err)
109 }
110 defer reader.Close()
111 io.Copy(os.Stdout, reader)
112
113 err = e.StartSteps(ctx, w.Steps, id, cimg)
114 if err != nil {
115 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
116 return e.db.MarkPipelineFailed(id, -1, err.Error())
117 }
118
119 return nil
120 })
121 }
122
123 err = g.Wait()
124 if err != nil {
125 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
126 return e.db.MarkPipelineFailed(id, -1, err.Error())
127 }
128
129 e.l.Info("pipeline success!", "id", id)
130 return e.db.MarkPipelineSuccess(id)
131}
132
133// StartSteps starts all steps sequentially with the same base image.
134// ONLY marks pipeline as failed if container's exit code is non-zero.
135// All other errors are bubbled up.
136func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
137 for _, step := range steps {
138 hostConfig := hostConfig(id)
139 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
140 Image: image,
141 Cmd: []string{"bash", "-c", step.Command},
142 WorkingDir: workspaceDir,
143 Tty: false,
144 Hostname: "spindle",
145 Env: []string{"HOME=" + workspaceDir},
146 }, hostConfig, nil, nil, "")
147 if err != nil {
148 return fmt.Errorf("creating container: %w", err)
149 }
150
151 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
152 if err != nil {
153 return fmt.Errorf("connecting network: %w", err)
154 }
155
156 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
157 if err != nil {
158 return err
159 }
160 e.l.Info("started container", "name", resp.ID, "step", step.Name)
161
162 wg := sync.WaitGroup{}
163
164 wg.Add(1)
165 go func() {
166 defer wg.Done()
167 err := e.TailStep(ctx, resp.ID)
168 if err != nil {
169 e.l.Error("failed to tail container", "container", resp.ID)
170 return
171 }
172 }()
173
174 // wait until all logs are piped
175 wg.Wait()
176
177 state, err := e.WaitStep(ctx, resp.ID)
178 if err != nil {
179 return err
180 }
181
182 if state.ExitCode != 0 {
183 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
184 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error)
185 }
186 }
187
188 return nil
189
190}
191
192func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
193 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
194 select {
195 case err := <-errCh:
196 if err != nil {
197 return nil, err
198 }
199 case <-wait:
200 }
201
202 e.l.Info("waited for container", "name", containerID)
203
204 info, err := e.docker.ContainerInspect(ctx, containerID)
205 if err != nil {
206 return nil, err
207 }
208
209 return info.State, nil
210}
211
212func (e *Engine) TailStep(ctx context.Context, containerID string) error {
213 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
214 Follow: true,
215 ShowStdout: true,
216 ShowStderr: true,
217 Details: false,
218 Timestamps: false,
219 })
220 if err != nil {
221 return err
222 }
223
224 go func() {
225 _, _ = stdcopy.StdCopy(os.Stdout, os.Stdout, logs)
226 _ = logs.Close()
227 }()
228 return nil
229}
230
231func workspaceVolume(id string) string {
232 return "workspace-" + id
233}
234
235func nixVolume(id string) string {
236 return "nix-" + id
237}
238
239func pipelineName(id string) string {
240 return "pipeline-" + id
241}
242
243func hostConfig(id string) *container.HostConfig {
244 hostConfig := &container.HostConfig{
245 Mounts: []mount.Mount{
246 {
247 Type: mount.TypeVolume,
248 Source: workspaceVolume(id),
249 Target: workspaceDir,
250 },
251 {
252 Type: mount.TypeVolume,
253 Source: nixVolume(id),
254 Target: "/nix",
255 },
256 },
257 ReadonlyRootfs: true,
258 CapDrop: []string{"ALL"},
259 SecurityOpt: []string{"no-new-privileges"},
260 }
261
262 return hostConfig
263}