spindle: emit control log lines for start and end of steps #730

merged
opened by oppi.li targeting master from push-lstutzylzylk
Changed files
+52 -23
spindle
engine
engines
nixery
models
+13 -3
spindle/engine/engine.go
···
defer cancel()
for stepIdx, step := range w.Steps {
if wfLogger != nil {
-
ctl := wfLogger.ControlWriter(stepIdx, step)
-
ctl.Write([]byte(step.Name()))
}
err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
if err != nil {
if errors.Is(err, ErrTimedOut) {
dbErr := db.StatusTimeout(wid, n)
···
if err := eg.Wait(); err != nil {
l.Error("failed to run one or more workflows", "err", err)
} else {
-
l.Error("successfully ran full pipeline")
}
}
···
defer cancel()
for stepIdx, step := range w.Steps {
+
// log start of step
if wfLogger != nil {
+
wfLogger.
+
ControlWriter(stepIdx, step, models.StepStatusStart).
+
Write([]byte{0})
}
err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
+
+
// log end of step
+
if wfLogger != nil {
+
wfLogger.
+
ControlWriter(stepIdx, step, models.StepStatusEnd).
+
Write([]byte{0})
+
}
+
if err != nil {
if errors.Is(err, ErrTimedOut) {
dbErr := db.StatusTimeout(wid, n)
···
if err := eg.Wait(); err != nil {
l.Error("failed to run one or more workflows", "err", err)
} else {
+
l.Info("successfully ran full pipeline")
}
}
+2 -2
spindle/engines/nixery/engine.go
···
defer logs.Close()
_, err = stdcopy.StdCopy(
-
wfLogger.DataWriter("stdout"),
-
wfLogger.DataWriter("stderr"),
logs.Reader,
)
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
···
defer logs.Close()
_, err = stdcopy.StdCopy(
+
wfLogger.DataWriter(stepIdx, "stdout"),
+
wfLogger.DataWriter(stepIdx, "stderr"),
logs.Reader,
)
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
+14 -10
spindle/models/logger.go
···
return l.file.Close()
}
-
func (l *WorkflowLogger) DataWriter(stream string) io.Writer {
return &dataWriter{
logger: l,
stream: stream,
}
}
-
func (l *WorkflowLogger) ControlWriter(idx int, step Step) io.Writer {
return &controlWriter{
-
logger: l,
-
idx: idx,
-
step: step,
}
}
type dataWriter struct {
logger *WorkflowLogger
stream string
}
func (w *dataWriter) Write(p []byte) (int, error) {
line := strings.TrimRight(string(p), "\r\n")
-
entry := NewDataLogLine(line, w.stream)
if err := w.logger.encoder.Encode(entry); err != nil {
return 0, err
}
···
}
type controlWriter struct {
-
logger *WorkflowLogger
-
idx int
-
step Step
}
func (w *controlWriter) Write(_ []byte) (int, error) {
-
entry := NewControlLogLine(w.idx, w.step)
if err := w.logger.encoder.Encode(entry); err != nil {
return 0, err
}
···
return l.file.Close()
}
+
func (l *WorkflowLogger) DataWriter(idx int, stream string) io.Writer {
return &dataWriter{
logger: l,
+
idx: idx,
stream: stream,
}
}
+
func (l *WorkflowLogger) ControlWriter(idx int, step Step, stepStatus StepStatus) io.Writer {
return &controlWriter{
+
logger: l,
+
idx: idx,
+
step: step,
+
stepStatus: stepStatus,
}
}
type dataWriter struct {
logger *WorkflowLogger
+
idx int
stream string
}
func (w *dataWriter) Write(p []byte) (int, error) {
line := strings.TrimRight(string(p), "\r\n")
+
entry := NewDataLogLine(w.idx, line, w.stream)
if err := w.logger.encoder.Encode(entry); err != nil {
return 0, err
}
···
}
type controlWriter struct {
+
logger *WorkflowLogger
+
idx int
+
step Step
+
stepStatus StepStatus
}
func (w *controlWriter) Write(_ []byte) (int, error) {
+
entry := NewControlLogLine(w.idx, w.step, w.stepStatus)
if err := w.logger.encoder.Encode(entry); err != nil {
return 0, err
}
+23 -8
spindle/models/models.go
···
"fmt"
"regexp"
"slices"
"tangled.org/core/api/tangled"
···
var (
// step log data
LogKindData LogKind = "data"
-
// indicates start/end of a step
LogKindControl LogKind = "control"
)
type LogLine struct {
-
Kind LogKind `json:"kind"`
-
Content string `json:"content"`
// fields if kind is "data"
Stream string `json:"stream,omitempty"`
// fields if kind is "control"
-
StepId int `json:"step_id,omitempty"`
-
StepKind StepKind `json:"step_kind,omitempty"`
-
StepCommand string `json:"step_command,omitempty"`
}
-
func NewDataLogLine(content, stream string) LogLine {
return LogLine{
Kind: LogKindData,
Content: content,
Stream: stream,
}
}
-
func NewControlLogLine(idx int, step Step) LogLine {
return LogLine{
Kind: LogKindControl,
Content: step.Name(),
StepId: idx,
StepKind: step.Kind(),
StepCommand: step.Command(),
}
···
"fmt"
"regexp"
"slices"
+
"time"
"tangled.org/core/api/tangled"
···
var (
// step log data
LogKindData LogKind = "data"
+
// indicates status of a step
LogKindControl LogKind = "control"
)
+
// step status indicator in control log lines
+
type StepStatus string
+
+
var (
+
StepStatusStart StepStatus = "start"
+
StepStatusEnd StepStatus = "end"
+
)
+
type LogLine struct {
+
Kind LogKind `json:"kind"`
+
Content string `json:"content"`
+
Time time.Time `json:"time"`
+
StepId int `json:"step_id"`
// fields if kind is "data"
Stream string `json:"stream,omitempty"`
// fields if kind is "control"
+
StepStatus StepStatus `json:"step_status,omitempty"`
+
StepKind StepKind `json:"step_kind,omitempty"`
+
StepCommand string `json:"step_command,omitempty"`
}
+
func NewDataLogLine(idx int, content, stream string) LogLine {
return LogLine{
Kind: LogKindData,
+
Time: time.Now(),
Content: content,
+
StepId: idx,
Stream: stream,
}
}
+
func NewControlLogLine(idx int, step Step, status StepStatus) LogLine {
return LogLine{
Kind: LogKindControl,
+
Time: time.Now(),
Content: step.Name(),
StepId: idx,
+
StepStatus: status,
StepKind: step.Kind(),
StepCommand: step.Command(),
}