···
"tangled.sh/tangled.sh/core/spindle/models"
"github.com/go-chi/chi/v5"
···
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
-
l := s.l.With("handler", "Logs")
-
knot := chi.URLParam(r, "knot")
-
http.Error(w, "knot required", http.StatusBadRequest)
-
rkey := chi.URLParam(r, "rkey")
-
http.Error(w, "rkey required", http.StatusBadRequest)
-
name := chi.URLParam(r, "name")
-
http.Error(w, "name required", http.StatusBadRequest)
-
wid := models.WorkflowId{
-
PipelineId: models.PipelineId{
-
l = l.With("knot", knot, "rkey", rkey, "name", name)
conn, err := upgrader.Upgrade(w, r, nil)
···
-
if err := s.streamLogs(ctx, conn, wid); err != nil {
-
l.Error("streamLogs failed", "err", err)
l.Debug("logs connection closed")
···
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
events, err := s.db.GetEvents(*cursor)
···
···
+
"tangled.sh/tangled.sh/core/spindle/engine"
"tangled.sh/tangled.sh/core/spindle/models"
"github.com/go-chi/chi/v5"
···
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
+
wid, err := getWorkflowID(r)
+
http.Error(w, err.Error(), http.StatusBadRequest)
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
+
return s.streamLogs(ctx, conn, wid)
+
func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
+
wid, err := getWorkflowID(r)
+
http.Error(w, err.Error(), http.StatusBadRequest)
+
idxStr := chi.URLParam(r, "idx")
+
http.Error(w, "step index required", http.StatusBadRequest)
+
idx, err := strconv.Atoi(idxStr)
+
http.Error(w, "bad step index", http.StatusBadRequest)
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
+
return s.streamLogFromDisk(ctx, conn, wid, idx)
+
func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
+
l := s.l.With("handler", "Logs")
conn, err := upgrader.Upgrade(w, r, nil)
···
+
if err := streamFn(ctx, conn); err != nil {
+
l.Error("log stream failed", "err", err)
l.Debug("logs connection closed")
···
+
func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
+
streams := []string{"stdout", "stderr"}
+
for _, stream := range streams {
+
data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
+
// log but continue to next stream
+
s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
+
scanner := bufio.NewScanner(strings.NewReader(data))
+
msg := map[string]string{
+
"data": scanner.Text(),
+
if err := conn.WriteJSON(msg); err != nil {
+
if err := scanner.Err(); err != nil {
+
return fmt.Errorf("error scanning %s log: %w", stream, err)
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
events, err := s.db.GetEvents(*cursor)
···
+
func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
+
knot := chi.URLParam(r, "knot")
+
rkey := chi.URLParam(r, "rkey")
+
name := chi.URLParam(r, "name")
+
if knot == "" || rkey == "" || name == "" {
+
return models.WorkflowId{}, fmt.Errorf("missing required parameters")
+
return models.WorkflowId{
+
PipelineId: models.PipelineId{