From 783496a917aa370d6caed334054aac83ff028325 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Thu, 30 Oct 2025 12:04:16 +0000 Subject: [PATCH] spindle: emit control log lines for start and end of steps Change-Id: kpvttrlkwpwpkkskwuslxnylxkkuqrnk Signed-off-by: oppiliappan --- spindle/engine/engine.go | 16 +++++++++++++--- spindle/engines/nixery/engine.go | 4 ++-- spindle/models/logger.go | 24 ++++++++++++++---------- spindle/models/models.go | 31 +++++++++++++++++++++++-------- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index 22072cc8..25988042 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -79,12 +79,22 @@ func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, d defer cancel() for stepIdx, step := range w.Steps { + // log start of step if wfLogger != nil { - ctl := wfLogger.ControlWriter(stepIdx, step) - ctl.Write([]byte(step.Name())) + wfLogger. + ControlWriter(stepIdx, step, models.StepStatusStart). + Write([]byte{0}) } err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) + + // log end of step + if wfLogger != nil { + wfLogger. + ControlWriter(stepIdx, step, models.StepStatusEnd). + Write([]byte{0}) + } + if err != nil { if errors.Is(err, ErrTimedOut) { dbErr := db.StatusTimeout(wid, n) @@ -115,6 +125,6 @@ func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, d if err := eg.Wait(); err != nil { l.Error("failed to run one or more workflows", "err", err) } else { - l.Error("successfully ran full pipeline") + l.Info("successfully ran full pipeline") } } diff --git a/spindle/engines/nixery/engine.go b/spindle/engines/nixery/engine.go index 668b4445..64f0b5f7 100644 --- a/spindle/engines/nixery/engine.go +++ b/spindle/engines/nixery/engine.go @@ -381,8 +381,8 @@ func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, defer logs.Close() _, err = stdcopy.StdCopy( - wfLogger.DataWriter("stdout"), - wfLogger.DataWriter("stderr"), + wfLogger.DataWriter(stepIdx, "stdout"), + wfLogger.DataWriter(stepIdx, "stderr"), logs.Reader, ) if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { diff --git a/spindle/models/logger.go b/spindle/models/logger.go index aa872c6a..3a43ccbf 100644 --- a/spindle/models/logger.go +++ b/spindle/models/logger.go @@ -37,29 +37,32 @@ func (l *WorkflowLogger) Close() error { return l.file.Close() } -func (l *WorkflowLogger) DataWriter(stream string) io.Writer { +func (l *WorkflowLogger) DataWriter(idx int, stream string) io.Writer { return &dataWriter{ logger: l, + idx: idx, stream: stream, } } -func (l *WorkflowLogger) ControlWriter(idx int, step Step) io.Writer { +func (l *WorkflowLogger) ControlWriter(idx int, step Step, stepStatus StepStatus) io.Writer { return &controlWriter{ - logger: l, - idx: idx, - step: step, + logger: l, + idx: idx, + step: step, + stepStatus: stepStatus, } } type dataWriter struct { logger *WorkflowLogger + idx int stream string } func (w *dataWriter) Write(p []byte) (int, error) { line := strings.TrimRight(string(p), "\r\n") - entry := NewDataLogLine(line, w.stream) + entry := NewDataLogLine(w.idx, line, w.stream) if err := w.logger.encoder.Encode(entry); err != nil { return 0, err } @@ -67,13 +70,14 @@ func (w *dataWriter) Write(p []byte) (int, error) { } type controlWriter struct { - logger *WorkflowLogger - idx int - step Step + logger *WorkflowLogger + idx int + step Step + stepStatus StepStatus } func (w *controlWriter) Write(_ []byte) (int, error) { - entry := NewControlLogLine(w.idx, w.step) + entry := NewControlLogLine(w.idx, w.step, w.stepStatus) if err := w.logger.encoder.Encode(entry); err != nil { return 0, err } diff --git a/spindle/models/models.go b/spindle/models/models.go index c692ec19..abd675ce 100644 --- a/spindle/models/models.go +++ b/spindle/models/models.go @@ -4,6 +4,7 @@ import ( "fmt" "regexp" "slices" + "time" "tangled.org/core/api/tangled" @@ -76,36 +77,50 @@ type LogKind string var ( // step log data LogKindData LogKind = "data" - // indicates start/end of a step + // indicates status of a step LogKindControl LogKind = "control" ) +// step status indicator in control log lines +type StepStatus string + +var ( + StepStatusStart StepStatus = "start" + StepStatusEnd StepStatus = "end" +) + type LogLine struct { - Kind LogKind `json:"kind"` - Content string `json:"content"` + Kind LogKind `json:"kind"` + Content string `json:"content"` + Time time.Time `json:"time"` + StepId int `json:"step_id"` // fields if kind is "data" Stream string `json:"stream,omitempty"` // fields if kind is "control" - StepId int `json:"step_id,omitempty"` - StepKind StepKind `json:"step_kind,omitempty"` - StepCommand string `json:"step_command,omitempty"` + StepStatus StepStatus `json:"step_status,omitempty"` + StepKind StepKind `json:"step_kind,omitempty"` + StepCommand string `json:"step_command,omitempty"` } -func NewDataLogLine(content, stream string) LogLine { +func NewDataLogLine(idx int, content, stream string) LogLine { return LogLine{ Kind: LogKindData, + Time: time.Now(), Content: content, + StepId: idx, Stream: stream, } } -func NewControlLogLine(idx int, step Step) LogLine { +func NewControlLogLine(idx int, step Step, status StepStatus) LogLine { return LogLine{ Kind: LogKindControl, + Time: time.Now(), Content: step.Name(), StepId: idx, + StepStatus: status, StepKind: step.Kind(), StepCommand: step.Command(), } -- 2.43.0