forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "net/http" 8 "strconv" 9 "time" 10 11 "tangled.sh/tangled.sh/core/spindle/models" 12 13 "github.com/go-chi/chi/v5" 14 "github.com/gorilla/websocket" 15) 16 17var upgrader = websocket.Upgrader{ 18 ReadBufferSize: 1024, 19 WriteBufferSize: 1024, 20} 21 22func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 23 l := s.l.With("handler", "Events") 24 l.Debug("received new connection") 25 26 conn, err := upgrader.Upgrade(w, r, nil) 27 if err != nil { 28 l.Error("websocket upgrade failed", "err", err) 29 w.WriteHeader(http.StatusInternalServerError) 30 return 31 } 32 defer conn.Close() 33 l.Debug("upgraded http to wss") 34 35 ch := s.n.Subscribe() 36 defer s.n.Unsubscribe(ch) 37 38 ctx, cancel := context.WithCancel(r.Context()) 39 defer cancel() 40 go func() { 41 for { 42 if _, _, err := conn.NextReader(); err != nil { 43 l.Error("failed to read", "err", err) 44 cancel() 45 return 46 } 47 } 48 }() 49 50 defaultCursor := time.Now().UnixNano() 51 cursorStr := r.URL.Query().Get("cursor") 52 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 53 if err != nil { 54 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 55 } 56 if cursor == 0 { 57 cursor = defaultCursor 58 } 59 60 // complete backfill first before going to live data 61 l.Debug("going through backfill", "cursor", cursor) 62 if err := s.streamPipelines(conn, &cursor); err != nil { 63 l.Error("failed to backfill", "err", err) 64 return 65 } 66 67 for { 68 // wait for new data or timeout 69 select { 70 case <-ctx.Done(): 71 l.Debug("stopping stream: client closed connection") 72 return 73 case <-ch: 74 // we have been notified of new data 75 l.Debug("going through live data", "cursor", cursor) 76 if err := s.streamPipelines(conn, &cursor); err != nil { 77 l.Error("failed to stream", "err", err) 78 return 79 } 80 case <-time.After(30 * time.Second): 81 // send a keep-alive 82 l.Debug("sent keepalive") 83 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 84 l.Error("failed to write control", "err", err) 85 } 86 } 87 } 88} 89 90func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 91 l := s.l.With("handler", "Logs") 92 93 knot := chi.URLParam(r, "knot") 94 if knot == "" { 95 http.Error(w, "knot required", http.StatusBadRequest) 96 return 97 } 98 99 rkey := chi.URLParam(r, "rkey") 100 if rkey == "" { 101 http.Error(w, "rkey required", http.StatusBadRequest) 102 return 103 } 104 105 name := chi.URLParam(r, "name") 106 if name == "" { 107 http.Error(w, "name required", http.StatusBadRequest) 108 return 109 } 110 111 wid := models.WorkflowId{ 112 PipelineId: models.PipelineId{ 113 Knot: knot, 114 Rkey: rkey, 115 }, 116 Name: name, 117 } 118 119 l = l.With("knot", knot, "rkey", rkey, "name", name) 120 121 conn, err := upgrader.Upgrade(w, r, nil) 122 if err != nil { 123 l.Error("websocket upgrade failed", "err", err) 124 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 125 return 126 } 127 defer conn.Close() 128 l.Debug("upgraded http to wss") 129 130 ctx, cancel := context.WithCancel(r.Context()) 131 defer cancel() 132 133 go func() { 134 for { 135 if _, _, err := conn.NextReader(); err != nil { 136 l.Debug("client disconnected", "err", err) 137 cancel() 138 return 139 } 140 } 141 }() 142 143 if err := s.streamLogs(ctx, conn, wid); err != nil { 144 l.Error("streamLogs failed", "err", err) 145 } 146 l.Debug("logs connection closed") 147} 148 149func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 150 l := s.l.With("workflow_id", wid.String()) 151 152 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid) 153 if !ok { 154 return fmt.Errorf("workflow_id %q not found", wid.String()) 155 } 156 157 done := make(chan struct{}) 158 159 go func() { 160 for { 161 select { 162 case line, ok := <-stdoutCh: 163 if !ok { 164 done <- struct{}{} 165 return 166 } 167 msg := map[string]string{"type": "stdout", "data": line} 168 if err := conn.WriteJSON(msg); err != nil { 169 l.Error("write stdout failed", "err", err) 170 done <- struct{}{} 171 return 172 } 173 case <-ctx.Done(): 174 done <- struct{}{} 175 return 176 } 177 } 178 }() 179 180 go func() { 181 for { 182 select { 183 case line, ok := <-stderrCh: 184 if !ok { 185 done <- struct{}{} 186 return 187 } 188 msg := map[string]string{"type": "stderr", "data": line} 189 if err := conn.WriteJSON(msg); err != nil { 190 l.Error("write stderr failed", "err", err) 191 done <- struct{}{} 192 return 193 } 194 case <-ctx.Done(): 195 done <- struct{}{} 196 return 197 } 198 } 199 }() 200 201 select { 202 case <-done: 203 case <-ctx.Done(): 204 } 205 206 return nil 207} 208 209func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 210 events, err := s.db.GetEvents(*cursor) 211 if err != nil { 212 s.l.Debug("err", "err", err) 213 return err 214 } 215 s.l.Debug("ops", "ops", events) 216 217 for _, event := range events { 218 // first extract the inner json into a map 219 var eventJson map[string]any 220 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 221 if err != nil { 222 s.l.Error("failed to unmarshal event", "err", err) 223 return err 224 } 225 226 jsonMsg, err := json.Marshal(map[string]any{ 227 "rkey": event.Rkey, 228 "nsid": event.Nsid, 229 "event": eventJson, 230 }) 231 if err != nil { 232 s.l.Error("failed to marshal record", "err", err) 233 return err 234 } 235 236 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 237 s.l.Debug("err", "err", err) 238 return err 239 } 240 *cursor = event.Created 241 } 242 243 return nil 244}