forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 6.4 kB view raw
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "os" 10 "strconv" 11 "time" 12 13 "tangled.org/core/log" 14 "tangled.org/core/spindle/models" 15 16 "github.com/go-chi/chi/v5" 17 "github.com/gorilla/websocket" 18 "github.com/hpcloud/tail" 19) 20 21var upgrader = websocket.Upgrader{ 22 ReadBufferSize: 1024, 23 WriteBufferSize: 1024, 24} 25 26func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 27 l := log.SubLogger(s.l, "eventstream") 28 29 l.Debug("received new connection") 30 31 conn, err := upgrader.Upgrade(w, r, nil) 32 if err != nil { 33 l.Error("websocket upgrade failed", "err", err) 34 w.WriteHeader(http.StatusInternalServerError) 35 return 36 } 37 defer conn.Close() 38 l.Debug("upgraded http to wss") 39 40 ch := s.n.Subscribe() 41 defer s.n.Unsubscribe(ch) 42 43 ctx, cancel := context.WithCancel(r.Context()) 44 defer cancel() 45 go func() { 46 for { 47 if _, _, err := conn.NextReader(); err != nil { 48 l.Error("failed to read", "err", err) 49 cancel() 50 return 51 } 52 } 53 }() 54 55 defaultCursor := time.Now().UnixNano() 56 cursorStr := r.URL.Query().Get("cursor") 57 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 58 if err != nil { 59 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 60 } 61 if cursor == 0 { 62 cursor = defaultCursor 63 } 64 65 // complete backfill first before going to live data 66 l.Debug("going through backfill", "cursor", cursor) 67 if err := s.streamPipelines(conn, &cursor); err != nil { 68 l.Error("failed to backfill", "err", err) 69 return 70 } 71 72 for { 73 // wait for new data or timeout 74 select { 75 case <-ctx.Done(): 76 l.Debug("stopping stream: client closed connection") 77 return 78 case <-ch: 79 // we have been notified of new data 80 l.Debug("going through live data", "cursor", cursor) 81 if err := s.streamPipelines(conn, &cursor); err != nil { 82 l.Error("failed to stream", "err", err) 83 return 84 } 85 case <-time.After(30 * time.Second): 86 // send a keep-alive 87 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 88 l.Error("failed to write control", "err", err) 89 } 90 } 91 } 92} 93 94func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 95 wid, err := getWorkflowID(r) 96 if err != nil { 97 http.Error(w, err.Error(), http.StatusBadRequest) 98 return 99 } 100 101 l := s.l.With("handler", "Logs") 102 l = s.l.With("wid", wid) 103 104 conn, err := upgrader.Upgrade(w, r, nil) 105 if err != nil { 106 l.Error("websocket upgrade failed", "err", err) 107 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 108 return 109 } 110 defer func() { 111 _ = conn.WriteControl( 112 websocket.CloseMessage, 113 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 114 time.Now().Add(time.Second), 115 ) 116 conn.Close() 117 }() 118 l.Debug("upgraded http to wss") 119 120 ctx, cancel := context.WithCancel(r.Context()) 121 defer cancel() 122 123 go func() { 124 for { 125 if _, _, err := conn.NextReader(); err != nil { 126 l.Debug("client disconnected", "err", err) 127 cancel() 128 return 129 } 130 } 131 }() 132 133 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 134 l.Info("log stream ended", "err", err) 135 } 136 137 l.Info("logs connection closed") 138} 139 140func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 141 status, err := s.db.GetStatus(wid) 142 if err != nil { 143 return err 144 } 145 isFinished := models.StatusKind(status.Status).IsFinish() 146 147 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid) 148 149 if status.Status == models.StatusKindFailed.String() && status.Error != nil { 150 if _, err := os.Stat(filePath); os.IsNotExist(err) { 151 msgs := []models.LogLine{ 152 { 153 Kind: models.LogKindControl, 154 Content: "", 155 StepId: 0, 156 StepKind: models.StepKindUser, 157 }, 158 { 159 Kind: models.LogKindData, 160 Content: *status.Error, 161 }, 162 } 163 164 for _, msg := range msgs { 165 b, err := json.Marshal(msg) 166 if err != nil { 167 return err 168 } 169 170 if err := conn.WriteMessage(websocket.TextMessage, b); err != nil { 171 return fmt.Errorf("failed to write to websocket: %w", err) 172 } 173 } 174 175 return nil 176 } 177 } 178 179 config := tail.Config{ 180 Follow: !isFinished, 181 ReOpen: !isFinished, 182 MustExist: false, 183 Location: &tail.SeekInfo{ 184 Offset: 0, 185 Whence: io.SeekStart, 186 }, 187 // Logger: tail.DiscardingLogger, 188 } 189 190 t, err := tail.TailFile(filePath, config) 191 if err != nil { 192 return fmt.Errorf("failed to tail log file: %w", err) 193 } 194 defer t.Stop() 195 196 for { 197 select { 198 case <-ctx.Done(): 199 return ctx.Err() 200 case line := <-t.Lines: 201 if line == nil && isFinished { 202 return fmt.Errorf("tail completed") 203 } 204 205 if line == nil { 206 return fmt.Errorf("tail channel closed unexpectedly") 207 } 208 209 if line.Err != nil { 210 return fmt.Errorf("error tailing log file: %w", line.Err) 211 } 212 213 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 214 return fmt.Errorf("failed to write to websocket: %w", err) 215 } 216 case <-time.After(30 * time.Second): 217 // send a keep-alive 218 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 219 return fmt.Errorf("failed to write control: %w", err) 220 } 221 } 222 } 223} 224 225func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 226 events, err := s.db.GetEvents(*cursor) 227 if err != nil { 228 s.l.Debug("err", "err", err) 229 return err 230 } 231 232 for _, event := range events { 233 // first extract the inner json into a map 234 var eventJson map[string]any 235 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 236 if err != nil { 237 s.l.Error("failed to unmarshal event", "err", err) 238 return err 239 } 240 241 jsonMsg, err := json.Marshal(map[string]any{ 242 "rkey": event.Rkey, 243 "nsid": event.Nsid, 244 "event": eventJson, 245 }) 246 if err != nil { 247 s.l.Error("failed to marshal record", "err", err) 248 return err 249 } 250 251 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 252 s.l.Debug("err", "err", err) 253 return err 254 } 255 *cursor = event.Created 256 } 257 258 return nil 259} 260 261func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 262 knot := chi.URLParam(r, "knot") 263 rkey := chi.URLParam(r, "rkey") 264 name := chi.URLParam(r, "name") 265 266 if knot == "" || rkey == "" || name == "" { 267 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 268 } 269 270 return models.WorkflowId{ 271 PipelineId: models.PipelineId{ 272 Knot: knot, 273 Rkey: rkey, 274 }, 275 Name: name, 276 }, nil 277}