forked from tangled.org/core
this repo has no description
at master 6.2 kB view raw
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "os" 10 "strconv" 11 "time" 12 13 "tangled.sh/tangled.sh/core/spindle/models" 14 15 "github.com/go-chi/chi/v5" 16 "github.com/gorilla/websocket" 17 "github.com/hpcloud/tail" 18) 19 20var upgrader = websocket.Upgrader{ 21 ReadBufferSize: 1024, 22 WriteBufferSize: 1024, 23} 24 25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 26 l := s.l.With("handler", "Events") 27 l.Debug("received new connection") 28 29 conn, err := upgrader.Upgrade(w, r, nil) 30 if err != nil { 31 l.Error("websocket upgrade failed", "err", err) 32 w.WriteHeader(http.StatusInternalServerError) 33 return 34 } 35 defer conn.Close() 36 l.Debug("upgraded http to wss") 37 38 ch := s.n.Subscribe() 39 defer s.n.Unsubscribe(ch) 40 41 ctx, cancel := context.WithCancel(r.Context()) 42 defer cancel() 43 go func() { 44 for { 45 if _, _, err := conn.NextReader(); err != nil { 46 l.Error("failed to read", "err", err) 47 cancel() 48 return 49 } 50 } 51 }() 52 53 defaultCursor := time.Now().UnixNano() 54 cursorStr := r.URL.Query().Get("cursor") 55 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 56 if err != nil { 57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 58 } 59 if cursor == 0 { 60 cursor = defaultCursor 61 } 62 63 // complete backfill first before going to live data 64 l.Debug("going through backfill", "cursor", cursor) 65 if err := s.streamPipelines(conn, &cursor); err != nil { 66 l.Error("failed to backfill", "err", err) 67 return 68 } 69 70 for { 71 // wait for new data or timeout 72 select { 73 case <-ctx.Done(): 74 l.Debug("stopping stream: client closed connection") 75 return 76 case <-ch: 77 // we have been notified of new data 78 l.Debug("going through live data", "cursor", cursor) 79 if err := s.streamPipelines(conn, &cursor); err != nil { 80 l.Error("failed to stream", "err", err) 81 return 82 } 83 case <-time.After(30 * time.Second): 84 // send a keep-alive 85 l.Debug("sent keepalive") 86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 87 l.Error("failed to write control", "err", err) 88 } 89 } 90 } 91} 92 93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 94 wid, err := getWorkflowID(r) 95 if err != nil { 96 http.Error(w, err.Error(), http.StatusBadRequest) 97 return 98 } 99 100 l := s.l.With("handler", "Logs") 101 l = s.l.With("wid", wid) 102 103 conn, err := upgrader.Upgrade(w, r, nil) 104 if err != nil { 105 l.Error("websocket upgrade failed", "err", err) 106 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 107 return 108 } 109 defer func() { 110 _ = conn.WriteControl( 111 websocket.CloseMessage, 112 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 113 time.Now().Add(time.Second), 114 ) 115 conn.Close() 116 }() 117 l.Debug("upgraded http to wss") 118 119 ctx, cancel := context.WithCancel(r.Context()) 120 defer cancel() 121 122 go func() { 123 for { 124 if _, _, err := conn.NextReader(); err != nil { 125 l.Debug("client disconnected", "err", err) 126 cancel() 127 return 128 } 129 } 130 }() 131 132 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 133 l.Info("log stream ended", "err", err) 134 } 135 136 l.Info("logs connection closed") 137} 138 139func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 140 status, err := s.db.GetStatus(wid) 141 if err != nil { 142 return err 143 } 144 isFinished := models.StatusKind(status.Status).IsFinish() 145 146 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid) 147 148 if status.Status == models.StatusKindFailed.String() && status.Error != nil { 149 if _, err := os.Stat(filePath); os.IsNotExist(err) { 150 msgs := []models.LogLine{ 151 { 152 Kind: models.LogKindControl, 153 Content: "", 154 StepId: 0, 155 StepKind: models.StepKindUser, 156 }, 157 { 158 Kind: models.LogKindData, 159 Content: *status.Error, 160 }, 161 } 162 163 for _, msg := range msgs { 164 b, err := json.Marshal(msg) 165 if err != nil { 166 return err 167 } 168 169 if err := conn.WriteMessage(websocket.TextMessage, b); err != nil { 170 return fmt.Errorf("failed to write to websocket: %w", err) 171 } 172 } 173 174 return nil 175 } 176 } 177 178 config := tail.Config{ 179 Follow: !isFinished, 180 ReOpen: !isFinished, 181 MustExist: false, 182 Location: &tail.SeekInfo{ 183 Offset: 0, 184 Whence: io.SeekStart, 185 }, 186 // Logger: tail.DiscardingLogger, 187 } 188 189 t, err := tail.TailFile(filePath, config) 190 if err != nil { 191 return fmt.Errorf("failed to tail log file: %w", err) 192 } 193 defer t.Stop() 194 195 for { 196 select { 197 case <-ctx.Done(): 198 return ctx.Err() 199 case line := <-t.Lines: 200 if line == nil && isFinished { 201 return fmt.Errorf("tail completed") 202 } 203 204 if line == nil { 205 return fmt.Errorf("tail channel closed unexpectedly") 206 } 207 208 if line.Err != nil { 209 return fmt.Errorf("error tailing log file: %w", line.Err) 210 } 211 212 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 213 return fmt.Errorf("failed to write to websocket: %w", err) 214 } 215 } 216 } 217} 218 219func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 220 events, err := s.db.GetEvents(*cursor) 221 if err != nil { 222 s.l.Debug("err", "err", err) 223 return err 224 } 225 s.l.Debug("ops", "ops", events) 226 227 for _, event := range events { 228 // first extract the inner json into a map 229 var eventJson map[string]any 230 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 231 if err != nil { 232 s.l.Error("failed to unmarshal event", "err", err) 233 return err 234 } 235 236 jsonMsg, err := json.Marshal(map[string]any{ 237 "rkey": event.Rkey, 238 "nsid": event.Nsid, 239 "event": eventJson, 240 }) 241 if err != nil { 242 s.l.Error("failed to marshal record", "err", err) 243 return err 244 } 245 246 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 247 s.l.Debug("err", "err", err) 248 return err 249 } 250 *cursor = event.Created 251 } 252 253 return nil 254} 255 256func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 257 knot := chi.URLParam(r, "knot") 258 rkey := chi.URLParam(r, "rkey") 259 name := chi.URLParam(r, "name") 260 261 if knot == "" || rkey == "" || name == "" { 262 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 263 } 264 265 return models.WorkflowId{ 266 PipelineId: models.PipelineId{ 267 Knot: knot, 268 Rkey: rkey, 269 }, 270 Name: name, 271 }, nil 272}