forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "bufio" 5 "context" 6 "encoding/json" 7 "fmt" 8 "net/http" 9 "strconv" 10 "strings" 11 "time" 12 13 "tangled.sh/tangled.sh/core/spindle/engine" 14 "tangled.sh/tangled.sh/core/spindle/models" 15 16 "github.com/go-chi/chi/v5" 17 "github.com/gorilla/websocket" 18) 19 20var upgrader = websocket.Upgrader{ 21 ReadBufferSize: 1024, 22 WriteBufferSize: 1024, 23} 24 25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 26 l := s.l.With("handler", "Events") 27 l.Debug("received new connection") 28 29 conn, err := upgrader.Upgrade(w, r, nil) 30 if err != nil { 31 l.Error("websocket upgrade failed", "err", err) 32 w.WriteHeader(http.StatusInternalServerError) 33 return 34 } 35 defer conn.Close() 36 l.Debug("upgraded http to wss") 37 38 ch := s.n.Subscribe() 39 defer s.n.Unsubscribe(ch) 40 41 ctx, cancel := context.WithCancel(r.Context()) 42 defer cancel() 43 go func() { 44 for { 45 if _, _, err := conn.NextReader(); err != nil { 46 l.Error("failed to read", "err", err) 47 cancel() 48 return 49 } 50 } 51 }() 52 53 defaultCursor := time.Now().UnixNano() 54 cursorStr := r.URL.Query().Get("cursor") 55 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 56 if err != nil { 57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 58 } 59 if cursor == 0 { 60 cursor = defaultCursor 61 } 62 63 // complete backfill first before going to live data 64 l.Debug("going through backfill", "cursor", cursor) 65 if err := s.streamPipelines(conn, &cursor); err != nil { 66 l.Error("failed to backfill", "err", err) 67 return 68 } 69 70 for { 71 // wait for new data or timeout 72 select { 73 case <-ctx.Done(): 74 l.Debug("stopping stream: client closed connection") 75 return 76 case <-ch: 77 // we have been notified of new data 78 l.Debug("going through live data", "cursor", cursor) 79 if err := s.streamPipelines(conn, &cursor); err != nil { 80 l.Error("failed to stream", "err", err) 81 return 82 } 83 case <-time.After(30 * time.Second): 84 // send a keep-alive 85 l.Debug("sent keepalive") 86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 87 l.Error("failed to write control", "err", err) 88 } 89 } 90 } 91} 92 93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 94 wid, err := getWorkflowID(r) 95 if err != nil { 96 http.Error(w, err.Error(), http.StatusBadRequest) 97 return 98 } 99 100 s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { 101 return s.streamLogs(ctx, conn, wid) 102 }) 103} 104 105func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) { 106 wid, err := getWorkflowID(r) 107 if err != nil { 108 http.Error(w, err.Error(), http.StatusBadRequest) 109 return 110 } 111 112 idxStr := chi.URLParam(r, "idx") 113 if idxStr == "" { 114 http.Error(w, "step index required", http.StatusBadRequest) 115 return 116 } 117 idx, err := strconv.Atoi(idxStr) 118 if err != nil { 119 http.Error(w, "bad step index", http.StatusBadRequest) 120 return 121 } 122 123 s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { 124 return s.streamLogFromDisk(ctx, conn, wid, idx) 125 }) 126} 127 128func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) { 129 l := s.l.With("handler", "Logs") 130 131 conn, err := upgrader.Upgrade(w, r, nil) 132 if err != nil { 133 l.Error("websocket upgrade failed", "err", err) 134 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 135 return 136 } 137 defer conn.Close() 138 l.Debug("upgraded http to wss") 139 140 ctx, cancel := context.WithCancel(r.Context()) 141 defer cancel() 142 143 go func() { 144 for { 145 if _, _, err := conn.NextReader(); err != nil { 146 l.Debug("client disconnected", "err", err) 147 cancel() 148 return 149 } 150 } 151 }() 152 153 if err := streamFn(ctx, conn); err != nil { 154 l.Error("log stream failed", "err", err) 155 } 156 l.Debug("logs connection closed") 157} 158 159func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 160 l := s.l.With("workflow_id", wid.String()) 161 162 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid) 163 if !ok { 164 return fmt.Errorf("workflow_id %q not found", wid.String()) 165 } 166 167 done := make(chan struct{}) 168 169 go func() { 170 for { 171 select { 172 case line, ok := <-stdoutCh: 173 if !ok { 174 done <- struct{}{} 175 return 176 } 177 msg := map[string]string{"type": "stdout", "data": line} 178 if err := conn.WriteJSON(msg); err != nil { 179 l.Error("write stdout failed", "err", err) 180 done <- struct{}{} 181 return 182 } 183 case <-ctx.Done(): 184 done <- struct{}{} 185 return 186 } 187 } 188 }() 189 190 go func() { 191 for { 192 select { 193 case line, ok := <-stderrCh: 194 if !ok { 195 done <- struct{}{} 196 return 197 } 198 msg := map[string]string{"type": "stderr", "data": line} 199 if err := conn.WriteJSON(msg); err != nil { 200 l.Error("write stderr failed", "err", err) 201 done <- struct{}{} 202 return 203 } 204 case <-ctx.Done(): 205 done <- struct{}{} 206 return 207 } 208 } 209 }() 210 211 select { 212 case <-done: 213 case <-ctx.Done(): 214 } 215 216 return nil 217} 218 219func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error { 220 streams := []string{"stdout", "stderr"} 221 222 for _, stream := range streams { 223 data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx) 224 if err != nil { 225 // log but continue to next stream 226 s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err) 227 continue 228 } 229 230 scanner := bufio.NewScanner(strings.NewReader(data)) 231 for scanner.Scan() { 232 select { 233 case <-ctx.Done(): 234 return ctx.Err() 235 default: 236 msg := map[string]string{ 237 "type": stream, 238 "data": scanner.Text(), 239 } 240 if err := conn.WriteJSON(msg); err != nil { 241 return err 242 } 243 } 244 } 245 246 if err := scanner.Err(); err != nil { 247 return fmt.Errorf("error scanning %s log: %w", stream, err) 248 } 249 } 250 251 return nil 252} 253 254func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 255 events, err := s.db.GetEvents(*cursor) 256 if err != nil { 257 s.l.Debug("err", "err", err) 258 return err 259 } 260 s.l.Debug("ops", "ops", events) 261 262 for _, event := range events { 263 // first extract the inner json into a map 264 var eventJson map[string]any 265 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 266 if err != nil { 267 s.l.Error("failed to unmarshal event", "err", err) 268 return err 269 } 270 271 jsonMsg, err := json.Marshal(map[string]any{ 272 "rkey": event.Rkey, 273 "nsid": event.Nsid, 274 "event": eventJson, 275 }) 276 if err != nil { 277 s.l.Error("failed to marshal record", "err", err) 278 return err 279 } 280 281 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 282 s.l.Debug("err", "err", err) 283 return err 284 } 285 *cursor = event.Created 286 } 287 288 return nil 289} 290 291func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 292 knot := chi.URLParam(r, "knot") 293 rkey := chi.URLParam(r, "rkey") 294 name := chi.URLParam(r, "name") 295 296 if knot == "" || rkey == "" || name == "" { 297 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 298 } 299 300 return models.WorkflowId{ 301 PipelineId: models.PipelineId{ 302 Knot: knot, 303 Rkey: rkey, 304 }, 305 Name: name, 306 }, nil 307}