forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "fmt" 5 "net/http" 6 "time" 7 8 "context" 9 10 "github.com/go-chi/chi/v5" 11 "github.com/gorilla/websocket" 12) 13 14var upgrader = websocket.Upgrader{ 15 ReadBufferSize: 1024, 16 WriteBufferSize: 1024, 17} 18 19func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 20 l := s.l.With("handler", "Events") 21 l.Info("received new connection") 22 23 conn, err := upgrader.Upgrade(w, r, nil) 24 if err != nil { 25 l.Error("websocket upgrade failed", "err", err) 26 w.WriteHeader(http.StatusInternalServerError) 27 return 28 } 29 defer conn.Close() 30 l.Info("upgraded http to wss") 31 32 ch := s.n.Subscribe() 33 defer s.n.Unsubscribe(ch) 34 35 ctx, cancel := context.WithCancel(r.Context()) 36 defer cancel() 37 go func() { 38 for { 39 if _, _, err := conn.NextReader(); err != nil { 40 l.Error("failed to read", "err", err) 41 cancel() 42 return 43 } 44 } 45 }() 46 47 cursor := "" 48 49 // complete backfill first before going to live data 50 l.Info("going through backfill", "cursor", cursor) 51 if err := s.streamPipelines(conn, &cursor); err != nil { 52 l.Error("failed to backfill", "err", err) 53 return 54 } 55 56 for { 57 // wait for new data or timeout 58 select { 59 case <-ctx.Done(): 60 l.Info("stopping stream: client closed connection") 61 return 62 case <-ch: 63 // we have been notified of new data 64 l.Info("going through live data", "cursor", cursor) 65 if err := s.streamPipelines(conn, &cursor); err != nil { 66 l.Error("failed to stream", "err", err) 67 return 68 } 69 case <-time.After(30 * time.Second): 70 // send a keep-alive 71 l.Info("sent keepalive") 72 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 73 l.Error("failed to write control", "err", err) 74 } 75 } 76 } 77} 78 79func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 80 l := s.l.With("handler", "Logs") 81 82 pipelineID := chi.URLParam(r, "pipelineID") 83 if pipelineID == "" { 84 http.Error(w, "pipelineID required", http.StatusBadRequest) 85 return 86 } 87 l = l.With("pipelineID", pipelineID) 88 89 conn, err := upgrader.Upgrade(w, r, nil) 90 if err != nil { 91 l.Error("websocket upgrade failed", "err", err) 92 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 93 return 94 } 95 defer conn.Close() 96 l.Info("upgraded http to wss") 97 98 ctx, cancel := context.WithCancel(r.Context()) 99 defer cancel() 100 101 go func() { 102 for { 103 if _, _, err := conn.NextReader(); err != nil { 104 l.Info("client disconnected", "err", err) 105 cancel() 106 return 107 } 108 } 109 }() 110 111 if err := s.streamLogs(ctx, conn, pipelineID); err != nil { 112 l.Error("streamLogs failed", "err", err) 113 } 114 l.Info("logs connection closed") 115} 116 117func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, pipelineID string) error { 118 l := s.l.With("pipelineID", pipelineID) 119 120 stdoutCh, stderrCh, ok := s.eng.LogChannels(pipelineID) 121 if !ok { 122 return fmt.Errorf("pipelineID %q not found", pipelineID) 123 } 124 125 done := make(chan struct{}) 126 127 go func() { 128 for { 129 select { 130 case line, ok := <-stdoutCh: 131 if !ok { 132 done <- struct{}{} 133 return 134 } 135 msg := map[string]string{"type": "stdout", "data": line} 136 if err := conn.WriteJSON(msg); err != nil { 137 l.Error("write stdout failed", "err", err) 138 done <- struct{}{} 139 return 140 } 141 case <-ctx.Done(): 142 done <- struct{}{} 143 return 144 } 145 } 146 }() 147 148 go func() { 149 for { 150 select { 151 case line, ok := <-stderrCh: 152 if !ok { 153 done <- struct{}{} 154 return 155 } 156 msg := map[string]string{"type": "stderr", "data": line} 157 if err := conn.WriteJSON(msg); err != nil { 158 l.Error("write stderr failed", "err", err) 159 done <- struct{}{} 160 return 161 } 162 case <-ctx.Done(): 163 done <- struct{}{} 164 return 165 } 166 } 167 }() 168 169 select { 170 case <-done: 171 case <-ctx.Done(): 172 } 173 174 return nil 175} 176 177func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error { 178 ops, err := s.db.GetPipelineStatusAsRecords(*cursor) 179 if err != nil { 180 s.l.Debug("err", "err", err) 181 return err 182 } 183 s.l.Debug("ops", "ops", ops) 184 185 for _, op := range ops { 186 if err := conn.WriteJSON(op); err != nil { 187 s.l.Debug("err", "err", err) 188 return err 189 } 190 *cursor = op.Rkey 191 } 192 193 return nil 194}