spindle/engine: write logs to SPINDLE_PIPELINES_LOG_DIR #260

merged
opened by anirudh.fi targeting master from push-vwyomovpppwp
Changed files
+92 -7
spindle
config
engine
+1
spindle/config/config.go
···
// TODO: change default to nixery.tangled.sh
Nixery string `env:"NIXERY, default=nixery.dev"`
StepTimeout string `env:"STEP_TIMEOUT, default=5m"`
+
LogDir string `env:"LOG_DIR, default=/var/log/spindle"`
}
type Config struct {
+19 -7
spindle/engine/engine.go
···
close(e.stderrChans[wid.String()])
}()
-
for _, step := range steps {
+
for stepIdx, step := range steps {
envs := ConstructEnvs(step.Environment)
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
···
// start tailing logs in background
tailDone := make(chan error, 1)
go func() {
-
tailDone <- e.TailStep(stepCtx, resp.ID, wid)
+
tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
}()
// wait for container completion or timeout
···
return info.State, nil
}
-
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
+
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
Follow: true,
ShowStdout: true,
···
return err
}
-
var devOutput io.Writer = io.Discard
+
stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx)
+
if err != nil {
+
e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
+
}
+
+
var logOutput io.Writer = io.Discard
+
if e.cfg.Server.Dev {
-
devOutput = &ansiStrippingWriter{underlying: os.Stdout}
+
logOutput = &ansiStrippingWriter{underlying: os.Stdout}
}
-
tee := io.TeeReader(logs, devOutput)
+
tee := io.TeeReader(logs, logOutput)
// using StdCopy we demux logs and stream stdout and stderr to different
// channels.
···
rpipeOut, wpipeOut := io.Pipe()
rpipeErr, wpipeErr := io.Pipe()
+
// sets up a io.MultiWriter to write to both the pipe
+
// and the file-based logger.
+
multiOut := io.MultiWriter(wpipeOut, stepLogger.Stdout())
+
multiErr := io.MultiWriter(wpipeErr, stepLogger.Stderr())
+
wg := sync.WaitGroup{}
wg.Add(1)
···
defer wg.Done()
defer wpipeOut.Close()
defer wpipeErr.Close()
-
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
+
defer stepLogger.Close()
+
_, err := stdcopy.StdCopy(multiOut, multiErr, tee)
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
e.l.Error("failed to copy logs", "error", err)
}
+72
spindle/engine/logger.go
···
+
package engine
+
+
import (
+
"fmt"
+
"io"
+
"os"
+
"path/filepath"
+
)
+
+
type StepLogger struct {
+
stderr *os.File
+
stdout *os.File
+
}
+
+
func NewStepLogger(baseDir, workflowID string, stepIdx int) (*StepLogger, error) {
+
dir := filepath.Join(baseDir, workflowID)
+
if err := os.MkdirAll(dir, 0755); err != nil {
+
return nil, fmt.Errorf("creating log dir: %w", err)
+
}
+
+
stdoutPath := logFilePath(baseDir, workflowID, "stdout", stepIdx)
+
stderrPath := logFilePath(baseDir, workflowID, "stderr", stepIdx)
+
+
stdoutFile, err := os.Create(stdoutPath)
+
if err != nil {
+
return nil, fmt.Errorf("creating stdout log file: %w", err)
+
}
+
+
stderrFile, err := os.Create(stderrPath)
+
if err != nil {
+
stdoutFile.Close()
+
return nil, fmt.Errorf("creating stderr log file: %w", err)
+
}
+
+
return &StepLogger{
+
stdout: stdoutFile,
+
stderr: stderrFile,
+
}, nil
+
}
+
+
func (l *StepLogger) Stdout() io.Writer {
+
return l.stdout
+
}
+
+
func (l *StepLogger) Stderr() io.Writer {
+
return l.stderr
+
}
+
+
func (l *StepLogger) Close() error {
+
err1 := l.stdout.Close()
+
err2 := l.stderr.Close()
+
if err1 != nil {
+
return err1
+
}
+
return err2
+
}
+
+
func ReadStepLog(baseDir, workflowID, stream string, stepIdx int) (string, error) {
+
logPath := logFilePath(baseDir, workflowID, stream, stepIdx)
+
+
data, err := os.ReadFile(logPath)
+
if err != nil {
+
return "", fmt.Errorf("error reading log file: %w", err)
+
}
+
+
return string(data), nil
+
}
+
+
func logFilePath(baseDir, workflowID, stream string, stepIdx int) string {
+
logFilePath := filepath.Join(baseDir, workflowID, fmt.Sprintf("%d-%s.log", stepIdx, stream))
+
return logFilePath
+
}