···
13
+
"tangled.sh/tangled.sh/core/spindle/engine"
"tangled.sh/tangled.sh/core/spindle/models"
"github.com/go-chi/chi/v5"
···
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
91
-
l := s.l.With("handler", "Logs")
93
-
knot := chi.URLParam(r, "knot")
95
-
http.Error(w, "knot required", http.StatusBadRequest)
94
+
wid, err := getWorkflowID(r)
96
+
http.Error(w, err.Error(), http.StatusBadRequest)
99
-
rkey := chi.URLParam(r, "rkey")
101
-
http.Error(w, "rkey required", http.StatusBadRequest)
100
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
101
+
return s.streamLogs(ctx, conn, wid)
105
+
func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
106
+
wid, err := getWorkflowID(r)
108
+
http.Error(w, err.Error(), http.StatusBadRequest)
105
-
name := chi.URLParam(r, "name")
107
-
http.Error(w, "name required", http.StatusBadRequest)
112
+
idxStr := chi.URLParam(r, "idx")
114
+
http.Error(w, "step index required", http.StatusBadRequest)
111
-
wid := models.WorkflowId{
112
-
PipelineId: models.PipelineId{
117
+
idx, err := strconv.Atoi(idxStr)
119
+
http.Error(w, "bad step index", http.StatusBadRequest)
119
-
l = l.With("knot", knot, "rkey", rkey, "name", name)
123
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
124
+
return s.streamLogFromDisk(ctx, conn, wid, idx)
128
+
func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
129
+
l := s.l.With("handler", "Logs")
conn, err := upgrader.Upgrade(w, r, nil)
···
143
-
if err := s.streamLogs(ctx, conn, wid); err != nil {
144
-
l.Error("streamLogs failed", "err", err)
153
+
if err := streamFn(ctx, conn); err != nil {
154
+
l.Error("log stream failed", "err", err)
l.Debug("logs connection closed")
···
219
+
func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
220
+
streams := []string{"stdout", "stderr"}
222
+
for _, stream := range streams {
223
+
data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
225
+
// log but continue to next stream
226
+
s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
230
+
scanner := bufio.NewScanner(strings.NewReader(data))
231
+
for scanner.Scan() {
236
+
msg := map[string]string{
238
+
"data": scanner.Text(),
240
+
if err := conn.WriteJSON(msg); err != nil {
246
+
if err := scanner.Err(); err != nil {
247
+
return fmt.Errorf("error scanning %s log: %w", stream, err)
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
events, err := s.db.GetEvents(*cursor)
···
291
+
func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
292
+
knot := chi.URLParam(r, "knot")
293
+
rkey := chi.URLParam(r, "rkey")
294
+
name := chi.URLParam(r, "name")
296
+
if knot == "" || rkey == "" || name == "" {
297
+
return models.WorkflowId{}, fmt.Errorf("missing required parameters")
300
+
return models.WorkflowId{
301
+
PipelineId: models.PipelineId{