package engine

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

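// cleanupFunc releases one workflow-scoped resource (a volume, a network,
// and so on). Cleanups are registered during setup and run when the
// workflow is destroyed.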
type cleanupFunc func(context.Context) error

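// Engine executes pipeline workflows as sequences of Docker containers.
// It owns per-workflow stdout/stderr log channels and a registry of
// cleanup functions for resources created during workflow setup.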
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

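// New constructs an Engine backed by a Docker client configured from the
// environment (DOCKER_HOST and friends), with API version negotiation
// enabled.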
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:      dcli,
		l:           l,
		db:          db,
		n:           n,
		stdoutChans: make(map[string]chan string, 100),
		stderrChans: make(map[string]chan string, 100),
		cleanup:     make(map[string][]cleanupFunc),
	}

	return e, nil
}

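// StartWorkflows runs all workflows in the pipeline concurrently and
// blocks until every one of them has finished. A workflow's failure is
// recorded in the database and does not abort its siblings.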
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			// TODO: actual checks for image/registry etc.
			var deps string
			for _, d := range w.Dependencies {
				if d.Registry == "nixpkgs" {
					deps = path.Join(d.Packages...)
				}
			}

			// load defaults from somewhere else
			deps = path.Join(deps, "bash", "git", "coreutils", "nix")

			cimg := path.Join("nixery.dev", deps)
			reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
			if err != nil {
				e.l.Error("workflow failed!", "wid", wid, "err", err)
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// the pull only completes once the response stream is drained
			_, _ = io.Copy(os.Stdout, reader)

			if err := e.StartSteps(ctx, w.Steps, wid, cimg); err != nil {
				e.l.Error("workflow failed!", "wid", wid.String(), "err", err)
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially with the same base image.
// A step's non-zero exit code fails the workflow; all other errors are
// bubbled up to the caller as-is.
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error {
	// set up logging channels
	e.chanMu.Lock()
	if _, exists := e.stdoutChans[wid.String()]; !exists {
		e.stdoutChans[wid.String()] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[wid.String()]; !exists {
		e.stderrChans[wid.String()] = make(chan string, 100)
	}
	e.chanMu.Unlock()

	// close channels after all steps are complete
	defer func() {
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()
		close(stdoutCh)
		close(stderrCh)
	}()

	for _, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// TailStep blocks until the container's log stream is fully
		// drained, so all logs are piped before we wait on the container
		err = e.TailStep(ctx, resp.ID, wid)
		if err != nil {
			e.l.Error("failed to tail container", "container", resp.ID, "err", err)
		}

		state, err := e.WaitStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode)
			return fmt.Errorf("step %q exited with code %d: %s", step.Name, state.ExitCode, state.Error)
		}
	}

	return nil
}

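// WaitStep blocks until the container is no longer running and returns
// its final state as reported by the daemon.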
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

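// TailStep follows the container's log stream, demuxes it with stdcopy,
// and forwards stdout and stderr line by line to the workflow's log
// channels. It blocks until the stream ends, i.e. the container exits.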
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	// using StdCopy we demux the log stream and pipe stdout and stderr
	// to different channels:
	//
	//	stdout w||r stdoutCh
	//	stderr w||r stderrCh
	//

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	go func() {
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
		if err != nil && err != io.EOF {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	e.chanMu.RLock()
	stdoutCh := e.stdoutChans[wid.String()]
	stderrCh := e.stderrChans[wid.String()]
	e.chanMu.RUnlock()

	var wg sync.WaitGroup
	wg.Add(2)

	// read from the stdout pipe and send to the stdout channel
	// NOTE: the stdout channel is closed further up in StartSteps
	// once all steps are done.
	go func() {
		defer wg.Done()
		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read from the stderr pipe and send to the stderr channel
	// NOTE: the stderr channel is closed further up in StartSteps
	// once all steps are done.
	go func() {
		defer wg.Done()
		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	// block until both scanners have drained their pipes; this guarantees
	// no goroutine is still sending when StartSteps closes the channels
	wg.Wait()
	return nil
}

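// DestroyStep kills the step's container with SIGKILL and removes it
// along with its anonymous volumes. "No such container" and "not running"
// errors are ignored.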
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

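// DestroyWorkflow runs every cleanup function registered for the workflow
// and drops them from the registry. A failing cleanup is logged and does
// not stop the remaining ones.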
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "err", err)
		}
	}
	return nil
}

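// LogChannels returns the receive ends of the workflow's stdout and
// stderr channels, or ok=false if no channels are registered for this
// workflow. Both channels are closed by StartSteps once all steps have
// completed.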
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[wid.String()]
	stderrCh, ok2 := e.stderrChans[wid.String()]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}

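// registerCleanup appends a cleanup function to the workflow's cleanup
// list, to be run by DestroyWorkflow.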
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

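// hostConfig builds the hardened host configuration shared by all step
// containers: the workspace and Nix store volumes are mounted in, the
// root filesystem is read-only, all capabilities are dropped, and
// privilege escalation is disabled via no-new-privileges.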
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
		},
		ReadonlyRootfs: true,
		CapDrop:        []string{"ALL"},
		SecurityOpt:    []string{"no-new-privileges"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}