forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 "strconv" 8 "time" 9 10 "tangled.sh/tangled.sh/core/spindle/models" 11 12 "github.com/go-chi/chi/v5" 13 "github.com/gorilla/websocket" 14) 15 16var upgrader = websocket.Upgrader{ 17 ReadBufferSize: 1024, 18 WriteBufferSize: 1024, 19} 20 21func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 22 l := s.l.With("handler", "Events") 23 l.Debug("received new connection") 24 25 conn, err := upgrader.Upgrade(w, r, nil) 26 if err != nil { 27 l.Error("websocket upgrade failed", "err", err) 28 w.WriteHeader(http.StatusInternalServerError) 29 return 30 } 31 defer conn.Close() 32 l.Debug("upgraded http to wss") 33 34 ch := s.n.Subscribe() 35 defer s.n.Unsubscribe(ch) 36 37 ctx, cancel := context.WithCancel(r.Context()) 38 defer cancel() 39 go func() { 40 for { 41 if _, _, err := conn.NextReader(); err != nil { 42 l.Error("failed to read", "err", err) 43 cancel() 44 return 45 } 46 } 47 }() 48 49 defaultCursor := time.Now().UnixNano() 50 cursorStr := r.URL.Query().Get("cursor") 51 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 52 if err != nil { 53 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 54 } 55 if cursor == 0 { 56 cursor = defaultCursor 57 } 58 59 // complete backfill first before going to live data 60 l.Debug("going through backfill", "cursor", cursor) 61 if err := s.streamPipelines(conn, &cursor); err != nil { 62 l.Error("failed to backfill", "err", err) 63 return 64 } 65 66 for { 67 // wait for new data or timeout 68 select { 69 case <-ctx.Done(): 70 l.Debug("stopping stream: client closed connection") 71 return 72 case <-ch: 73 // we have been notified of new data 74 l.Debug("going through live data", "cursor", cursor) 75 if err := s.streamPipelines(conn, &cursor); err != nil { 76 l.Error("failed to stream", "err", err) 77 return 78 } 79 case <-time.After(30 * time.Second): 80 // send a keep-alive 81 l.Debug("sent keepalive") 82 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 83 l.Error("failed to write control", "err", err) 84 } 85 } 86 } 87} 88 89func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 90 l := s.l.With("handler", "Logs") 91 92 knot := chi.URLParam(r, "knot") 93 if knot == "" { 94 http.Error(w, "knot required", http.StatusBadRequest) 95 return 96 } 97 98 rkey := chi.URLParam(r, "rkey") 99 if rkey == "" { 100 http.Error(w, "rkey required", http.StatusBadRequest) 101 return 102 } 103 104 name := chi.URLParam(r, "name") 105 if name == "" { 106 http.Error(w, "name required", http.StatusBadRequest) 107 return 108 } 109 110 wid := models.WorkflowId{ 111 PipelineId: models.PipelineId{ 112 Knot: knot, 113 Rkey: rkey, 114 }, 115 Name: name, 116 } 117 118 l = l.With("knot", knot, "rkey", rkey, "name", name) 119 120 conn, err := upgrader.Upgrade(w, r, nil) 121 if err != nil { 122 l.Error("websocket upgrade failed", "err", err) 123 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 124 return 125 } 126 defer conn.Close() 127 l.Debug("upgraded http to wss") 128 129 ctx, cancel := context.WithCancel(r.Context()) 130 defer cancel() 131 132 go func() { 133 for { 134 if _, _, err := conn.NextReader(); err != nil { 135 l.Debug("client disconnected", "err", err) 136 cancel() 137 return 138 } 139 } 140 }() 141 142 if err := s.streamLogs(ctx, conn, wid); err != nil { 143 l.Error("streamLogs failed", "err", err) 144 } 145 l.Debug("logs connection closed") 146} 147 148func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 149 l := s.l.With("workflow_id", wid.String()) 150 151 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid) 152 if !ok { 153 return fmt.Errorf("workflow_id %q not found", wid.String()) 154 } 155 156 done := make(chan struct{}) 157 158 go func() { 159 for { 160 select { 161 case line, ok := <-stdoutCh: 162 if !ok { 163 done <- struct{}{} 164 return 165 } 166 msg := map[string]string{"type": "stdout", "data": line} 167 if err := conn.WriteJSON(msg); err != nil { 168 l.Error("write stdout failed", "err", err) 169 done <- struct{}{} 170 return 171 } 172 case <-ctx.Done(): 173 done <- struct{}{} 174 return 175 } 176 } 177 }() 178 179 go func() { 180 for { 181 select { 182 case line, ok := <-stderrCh: 183 if !ok { 184 done <- struct{}{} 185 return 186 } 187 msg := map[string]string{"type": "stderr", "data": line} 188 if err := conn.WriteJSON(msg); err != nil { 189 l.Error("write stderr failed", "err", err) 190 done <- struct{}{} 191 return 192 } 193 case <-ctx.Done(): 194 done <- struct{}{} 195 return 196 } 197 } 198 }() 199 200 select { 201 case <-done: 202 case <-ctx.Done(): 203 } 204 205 return nil 206} 207 208func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 209 ops, err := s.db.GetEvents(*cursor) 210 if err != nil { 211 s.l.Debug("err", "err", err) 212 return err 213 } 214 s.l.Debug("ops", "ops", ops) 215 216 for _, op := range ops { 217 if err := conn.WriteJSON(op); err != nil { 218 s.l.Debug("err", "err", err) 219 return err 220 } 221 *cursor = op.Created 222 } 223 224 return nil 225}