From 6315beeb7b510a590555b3f7ac0eb4bdb0917882 Mon Sep 17 00:00:00 2001 From: Anirudh Oppiliappan Date: Fri, 20 Jun 2025 14:58:11 +0300 Subject: [PATCH] spindle: stream logs from disk Change-Id: vwyomovpppwpvnmtpomqrspvxssrolyp Signed-off-by: Anirudh Oppiliappan --- spindle/server.go | 1 + spindle/stream.go | 105 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 85 insertions(+), 21 deletions(-) diff --git a/spindle/server.go b/spindle/server.go index 87d7f2c..d31aeb1 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -148,6 +148,7 @@ func (s *Spindle) Router() http.Handler { w.Write([]byte(s.cfg.Server.Owner)) }) mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) + mux.HandleFunc("/logs/{knot}/{rkey}/{name}/{idx}", s.StepLogs) return mux } diff --git a/spindle/stream.go b/spindle/stream.go index 28c9954..b7ee63a 100644 --- a/spindle/stream.go +++ b/spindle/stream.go @@ -1,13 +1,16 @@ package spindle import ( + "bufio" "context" "encoding/json" "fmt" "net/http" "strconv" + "strings" "time" + "tangled.sh/tangled.sh/core/spindle/engine" "tangled.sh/tangled.sh/core/spindle/models" "github.com/go-chi/chi/v5" @@ -88,35 +91,42 @@ func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { } func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { - l := s.l.With("handler", "Logs") - - knot := chi.URLParam(r, "knot") - if knot == "" { - http.Error(w, "knot required", http.StatusBadRequest) + wid, err := getWorkflowID(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - rkey := chi.URLParam(r, "rkey") - if rkey == "" { - http.Error(w, "rkey required", http.StatusBadRequest) + s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { + return s.streamLogs(ctx, conn, wid) + }) +} + +func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) { + wid, err := getWorkflowID(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - name := chi.URLParam(r, "name") - if name == "" { - http.Error(w, "name required", http.StatusBadRequest) + idxStr := chi.URLParam(r, "idx") + if idxStr == "" { + http.Error(w, "step index required", http.StatusBadRequest) return } - - wid := models.WorkflowId{ - PipelineId: models.PipelineId{ - Knot: knot, - Rkey: rkey, - }, - Name: name, + idx, err := strconv.Atoi(idxStr) + if err != nil { + http.Error(w, "bad step index", http.StatusBadRequest) + return } - l = l.With("knot", knot, "rkey", rkey, "name", name) + s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { + return s.streamLogFromDisk(ctx, conn, wid, idx) + }) +} + +func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) { + l := s.l.With("handler", "Logs") conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -140,8 +150,8 @@ func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { } }() - if err := s.streamLogs(ctx, conn, wid); err != nil { - l.Error("streamLogs failed", "err", err) + if err := streamFn(ctx, conn); err != nil { + l.Error("log stream failed", "err", err) } l.Debug("logs connection closed") } @@ -206,6 +216,41 @@ func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid mode return nil } +func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error { + streams := []string{"stdout", "stderr"} + + for _, stream := range streams { + data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx) + if err != nil { + // log but continue to next stream + s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err) + continue + } + + scanner := bufio.NewScanner(strings.NewReader(data)) + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + msg := map[string]string{ + "type": stream, + "data": scanner.Text(), + } + if err := conn.WriteJSON(msg); err != nil { + return err + } + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error scanning %s log: %w", stream, err) + } + } + + return nil +} + func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { events, err := s.db.GetEvents(*cursor) if err != nil { @@ -242,3 +287,21 @@ func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { return nil } + +func getWorkflowID(r *http.Request) (models.WorkflowId, error) { + knot := chi.URLParam(r, "knot") + rkey := chi.URLParam(r, "rkey") + name := chi.URLParam(r, "name") + + if knot == "" || rkey == "" || name == "" { + return models.WorkflowId{}, fmt.Errorf("missing required parameters") + } + + return models.WorkflowId{ + PipelineId: models.PipelineId{ + Knot: knot, + Rkey: rkey, + }, + Name: name, + }, nil +} -- 2.43.0