forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "strconv" 10 "time" 11 12 "tangled.sh/tangled.sh/core/spindle/engine" 13 "tangled.sh/tangled.sh/core/spindle/models" 14 15 "github.com/go-chi/chi/v5" 16 "github.com/gorilla/websocket" 17 "github.com/hpcloud/tail" 18) 19 20var upgrader = websocket.Upgrader{ 21 ReadBufferSize: 1024, 22 WriteBufferSize: 1024, 23} 24 25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 26 l := s.l.With("handler", "Events") 27 l.Debug("received new connection") 28 29 conn, err := upgrader.Upgrade(w, r, nil) 30 if err != nil { 31 l.Error("websocket upgrade failed", "err", err) 32 w.WriteHeader(http.StatusInternalServerError) 33 return 34 } 35 defer conn.Close() 36 l.Debug("upgraded http to wss") 37 38 ch := s.n.Subscribe() 39 defer s.n.Unsubscribe(ch) 40 41 ctx, cancel := context.WithCancel(r.Context()) 42 defer cancel() 43 go func() { 44 for { 45 if _, _, err := conn.NextReader(); err != nil { 46 l.Error("failed to read", "err", err) 47 cancel() 48 return 49 } 50 } 51 }() 52 53 defaultCursor := time.Now().UnixNano() 54 cursorStr := r.URL.Query().Get("cursor") 55 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 56 if err != nil { 57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 58 } 59 if cursor == 0 { 60 cursor = defaultCursor 61 } 62 63 // complete backfill first before going to live data 64 l.Debug("going through backfill", "cursor", cursor) 65 if err := s.streamPipelines(conn, &cursor); err != nil { 66 l.Error("failed to backfill", "err", err) 67 return 68 } 69 70 for { 71 // wait for new data or timeout 72 select { 73 case <-ctx.Done(): 74 l.Debug("stopping stream: client closed connection") 75 return 76 case <-ch: 77 // we have been notified of new data 78 l.Debug("going through live data", "cursor", cursor) 79 if err := s.streamPipelines(conn, &cursor); err != nil { 80 l.Error("failed to stream", "err", err) 81 return 82 } 83 case <-time.After(30 * time.Second): 84 // send a keep-alive 85 l.Debug("sent keepalive") 86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 87 l.Error("failed to write control", "err", err) 88 } 89 } 90 } 91} 92 93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 94 wid, err := getWorkflowID(r) 95 if err != nil { 96 http.Error(w, err.Error(), http.StatusBadRequest) 97 return 98 } 99 100 l := s.l.With("handler", "Logs") 101 l = s.l.With("wid", wid) 102 103 conn, err := upgrader.Upgrade(w, r, nil) 104 if err != nil { 105 l.Error("websocket upgrade failed", "err", err) 106 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 107 return 108 } 109 defer func() { 110 _ = conn.WriteControl( 111 websocket.CloseMessage, 112 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 113 time.Now().Add(time.Second), 114 ) 115 conn.Close() 116 }() 117 l.Debug("upgraded http to wss") 118 119 ctx, cancel := context.WithCancel(r.Context()) 120 defer cancel() 121 122 go func() { 123 for { 124 if _, _, err := conn.NextReader(); err != nil { 125 l.Debug("client disconnected", "err", err) 126 cancel() 127 return 128 } 129 } 130 }() 131 132 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 133 l.Info("log stream ended", "err", err) 134 } 135 136 l.Info("logs connection closed") 137} 138 139func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 140 status, err := s.db.GetStatus(wid) 141 if err != nil { 142 return err 143 } 144 isFinished := models.StatusKind(status.Status).IsFinish() 145 146 filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid) 147 148 config := tail.Config{ 149 Follow: !isFinished, 150 ReOpen: !isFinished, 151 MustExist: false, 152 Location: &tail.SeekInfo{ 153 Offset: 0, 154 Whence: io.SeekStart, 155 }, 156 // Logger: tail.DiscardingLogger, 157 } 158 159 t, err := tail.TailFile(filePath, config) 160 if err != nil { 161 return fmt.Errorf("failed to tail log file: %w", err) 162 } 163 defer t.Stop() 164 165 for { 166 select { 167 case <-ctx.Done(): 168 return ctx.Err() 169 case line := <-t.Lines: 170 if line == nil && isFinished { 171 return fmt.Errorf("tail completed") 172 } 173 174 if line == nil { 175 return fmt.Errorf("tail channel closed unexpectedly") 176 } 177 178 if line.Err != nil { 179 return fmt.Errorf("error tailing log file: %w", line.Err) 180 } 181 182 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 183 return fmt.Errorf("failed to write to websocket: %w", err) 184 } 185 } 186 } 187} 188 189func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 190 events, err := s.db.GetEvents(*cursor) 191 if err != nil { 192 s.l.Debug("err", "err", err) 193 return err 194 } 195 s.l.Debug("ops", "ops", events) 196 197 for _, event := range events { 198 // first extract the inner json into a map 199 var eventJson map[string]any 200 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 201 if err != nil { 202 s.l.Error("failed to unmarshal event", "err", err) 203 return err 204 } 205 206 jsonMsg, err := json.Marshal(map[string]any{ 207 "rkey": event.Rkey, 208 "nsid": event.Nsid, 209 "event": eventJson, 210 }) 211 if err != nil { 212 s.l.Error("failed to marshal record", "err", err) 213 return err 214 } 215 216 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 217 s.l.Debug("err", "err", err) 218 return err 219 } 220 *cursor = event.Created 221 } 222 223 return nil 224} 225 226func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 227 knot := chi.URLParam(r, "knot") 228 rkey := chi.URLParam(r, "rkey") 229 name := chi.URLParam(r, "name") 230 231 if knot == "" || rkey == "" || name == "" { 232 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 233 } 234 235 return models.WorkflowId{ 236 PipelineId: models.PipelineId{ 237 Knot: knot, 238 Rkey: rkey, 239 }, 240 Name: name, 241 }, nil 242}