1package engine
2
import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"

	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)
27
const (
	// workspaceDir is the directory mounted into every step container;
	// it is used both as the working directory and as HOME for steps.
	workspaceDir = "/tangled/workspace"
)
31
32type cleanupFunc func(context.Context) error
33
// Engine runs pipeline workflows as sequences of Docker containers:
// one network plus workspace/nix volumes per workflow, and one
// container per step. It also fans step logs out to per-workflow
// channels for live consumption.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier

	// chanMu guards the two log-channel maps below; both are keyed by
	// WorkflowId.String().
	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	// cleanupMu guards cleanup: per-workflow teardown functions, run
	// in registration order by DestroyWorkflow.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
47
48func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
50 if err != nil {
51 return nil, err
52 }
53
54 l := log.FromContext(ctx).With("component", "spindle")
55
56 e := &Engine{
57 docker: dcli,
58 l: l,
59 db: db,
60 n: n,
61 }
62
63 e.stdoutChans = make(map[string]chan string, 100)
64 e.stderrChans = make(map[string]chan string, 100)
65
66 e.cleanup = make(map[string][]cleanupFunc)
67
68 return e, nil
69}
70
71func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) {
72 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
73
74 wg := sync.WaitGroup{}
75 for _, w := range pipeline.Workflows {
76 wg.Add(1)
77 go func() error {
78 defer wg.Done()
79 wid := models.WorkflowId{
80 PipelineId: pipelineId,
81 Name: w.Name,
82 }
83
84 err := e.db.StatusRunning(wid, e.n)
85 if err != nil {
86 return err
87 }
88
89 err = e.SetupWorkflow(ctx, wid)
90 if err != nil {
91 e.l.Error("setting up worklow", "wid", wid, "err", err)
92 return err
93 }
94 defer e.DestroyWorkflow(ctx, wid)
95
96 // TODO: actual checks for image/registry etc.
97 var deps string
98 for _, d := range w.Dependencies {
99 if d.Registry == "nixpkgs" {
100 deps = path.Join(d.Packages...)
101 }
102 }
103
104 // load defaults from somewhere else
105 deps = path.Join(deps, "bash", "git", "coreutils", "nix")
106
107 cimg := path.Join("nixery.dev", deps)
108 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
109 if err != nil {
110 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
111
112 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
113 if err != nil {
114 return err
115 }
116
117 return fmt.Errorf("pulling image: %w", err)
118 }
119 defer reader.Close()
120 io.Copy(os.Stdout, reader)
121
122 err = e.StartSteps(ctx, w.Steps, wid, cimg)
123 if err != nil {
124 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
125
126 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
127 if err != nil {
128 return err
129 }
130 }
131
132 err = e.db.StatusSuccess(wid, e.n)
133 if err != nil {
134 return err
135 }
136
137 return nil
138 }()
139 }
140
141 wg.Wait()
142}
143
144// SetupWorkflow sets up a new network for the workflow and volumes for
145// the workspace and Nix store. These are persisted across steps and are
146// destroyed at the end of the workflow.
147func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
148 e.l.Info("setting up workflow", "workflow", wid)
149
150 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
151 Name: workspaceVolume(wid),
152 Driver: "local",
153 })
154 if err != nil {
155 return err
156 }
157 e.registerCleanup(wid, func(ctx context.Context) error {
158 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
159 })
160
161 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
162 Name: nixVolume(wid),
163 Driver: "local",
164 })
165 if err != nil {
166 return err
167 }
168 e.registerCleanup(wid, func(ctx context.Context) error {
169 return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
170 })
171
172 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
173 Driver: "bridge",
174 })
175 if err != nil {
176 return err
177 }
178 e.registerCleanup(wid, func(ctx context.Context) error {
179 return e.docker.NetworkRemove(ctx, networkName(wid))
180 })
181
182 return nil
183}
184
185// StartSteps starts all steps sequentially with the same base image.
186// ONLY marks pipeline as failed if container's exit code is non-zero.
187// All other errors are bubbled up.
188func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error {
189 // set up logging channels
190 e.chanMu.Lock()
191 if _, exists := e.stdoutChans[wid.String()]; !exists {
192 e.stdoutChans[wid.String()] = make(chan string, 100)
193 }
194 if _, exists := e.stderrChans[wid.String()]; !exists {
195 e.stderrChans[wid.String()] = make(chan string, 100)
196 }
197 e.chanMu.Unlock()
198
199 // close channels after all steps are complete
200 defer func() {
201 close(e.stdoutChans[wid.String()])
202 close(e.stderrChans[wid.String()])
203 }()
204
205 for _, step := range steps {
206 hostConfig := hostConfig(wid)
207 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
208 Image: image,
209 Cmd: []string{"bash", "-c", step.Command},
210 WorkingDir: workspaceDir,
211 Tty: false,
212 Hostname: "spindle",
213 Env: []string{"HOME=" + workspaceDir},
214 }, hostConfig, nil, nil, "")
215 if err != nil {
216 return fmt.Errorf("creating container: %w", err)
217 }
218
219 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
220 if err != nil {
221 return fmt.Errorf("connecting network: %w", err)
222 }
223
224 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
225 if err != nil {
226 return err
227 }
228 e.l.Info("started container", "name", resp.ID, "step", step.Name)
229
230 wg := sync.WaitGroup{}
231
232 wg.Add(1)
233 go func() {
234 defer wg.Done()
235 err := e.TailStep(ctx, resp.ID, wid)
236 if err != nil {
237 e.l.Error("failed to tail container", "container", resp.ID)
238 return
239 }
240 }()
241
242 // wait until all logs are piped
243 wg.Wait()
244
245 state, err := e.WaitStep(ctx, resp.ID)
246 if err != nil {
247 return err
248 }
249
250 err = e.DestroyStep(ctx, resp.ID)
251 if err != nil {
252 return err
253 }
254
255 if state.ExitCode != 0 {
256 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode)
257 // return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
258 }
259 }
260
261 return nil
262
263}
264
265func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
266 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
267 select {
268 case err := <-errCh:
269 if err != nil {
270 return nil, err
271 }
272 case <-wait:
273 }
274
275 e.l.Info("waited for container", "name", containerID)
276
277 info, err := e.docker.ContainerInspect(ctx, containerID)
278 if err != nil {
279 return nil, err
280 }
281
282 return info.State, nil
283}
284
// TailStep attaches to the container's combined log stream and fans the
// demultiplexed stdout/stderr lines out to the workflow's log channels.
// It returns as soon as the copy goroutines are started; those
// goroutines run until the log stream ends (Follow mode keeps it open
// until the container exits) or an error occurs.
//
// NOTE(review): the `logs` reader is never explicitly closed here —
// presumably the stream is torn down when the container is removed or
// ctx is cancelled; confirm against the Docker client documentation.
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	// using StdCopy we demux logs and stream stdout and stderr to different
	// channels.
	//
	//	stdout w||r stdoutCh
	//	stderr w||r stderrCh
	//

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	// demuxer: splits the multiplexed Docker log stream into the two
	// pipes; closing the write ends makes the scanners below terminate
	go func() {
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
		if err != nil && err != io.EOF {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	// read from stdout and send to stdout pipe
	// NOTE: the stdoutCh channnel is closed further up in StartSteps
	// once all steps are done.
	go func() {
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read from stderr and send to stderr pipe
	// NOTE: the stderrCh channnel is closed further up in StartSteps
	// once all steps are done.
	go func() {
		e.chanMu.RLock()
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	return nil
}
352
353func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
354 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
355 if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
356 return err
357 }
358
359 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
360 RemoveVolumes: true,
361 RemoveLinks: false,
362 Force: false,
363 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
364 return err
365 }
366
367 return nil
368}
369
370func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
371 e.cleanupMu.Lock()
372 key := wid.String()
373
374 fns := e.cleanup[key]
375 delete(e.cleanup, key)
376 e.cleanupMu.Unlock()
377
378 for _, fn := range fns {
379 if err := fn(ctx); err != nil {
380 e.l.Error("failed to cleanup workflow resource", "workflowId", wid)
381 }
382 }
383 return nil
384}
385
386func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
387 e.chanMu.RLock()
388 defer e.chanMu.RUnlock()
389
390 stdoutCh, ok1 := e.stdoutChans[wid.String()]
391 stderrCh, ok2 := e.stderrChans[wid.String()]
392
393 if !ok1 || !ok2 {
394 return nil, nil, false
395 }
396 return stdoutCh, stderrCh, true
397}
398
399func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
400 e.cleanupMu.Lock()
401 defer e.cleanupMu.Unlock()
402
403 key := wid.String()
404 e.cleanup[key] = append(e.cleanup[key], fn)
405}
406
407func workspaceVolume(wid models.WorkflowId) string {
408 return fmt.Sprintf("workspace-%s", wid)
409}
410
411func nixVolume(wid models.WorkflowId) string {
412 return fmt.Sprintf("nix-%s", wid)
413}
414
415func networkName(wid models.WorkflowId) string {
416 return fmt.Sprintf("workflow-network-%s", wid)
417}
418
419func hostConfig(wid models.WorkflowId) *container.HostConfig {
420 hostConfig := &container.HostConfig{
421 Mounts: []mount.Mount{
422 {
423 Type: mount.TypeVolume,
424 Source: workspaceVolume(wid),
425 Target: workspaceDir,
426 },
427 {
428 Type: mount.TypeVolume,
429 Source: nixVolume(wid),
430 Target: "/nix",
431 },
432 },
433 ReadonlyRootfs: true,
434 CapDrop: []string{"ALL"},
435 SecurityOpt: []string{"no-new-privileges"},
436 }
437
438 return hostConfig
439}
440
441// thanks woodpecker
442func isErrContainerNotFoundOrNotRunning(err error) bool {
443 // Error response from daemon: Cannot kill container: ...: No such container: ...
444 // Error response from daemon: Cannot kill container: ...: Container ... is not running"
445 // Error response from podman daemon: can only kill running containers. ... is in state exited
446 // Error: No such container: ...
447 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
448}