forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "fmt"
8 "net/http"
9 "strconv"
10 "strings"
11 "time"
12
13 "tangled.sh/tangled.sh/core/spindle/engine"
14 "tangled.sh/tangled.sh/core/spindle/models"
15
16 "github.com/go-chi/chi/v5"
17 "github.com/gorilla/websocket"
18)
19
20var upgrader = websocket.Upgrader{
21 ReadBufferSize: 1024,
22 WriteBufferSize: 1024,
23}
24
25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
26 l := s.l.With("handler", "Events")
27 l.Debug("received new connection")
28
29 conn, err := upgrader.Upgrade(w, r, nil)
30 if err != nil {
31 l.Error("websocket upgrade failed", "err", err)
32 w.WriteHeader(http.StatusInternalServerError)
33 return
34 }
35 defer conn.Close()
36 l.Debug("upgraded http to wss")
37
38 ch := s.n.Subscribe()
39 defer s.n.Unsubscribe(ch)
40
41 ctx, cancel := context.WithCancel(r.Context())
42 defer cancel()
43 go func() {
44 for {
45 if _, _, err := conn.NextReader(); err != nil {
46 l.Error("failed to read", "err", err)
47 cancel()
48 return
49 }
50 }
51 }()
52
53 defaultCursor := time.Now().UnixNano()
54 cursorStr := r.URL.Query().Get("cursor")
55 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
56 if err != nil {
57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
58 }
59 if cursor == 0 {
60 cursor = defaultCursor
61 }
62
63 // complete backfill first before going to live data
64 l.Debug("going through backfill", "cursor", cursor)
65 if err := s.streamPipelines(conn, &cursor); err != nil {
66 l.Error("failed to backfill", "err", err)
67 return
68 }
69
70 for {
71 // wait for new data or timeout
72 select {
73 case <-ctx.Done():
74 l.Debug("stopping stream: client closed connection")
75 return
76 case <-ch:
77 // we have been notified of new data
78 l.Debug("going through live data", "cursor", cursor)
79 if err := s.streamPipelines(conn, &cursor); err != nil {
80 l.Error("failed to stream", "err", err)
81 return
82 }
83 case <-time.After(30 * time.Second):
84 // send a keep-alive
85 l.Debug("sent keepalive")
86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
87 l.Error("failed to write control", "err", err)
88 }
89 }
90 }
91}
92
93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
94 wid, err := getWorkflowID(r)
95 if err != nil {
96 http.Error(w, err.Error(), http.StatusBadRequest)
97 return
98 }
99
100 s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
101 return s.streamLogs(ctx, conn, wid)
102 })
103}
104
105func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
106 wid, err := getWorkflowID(r)
107 if err != nil {
108 http.Error(w, err.Error(), http.StatusBadRequest)
109 return
110 }
111
112 idxStr := chi.URLParam(r, "idx")
113 if idxStr == "" {
114 http.Error(w, "step index required", http.StatusBadRequest)
115 return
116 }
117 idx, err := strconv.Atoi(idxStr)
118 if err != nil {
119 http.Error(w, "bad step index", http.StatusBadRequest)
120 return
121 }
122
123 s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
124 return s.streamLogFromDisk(ctx, conn, wid, idx)
125 })
126}
127
128func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
129 l := s.l.With("handler", "Logs")
130
131 conn, err := upgrader.Upgrade(w, r, nil)
132 if err != nil {
133 l.Error("websocket upgrade failed", "err", err)
134 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
135 return
136 }
137 defer conn.Close()
138 l.Debug("upgraded http to wss")
139
140 ctx, cancel := context.WithCancel(r.Context())
141 defer cancel()
142
143 go func() {
144 for {
145 if _, _, err := conn.NextReader(); err != nil {
146 l.Debug("client disconnected", "err", err)
147 cancel()
148 return
149 }
150 }
151 }()
152
153 if err := streamFn(ctx, conn); err != nil {
154 l.Error("log stream failed", "err", err)
155 }
156 l.Debug("logs connection closed")
157}
158
159func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
160 l := s.l.With("workflow_id", wid.String())
161
162 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid)
163 if !ok {
164 return fmt.Errorf("workflow_id %q not found", wid.String())
165 }
166
167 done := make(chan struct{})
168
169 go func() {
170 for {
171 select {
172 case line, ok := <-stdoutCh:
173 if !ok {
174 done <- struct{}{}
175 return
176 }
177 msg := map[string]string{"type": "stdout", "data": line}
178 if err := conn.WriteJSON(msg); err != nil {
179 l.Error("write stdout failed", "err", err)
180 done <- struct{}{}
181 return
182 }
183 case <-ctx.Done():
184 done <- struct{}{}
185 return
186 }
187 }
188 }()
189
190 go func() {
191 for {
192 select {
193 case line, ok := <-stderrCh:
194 if !ok {
195 done <- struct{}{}
196 return
197 }
198 msg := map[string]string{"type": "stderr", "data": line}
199 if err := conn.WriteJSON(msg); err != nil {
200 l.Error("write stderr failed", "err", err)
201 done <- struct{}{}
202 return
203 }
204 case <-ctx.Done():
205 done <- struct{}{}
206 return
207 }
208 }
209 }()
210
211 select {
212 case <-done:
213 case <-ctx.Done():
214 }
215
216 return nil
217}
218
219func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
220 streams := []string{"stdout", "stderr"}
221
222 for _, stream := range streams {
223 data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
224 if err != nil {
225 // log but continue to next stream
226 s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
227 continue
228 }
229
230 scanner := bufio.NewScanner(strings.NewReader(data))
231 for scanner.Scan() {
232 select {
233 case <-ctx.Done():
234 return ctx.Err()
235 default:
236 msg := map[string]string{
237 "type": stream,
238 "data": scanner.Text(),
239 }
240 if err := conn.WriteJSON(msg); err != nil {
241 return err
242 }
243 }
244 }
245
246 if err := scanner.Err(); err != nil {
247 return fmt.Errorf("error scanning %s log: %w", stream, err)
248 }
249 }
250
251 return nil
252}
253
254func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
255 events, err := s.db.GetEvents(*cursor)
256 if err != nil {
257 s.l.Debug("err", "err", err)
258 return err
259 }
260 s.l.Debug("ops", "ops", events)
261
262 for _, event := range events {
263 // first extract the inner json into a map
264 var eventJson map[string]any
265 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
266 if err != nil {
267 s.l.Error("failed to unmarshal event", "err", err)
268 return err
269 }
270
271 jsonMsg, err := json.Marshal(map[string]any{
272 "rkey": event.Rkey,
273 "nsid": event.Nsid,
274 "event": eventJson,
275 })
276 if err != nil {
277 s.l.Error("failed to marshal record", "err", err)
278 return err
279 }
280
281 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
282 s.l.Debug("err", "err", err)
283 return err
284 }
285 *cursor = event.Created
286 }
287
288 return nil
289}
290
291func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
292 knot := chi.URLParam(r, "knot")
293 rkey := chi.URLParam(r, "rkey")
294 name := chi.URLParam(r, "name")
295
296 if knot == "" || rkey == "" || name == "" {
297 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
298 }
299
300 return models.WorkflowId{
301 PipelineId: models.PipelineId{
302 Knot: knot,
303 Rkey: rkey,
304 },
305 Name: name,
306 }, nil
307}