1package engine
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log/slog"
8 "os"
9 "path"
10 "sync"
11
12 "github.com/docker/docker/api/types/container"
13 "github.com/docker/docker/api/types/image"
14 "github.com/docker/docker/api/types/mount"
15 "github.com/docker/docker/api/types/network"
16 "github.com/docker/docker/api/types/volume"
17 "github.com/docker/docker/client"
18 "github.com/docker/docker/pkg/stdcopy"
19 "golang.org/x/sync/errgroup"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/knotserver/notifier"
22 "tangled.sh/tangled.sh/core/log"
23 "tangled.sh/tangled.sh/core/spindle/db"
24)
25
26const (
27 workspaceDir = "/tangled/workspace"
28)
29
30type Engine struct {
31 docker client.APIClient
32 l *slog.Logger
33 db *db.DB
34 n *notifier.Notifier
35}
36
37func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
38 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
39 if err != nil {
40 return nil, err
41 }
42
43 l := log.FromContext(ctx).With("component", "spindle")
44
45 return &Engine{docker: dcli, l: l, db: db, n: n}, nil
46}
47
48// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
49// in the future. In here also goes other setup steps.
50func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
51 e.l.Info("setting up pipeline", "pipeline", id)
52
53 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
54 Name: workspaceVolume(id),
55 Driver: "local",
56 })
57 if err != nil {
58 return err
59 }
60
61 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
62 Name: nixVolume(id),
63 Driver: "local",
64 })
65 if err != nil {
66 return err
67 }
68
69 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
70 Driver: "bridge",
71 })
72 if err != nil {
73 return err
74 }
75
76 err = e.db.CreatePipeline(id, atUri, e.n)
77 return err
78}
79
80func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
81 e.l.Info("starting all workflows in parallel", "pipeline", id)
82
83 err := e.db.MarkPipelineRunning(id, e.n)
84 if err != nil {
85 return err
86 }
87
88 g := errgroup.Group{}
89 for _, w := range pipeline.Workflows {
90 g.Go(func() error {
91 // TODO: actual checks for image/registry etc.
92 var deps string
93 for _, d := range w.Dependencies {
94 if d.Registry == "nixpkgs" {
95 deps = path.Join(d.Packages...)
96 }
97 }
98
99 // load defaults from somewhere else
100 deps = path.Join(deps, "bash", "git", "coreutils", "nix")
101
102 cimg := path.Join("nixery.dev", deps)
103 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
104 if err != nil {
105 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
106 err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
107 if err != nil {
108 return err
109 }
110 return fmt.Errorf("pulling image: %w", err)
111 }
112 defer reader.Close()
113 io.Copy(os.Stdout, reader)
114
115 err = e.StartSteps(ctx, w.Steps, id, cimg)
116 if err != nil {
117 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
118 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
119 }
120
121 return nil
122 })
123 }
124
125 err = g.Wait()
126 if err != nil {
127 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
128 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
129 }
130
131 e.l.Info("pipeline success!", "id", id)
132 return e.db.MarkPipelineSuccess(id, e.n)
133}
134
135// StartSteps starts all steps sequentially with the same base image.
136// ONLY marks pipeline as failed if container's exit code is non-zero.
137// All other errors are bubbled up.
138func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
139 for _, step := range steps {
140 hostConfig := hostConfig(id)
141 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
142 Image: image,
143 Cmd: []string{"bash", "-c", step.Command},
144 WorkingDir: workspaceDir,
145 Tty: false,
146 Hostname: "spindle",
147 Env: []string{"HOME=" + workspaceDir},
148 }, hostConfig, nil, nil, "")
149 if err != nil {
150 return fmt.Errorf("creating container: %w", err)
151 }
152
153 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
154 if err != nil {
155 return fmt.Errorf("connecting network: %w", err)
156 }
157
158 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
159 if err != nil {
160 return err
161 }
162 e.l.Info("started container", "name", resp.ID, "step", step.Name)
163
164 wg := sync.WaitGroup{}
165
166 wg.Add(1)
167 go func() {
168 defer wg.Done()
169 err := e.TailStep(ctx, resp.ID)
170 if err != nil {
171 e.l.Error("failed to tail container", "container", resp.ID)
172 return
173 }
174 }()
175
176 // wait until all logs are piped
177 wg.Wait()
178
179 state, err := e.WaitStep(ctx, resp.ID)
180 if err != nil {
181 return err
182 }
183
184 if state.ExitCode != 0 {
185 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
186 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
187 }
188 }
189
190 return nil
191
192}
193
194func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
195 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
196 select {
197 case err := <-errCh:
198 if err != nil {
199 return nil, err
200 }
201 case <-wait:
202 }
203
204 e.l.Info("waited for container", "name", containerID)
205
206 info, err := e.docker.ContainerInspect(ctx, containerID)
207 if err != nil {
208 return nil, err
209 }
210
211 return info.State, nil
212}
213
214func (e *Engine) TailStep(ctx context.Context, containerID string) error {
215 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
216 Follow: true,
217 ShowStdout: true,
218 ShowStderr: true,
219 Details: false,
220 Timestamps: false,
221 })
222 if err != nil {
223 return err
224 }
225
226 go func() {
227 _, _ = stdcopy.StdCopy(os.Stdout, os.Stdout, logs)
228 _ = logs.Close()
229 }()
230 return nil
231}
232
233func workspaceVolume(id string) string {
234 return "workspace-" + id
235}
236
237func nixVolume(id string) string {
238 return "nix-" + id
239}
240
241func pipelineName(id string) string {
242 return "pipeline-" + id
243}
244
245func hostConfig(id string) *container.HostConfig {
246 hostConfig := &container.HostConfig{
247 Mounts: []mount.Mount{
248 {
249 Type: mount.TypeVolume,
250 Source: workspaceVolume(id),
251 Target: workspaceDir,
252 },
253 {
254 Type: mount.TypeVolume,
255 Source: nixVolume(id),
256 Target: "/nix",
257 },
258 },
259 ReadonlyRootfs: true,
260 CapDrop: []string{"ALL"},
261 SecurityOpt: []string{"no-new-privileges"},
262 }
263
264 return hostConfig
265}