1package spindle
2
3import (
4 "fmt"
5 "net/http"
6 "time"
7
8 "context"
9
10 "github.com/go-chi/chi/v5"
11 "github.com/gorilla/websocket"
12)
13
14var upgrader = websocket.Upgrader{
15 ReadBufferSize: 1024,
16 WriteBufferSize: 1024,
17}
18
19func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
20 l := s.l.With("handler", "Events")
21 l.Info("received new connection")
22
23 conn, err := upgrader.Upgrade(w, r, nil)
24 if err != nil {
25 l.Error("websocket upgrade failed", "err", err)
26 w.WriteHeader(http.StatusInternalServerError)
27 return
28 }
29 defer conn.Close()
30 l.Info("upgraded http to wss")
31
32 ch := s.n.Subscribe()
33 defer s.n.Unsubscribe(ch)
34
35 ctx, cancel := context.WithCancel(r.Context())
36 defer cancel()
37 go func() {
38 for {
39 if _, _, err := conn.NextReader(); err != nil {
40 l.Error("failed to read", "err", err)
41 cancel()
42 return
43 }
44 }
45 }()
46
47 cursor := ""
48
49 // complete backfill first before going to live data
50 l.Info("going through backfill", "cursor", cursor)
51 if err := s.streamPipelines(conn, &cursor); err != nil {
52 l.Error("failed to backfill", "err", err)
53 return
54 }
55
56 for {
57 // wait for new data or timeout
58 select {
59 case <-ctx.Done():
60 l.Info("stopping stream: client closed connection")
61 return
62 case <-ch:
63 // we have been notified of new data
64 l.Info("going through live data", "cursor", cursor)
65 if err := s.streamPipelines(conn, &cursor); err != nil {
66 l.Error("failed to stream", "err", err)
67 return
68 }
69 case <-time.After(30 * time.Second):
70 // send a keep-alive
71 l.Info("sent keepalive")
72 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
73 l.Error("failed to write control", "err", err)
74 }
75 }
76 }
77}
78
79func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
80 l := s.l.With("handler", "Logs")
81
82 pipelineID := chi.URLParam(r, "pipelineID")
83 if pipelineID == "" {
84 http.Error(w, "pipelineID required", http.StatusBadRequest)
85 return
86 }
87 l = l.With("pipelineID", pipelineID)
88
89 conn, err := upgrader.Upgrade(w, r, nil)
90 if err != nil {
91 l.Error("websocket upgrade failed", "err", err)
92 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
93 return
94 }
95 defer conn.Close()
96 l.Info("upgraded http to wss")
97
98 ctx, cancel := context.WithCancel(r.Context())
99 defer cancel()
100
101 go func() {
102 for {
103 if _, _, err := conn.NextReader(); err != nil {
104 l.Info("client disconnected", "err", err)
105 cancel()
106 return
107 }
108 }
109 }()
110
111 if err := s.streamLogs(ctx, conn, pipelineID); err != nil {
112 l.Error("streamLogs failed", "err", err)
113 }
114 l.Info("logs connection closed")
115}
116
117func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, pipelineID string) error {
118 l := s.l.With("pipelineID", pipelineID)
119
120 stdoutCh, stderrCh, ok := s.eng.LogChannels(pipelineID)
121 if !ok {
122 return fmt.Errorf("pipelineID %q not found", pipelineID)
123 }
124
125 done := make(chan struct{})
126
127 go func() {
128 for {
129 select {
130 case line, ok := <-stdoutCh:
131 if !ok {
132 done <- struct{}{}
133 return
134 }
135 msg := map[string]string{"type": "stdout", "data": line}
136 if err := conn.WriteJSON(msg); err != nil {
137 l.Error("write stdout failed", "err", err)
138 done <- struct{}{}
139 return
140 }
141 case <-ctx.Done():
142 done <- struct{}{}
143 return
144 }
145 }
146 }()
147
148 go func() {
149 for {
150 select {
151 case line, ok := <-stderrCh:
152 if !ok {
153 done <- struct{}{}
154 return
155 }
156 msg := map[string]string{"type": "stderr", "data": line}
157 if err := conn.WriteJSON(msg); err != nil {
158 l.Error("write stderr failed", "err", err)
159 done <- struct{}{}
160 return
161 }
162 case <-ctx.Done():
163 done <- struct{}{}
164 return
165 }
166 }
167 }()
168
169 select {
170 case <-done:
171 case <-ctx.Done():
172 }
173
174 return nil
175}
176
177func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error {
178 ops, err := s.db.GetPipelineStatusAsRecords(*cursor)
179 if err != nil {
180 s.l.Debug("err", "err", err)
181 return err
182 }
183 s.l.Debug("ops", "ops", ops)
184
185 for _, op := range ops {
186 if err := conn.WriteJSON(op); err != nil {
187 s.l.Debug("err", "err", err)
188 return err
189 }
190 *cursor = op.Rkey
191 }
192
193 return nil
194}