spindle: stream logs from disk #261

merged
opened by anirudh.fi targeting master from push-vwyomovpppwp
Changed files
+85 -21
spindle
+1
spindle/server.go
···
w.Write([]byte(s.cfg.Server.Owner))
})
mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
+
mux.HandleFunc("/logs/{knot}/{rkey}/{name}/{idx}", s.StepLogs)
return mux
}
+84 -21
spindle/stream.go
···
package spindle
import (
+
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
+
"strings"
"time"
+
"tangled.sh/tangled.sh/core/spindle/engine"
"tangled.sh/tangled.sh/core/spindle/models"
"github.com/go-chi/chi/v5"
···
}
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
-
l := s.l.With("handler", "Logs")
-
-
knot := chi.URLParam(r, "knot")
-
if knot == "" {
-
http.Error(w, "knot required", http.StatusBadRequest)
+
wid, err := getWorkflowID(r)
+
if err != nil {
+
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
-
rkey := chi.URLParam(r, "rkey")
-
if rkey == "" {
-
http.Error(w, "rkey required", http.StatusBadRequest)
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
+
return s.streamLogs(ctx, conn, wid)
+
})
+
}
+
+
func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
+
wid, err := getWorkflowID(r)
+
if err != nil {
+
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
-
name := chi.URLParam(r, "name")
-
if name == "" {
-
http.Error(w, "name required", http.StatusBadRequest)
+
idxStr := chi.URLParam(r, "idx")
+
if idxStr == "" {
+
http.Error(w, "step index required", http.StatusBadRequest)
return
}
-
-
wid := models.WorkflowId{
-
PipelineId: models.PipelineId{
-
Knot: knot,
-
Rkey: rkey,
-
},
-
Name: name,
+
idx, err := strconv.Atoi(idxStr)
+
if err != nil {
+
http.Error(w, "bad step index", http.StatusBadRequest)
+
return
}
-
l = l.With("knot", knot, "rkey", rkey, "name", name)
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
+
return s.streamLogFromDisk(ctx, conn, wid, idx)
+
})
+
}
+
+
func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
+
l := s.l.With("handler", "Logs")
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
···
}
}()
-
if err := s.streamLogs(ctx, conn, wid); err != nil {
-
l.Error("streamLogs failed", "err", err)
+
if err := streamFn(ctx, conn); err != nil {
+
l.Error("log stream failed", "err", err)
}
l.Debug("logs connection closed")
}
···
return nil
}
+
func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
+
streams := []string{"stdout", "stderr"}
+
+
for _, stream := range streams {
+
data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
+
if err != nil {
+
// log but continue to next stream
+
s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
+
continue
+
}
+
+
scanner := bufio.NewScanner(strings.NewReader(data))
+
for scanner.Scan() {
+
select {
+
case <-ctx.Done():
+
return ctx.Err()
+
default:
+
msg := map[string]string{
+
"type": stream,
+
"data": scanner.Text(),
+
}
+
if err := conn.WriteJSON(msg); err != nil {
+
return err
+
}
+
}
+
}
+
+
if err := scanner.Err(); err != nil {
+
return fmt.Errorf("error scanning %s log: %w", stream, err)
+
}
+
}
+
+
return nil
+
}
+
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
events, err := s.db.GetEvents(*cursor)
if err != nil {
···
return nil
}
+
+
func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
+
knot := chi.URLParam(r, "knot")
+
rkey := chi.URLParam(r, "rkey")
+
name := chi.URLParam(r, "name")
+
+
if knot == "" || rkey == "" || name == "" {
+
return models.WorkflowId{}, fmt.Errorf("missing required parameters")
+
}
+
+
return models.WorkflowId{
+
PipelineId: models.PipelineId{
+
Knot: knot,
+
Rkey: rkey,
+
},
+
Name: name,
+
}, nil
+
}