package engine

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"
	"syscall"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"golang.org/x/sync/errgroup"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/db"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:      dcli,
		l:           l,
		db:          db,
		n:           n,
		stdoutChans: make(map[string]chan string),
		stderrChans: make(map[string]chan string),
		cleanup:     make(map[string][]cleanupFunc),
	}

	return e, nil
}
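
// Usage sketch (illustrative, not part of the engine API): construct the
// engine and kick off a pipeline. The db/notifier values and the pipeline
// literal are assumptions standing in for the caller's own wiring.
//
//	eng, err := engine.New(ctx, database, notif)
//	if err != nil {
//		return err
//	}
//	if err := eng.StartWorkflows(ctx, pipeline, pipelineID); err != nil {
//		// the pipeline has already been marked failed in the db
//		return err
//	}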

func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
	e.l.Info("starting all workflows in parallel", "pipeline", id)

	err := e.db.MarkPipelineRunning(id, e.n)
	if err != nil {
		return err
	}

	g := errgroup.Group{}
	for _, w := range pipeline.Workflows {
		g.Go(func() error {
			err := e.SetupWorkflow(ctx, id, w.Name)
			if err != nil {
				return err
			}

			defer e.DestroyWorkflow(ctx, id, w.Name)

			// TODO: actual checks for image/registry etc.
			var deps string
			for _, d := range w.Dependencies {
				if d.Registry == "nixpkgs" {
					deps = path.Join(deps, path.Join(d.Packages...))
				}
			}

			// TODO: load these defaults from configuration
			deps = path.Join(deps, "bash", "git", "coreutils", "nix")

			cimg := path.Join("nixery.dev", deps)
			reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline failed!", "id", id, "error", err.Error())
				// don't shadow the pull error here, or we'd wrap nil below
				if dbErr := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n); dbErr != nil {
					return dbErr
				}
				return fmt.Errorf("pulling image: %w", err)
			}
			defer reader.Close()
			if _, err := io.Copy(os.Stdout, reader); err != nil {
				e.l.Error("failed to stream image pull output", "error", err)
			}

			err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg)
			if err != nil {
				e.l.Error("pipeline failed!", "id", id, "error", err.Error())
				return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
			}

			return nil
		})
	}

	err = g.Wait()
	if err != nil {
		e.l.Error("pipeline failed!", "id", id, "error", err.Error())
		return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
	}

	e.l.Info("pipeline success!", "id", id)
	return e.db.MarkPipelineSuccess(id, e.n)
}
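
// For illustration, a minimal pipeline that the loop above would run. The
// literal below is a sketch: only the fields referenced in this file
// (Workflows, Name, Dependencies, Registry, Packages, Steps, Command) are
// assumed, and the Pipeline_Workflow/Pipeline_Dependency type names are
// guesses modeled on Pipeline_Step.
//
//	pipeline := &tangled.Pipeline{
//		Workflows: []*tangled.Pipeline_Workflow{{
//			Name: "build",
//			Dependencies: []*tangled.Pipeline_Dependency{{
//				Registry: "nixpkgs",
//				Packages: []string{"go", "gnumake"},
//			}},
//			Steps: []*tangled.Pipeline_Step{{
//				Name:    "test",
//				Command: "go test ./...",
//			}},
//		}},
//	}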

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error {
	e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(id, workflowName),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(id, workflowName),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(id, workflowName))
	})

	return nil
}

// StartSteps runs all steps sequentially with the same base image. It marks
// the pipeline as failed only when a container exits with a non-zero code;
// all other errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error {
	// set up logging channels
	e.chanMu.Lock()
	if _, exists := e.stdoutChans[id]; !exists {
		e.stdoutChans[id] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[id]; !exists {
		e.stderrChans[id] = make(chan string, 100)
	}
	e.chanMu.Unlock()

	// close and forget the channels once all steps are complete; take the
	// lock so this doesn't race with LogChannels, and guard against a
	// double close when several workflows share one pipeline id
	defer func() {
		e.chanMu.Lock()
		defer e.chanMu.Unlock()
		if ch, ok := e.stdoutChans[id]; ok {
			close(ch)
			delete(e.stdoutChans, id)
		}
		if ch, ok := e.stderrChans[id]; ok {
			close(ch)
			delete(e.stderrChans, id)
		}
	}()

	for _, step := range steps {
		hostConfig := hostConfig(id, workflowName)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        []string{"HOME=" + workspaceDir},
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		wg := sync.WaitGroup{}

		wg.Add(1)
		go func() {
			defer wg.Done()
			err := e.TailStep(ctx, resp.ID, id)
			if err != nil {
				e.l.Error("failed to tail container", "container", resp.ID, "error", err)
				return
			}
		}()

		// wait until all logs are piped
		wg.Wait()

		state, err := e.WaitStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		err = e.DestroyStep(ctx, resp.ID, id)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
			return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
		}
	}

	return nil
}

func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

// TailStep follows a container's log stream, demuxes it into stdout and
// stderr lines, and forwards those lines to the pipeline's log channels.
// It blocks until the stream has been fully drained, which happens once
// the container stops. The channels are buffered; a consumer obtained via
// LogChannels is expected to drain them.
func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	// using StdCopy we demux logs and stream stdout and stderr to different
	// channels.
	//
	//	stdout w||r stdoutCh
	//	stderr w||r stderrCh

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	go func() {
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
		if err != nil && err != io.EOF {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	var wg sync.WaitGroup

	// read lines from the stdout pipe and send them to the stdout channel.
	// NOTE: the stdoutCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()

		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[pipelineID]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read lines from the stderr pipe and send them to the stderr channel.
	// NOTE: the stderrCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()

		e.chanMu.RLock()
		stderrCh := e.stderrChans[pipelineID]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	// block until both scanners have drained their pipes; StdCopy closes
	// the pipes once the container's log stream ends
	wg.Wait()

	return nil
}

func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error {
	err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String())
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error {
	e.cleanupMu.Lock()
	key := fmt.Sprintf("%s-%s", pipelineID, workflowName)

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err)
		}
	}
	return nil
}

func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[pipelineID]
	stderrCh, ok2 := e.stderrChans[pipelineID]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}
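
// Consumption sketch (illustrative): drain a pipeline's log channels from,
// say, a streaming HTTP handler. The ranges end when StartSteps closes the
// channels; the writer w and surrounding handler shape are assumptions.
//
//	stdout, stderr, ok := eng.LogChannels(pipelineID)
//	if !ok {
//		return // pipeline not running, or already finished
//	}
//	go func() {
//		for line := range stdout {
//			fmt.Fprintln(w, line)
//		}
//	}()
//	for line := range stderr {
//		fmt.Fprintln(w, line)
//	}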

func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(id, name string) string {
	return fmt.Sprintf("workspace-%s-%s", id, name)
}

func nixVolume(id, name string) string {
	return fmt.Sprintf("nix-%s-%s", id, name)
}

func networkName(id, name string) string {
	return fmt.Sprintf("workflow-network-%s-%s", id, name)
}

func hostConfig(id, name string) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(id, name),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(id, name),
				Target: "/nix",
			},
		},
		ReadonlyRootfs: true,
		CapDrop:        []string{"ALL"},
		SecurityOpt:    []string{"no-new-privileges"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}