From 91739825dae3cf3ac07526bcbd8620f27c751c60 Mon Sep 17 00:00:00 2001 From: Anirudh Oppiliappan Date: Fri, 20 Jun 2025 14:28:28 +0300 Subject: [PATCH] spindle/engine: write logs to SPINDLE_PIPELINES_LOG_DIR Change-Id: pskpmmsmqpwvxotmwmtzwxopysntutoo Signed-off-by: Anirudh Oppiliappan --- spindle/config/config.go | 1 + spindle/engine/engine.go | 26 +++++++++++---- spindle/engine/logger.go | 72 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 7 deletions(-) create mode 100644 spindle/engine/logger.go diff --git a/spindle/config/config.go b/spindle/config/config.go index 1750103..fb9041f 100644 --- a/spindle/config/config.go +++ b/spindle/config/config.go @@ -19,6 +19,7 @@ type Pipelines struct { // TODO: change default to nixery.tangled.sh Nixery string `env:"NIXERY, default=nixery.dev"` StepTimeout string `env:"STEP_TIMEOUT, default=5m"` + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` } type Config struct { diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index 6a7175c..57dfacd 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -203,7 +203,7 @@ func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models close(e.stderrChans[wid.String()]) }() - for _, step := range steps { + for stepIdx, step := range steps { envs := ConstructEnvs(step.Environment) envs.AddEnv("HOME", workspaceDir) e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) @@ -238,7 +238,7 @@ func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models // start tailing logs in background tailDone := make(chan error, 1) go func() { - tailDone <- e.TailStep(stepCtx, resp.ID, wid) + tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx) }() // wait for container completion or timeout @@ -314,7 +314,7 @@ func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.S return info.State, nil } -func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { +func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error { logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ Follow: true, ShowStdout: true, @@ -326,12 +326,18 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo return err } - var devOutput io.Writer = io.Discard + stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx) + if err != nil { + e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) + } + + var logOutput io.Writer = io.Discard + if e.cfg.Server.Dev { - devOutput = &ansiStrippingWriter{underlying: os.Stdout} + logOutput = &ansiStrippingWriter{underlying: os.Stdout} } - tee := io.TeeReader(logs, devOutput) + tee := io.TeeReader(logs, logOutput) // using StdCopy we demux logs and stream stdout and stderr to different // channels. @@ -343,6 +349,11 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo rpipeOut, wpipeOut := io.Pipe() rpipeErr, wpipeErr := io.Pipe() + // sets up a io.MultiWriter to write to both the pipe + // and the file-based logger. + multiOut := io.MultiWriter(wpipeOut, stepLogger.Stdout()) + multiErr := io.MultiWriter(wpipeErr, stepLogger.Stderr()) + wg := sync.WaitGroup{} wg.Add(1) @@ -350,7 +361,8 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo defer wg.Done() defer wpipeOut.Close() defer wpipeErr.Close() - _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) + defer stepLogger.Close() + _, err := stdcopy.StdCopy(multiOut, multiErr, tee) if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) { e.l.Error("failed to copy logs", "error", err) } diff --git a/spindle/engine/logger.go b/spindle/engine/logger.go new file mode 100644 index 0000000..48ab65a --- /dev/null +++ b/spindle/engine/logger.go @@ -0,0 +1,72 @@ +package engine + +import ( + "fmt" + "io" + "os" + "path/filepath" +) + +type StepLogger struct { + stderr *os.File + stdout *os.File +} + +func NewStepLogger(baseDir, workflowID string, stepIdx int) (*StepLogger, error) { + dir := filepath.Join(baseDir, workflowID) + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("creating log dir: %w", err) + } + + stdoutPath := logFilePath(baseDir, workflowID, "stdout", stepIdx) + stderrPath := logFilePath(baseDir, workflowID, "stderr", stepIdx) + + stdoutFile, err := os.Create(stdoutPath) + if err != nil { + return nil, fmt.Errorf("creating stdout log file: %w", err) + } + + stderrFile, err := os.Create(stderrPath) + if err != nil { + stdoutFile.Close() + return nil, fmt.Errorf("creating stderr log file: %w", err) + } + + return &StepLogger{ + stdout: stdoutFile, + stderr: stderrFile, + }, nil +} + +func (l *StepLogger) Stdout() io.Writer { + return l.stdout +} + +func (l *StepLogger) Stderr() io.Writer { + return l.stderr +} + +func (l *StepLogger) Close() error { + err1 := l.stdout.Close() + err2 := l.stderr.Close() + if err1 != nil { + return err1 + } + return err2 +} + +func ReadStepLog(baseDir, workflowID, stream string, stepIdx int) (string, error) { + logPath := logFilePath(baseDir, workflowID, stream, stepIdx) + + data, err := os.ReadFile(logPath) + if err != nil { + return "", fmt.Errorf("error reading log file: %w", err) + } + + return string(data), nil +} + +func logFilePath(baseDir, workflowID, stream string, stepIdx int) string { + logFilePath := filepath.Join(baseDir, workflowID, fmt.Sprintf("%d-%s.log", stepIdx, stream)) + return logFilePath +} -- 2.43.0