1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "strconv"
9 "time"
10
11 "tangled.sh/tangled.sh/core/spindle/models"
12
13 "github.com/go-chi/chi/v5"
14 "github.com/gorilla/websocket"
15)
16
// upgrader holds the websocket upgrade settings used by the Events and Logs
// handlers in this file. Only the read and write buffer sizes are set (1024
// bytes each); every other Upgrader option is left at its zero value.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
21
22func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
23 l := s.l.With("handler", "Events")
24 l.Debug("received new connection")
25
26 conn, err := upgrader.Upgrade(w, r, nil)
27 if err != nil {
28 l.Error("websocket upgrade failed", "err", err)
29 w.WriteHeader(http.StatusInternalServerError)
30 return
31 }
32 defer conn.Close()
33 l.Debug("upgraded http to wss")
34
35 ch := s.n.Subscribe()
36 defer s.n.Unsubscribe(ch)
37
38 ctx, cancel := context.WithCancel(r.Context())
39 defer cancel()
40 go func() {
41 for {
42 if _, _, err := conn.NextReader(); err != nil {
43 l.Error("failed to read", "err", err)
44 cancel()
45 return
46 }
47 }
48 }()
49
50 defaultCursor := time.Now().UnixNano()
51 cursorStr := r.URL.Query().Get("cursor")
52 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
53 if err != nil {
54 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
55 }
56 if cursor == 0 {
57 cursor = defaultCursor
58 }
59
60 // complete backfill first before going to live data
61 l.Debug("going through backfill", "cursor", cursor)
62 if err := s.streamPipelines(conn, &cursor); err != nil {
63 l.Error("failed to backfill", "err", err)
64 return
65 }
66
67 for {
68 // wait for new data or timeout
69 select {
70 case <-ctx.Done():
71 l.Debug("stopping stream: client closed connection")
72 return
73 case <-ch:
74 // we have been notified of new data
75 l.Debug("going through live data", "cursor", cursor)
76 if err := s.streamPipelines(conn, &cursor); err != nil {
77 l.Error("failed to stream", "err", err)
78 return
79 }
80 case <-time.After(30 * time.Second):
81 // send a keep-alive
82 l.Debug("sent keepalive")
83 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
84 l.Error("failed to write control", "err", err)
85 }
86 }
87 }
88}
89
90func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
91 l := s.l.With("handler", "Logs")
92
93 knot := chi.URLParam(r, "knot")
94 if knot == "" {
95 http.Error(w, "knot required", http.StatusBadRequest)
96 return
97 }
98
99 rkey := chi.URLParam(r, "rkey")
100 if rkey == "" {
101 http.Error(w, "rkey required", http.StatusBadRequest)
102 return
103 }
104
105 name := chi.URLParam(r, "name")
106 if name == "" {
107 http.Error(w, "name required", http.StatusBadRequest)
108 return
109 }
110
111 wid := models.WorkflowId{
112 PipelineId: models.PipelineId{
113 Knot: knot,
114 Rkey: rkey,
115 },
116 Name: name,
117 }
118
119 l = l.With("knot", knot, "rkey", rkey, "name", name)
120
121 conn, err := upgrader.Upgrade(w, r, nil)
122 if err != nil {
123 l.Error("websocket upgrade failed", "err", err)
124 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
125 return
126 }
127 defer conn.Close()
128 l.Debug("upgraded http to wss")
129
130 ctx, cancel := context.WithCancel(r.Context())
131 defer cancel()
132
133 go func() {
134 for {
135 if _, _, err := conn.NextReader(); err != nil {
136 l.Debug("client disconnected", "err", err)
137 cancel()
138 return
139 }
140 }
141 }()
142
143 if err := s.streamLogs(ctx, conn, wid); err != nil {
144 l.Error("streamLogs failed", "err", err)
145 }
146 l.Debug("logs connection closed")
147}
148
149func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
150 l := s.l.With("workflow_id", wid.String())
151
152 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid)
153 if !ok {
154 return fmt.Errorf("workflow_id %q not found", wid.String())
155 }
156
157 done := make(chan struct{})
158
159 go func() {
160 for {
161 select {
162 case line, ok := <-stdoutCh:
163 if !ok {
164 done <- struct{}{}
165 return
166 }
167 msg := map[string]string{"type": "stdout", "data": line}
168 if err := conn.WriteJSON(msg); err != nil {
169 l.Error("write stdout failed", "err", err)
170 done <- struct{}{}
171 return
172 }
173 case <-ctx.Done():
174 done <- struct{}{}
175 return
176 }
177 }
178 }()
179
180 go func() {
181 for {
182 select {
183 case line, ok := <-stderrCh:
184 if !ok {
185 done <- struct{}{}
186 return
187 }
188 msg := map[string]string{"type": "stderr", "data": line}
189 if err := conn.WriteJSON(msg); err != nil {
190 l.Error("write stderr failed", "err", err)
191 done <- struct{}{}
192 return
193 }
194 case <-ctx.Done():
195 done <- struct{}{}
196 return
197 }
198 }
199 }()
200
201 select {
202 case <-done:
203 case <-ctx.Done():
204 }
205
206 return nil
207}
208
209func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
210 events, err := s.db.GetEvents(*cursor)
211 if err != nil {
212 s.l.Debug("err", "err", err)
213 return err
214 }
215 s.l.Debug("ops", "ops", events)
216
217 for _, event := range events {
218 // first extract the inner json into a map
219 var eventJson map[string]any
220 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
221 if err != nil {
222 s.l.Error("failed to unmarshal event", "err", err)
223 return err
224 }
225
226 jsonMsg, err := json.Marshal(map[string]any{
227 "rkey": event.Rkey,
228 "nsid": event.Nsid,
229 "event": eventJson,
230 })
231 if err != nil {
232 s.l.Error("failed to marshal record", "err", err)
233 return err
234 }
235
236 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
237 s.l.Debug("err", "err", err)
238 return err
239 }
240 *cursor = event.Created
241 }
242
243 return nil
244}