1package spindle
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 "strconv"
8 "time"
9
10 "tangled.sh/tangled.sh/core/spindle/models"
11
12 "github.com/go-chi/chi/v5"
13 "github.com/gorilla/websocket"
14)
15
16var upgrader = websocket.Upgrader{
17 ReadBufferSize: 1024,
18 WriteBufferSize: 1024,
19}
20
21func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
22 l := s.l.With("handler", "Events")
23 l.Debug("received new connection")
24
25 conn, err := upgrader.Upgrade(w, r, nil)
26 if err != nil {
27 l.Error("websocket upgrade failed", "err", err)
28 w.WriteHeader(http.StatusInternalServerError)
29 return
30 }
31 defer conn.Close()
32 l.Debug("upgraded http to wss")
33
34 ch := s.n.Subscribe()
35 defer s.n.Unsubscribe(ch)
36
37 ctx, cancel := context.WithCancel(r.Context())
38 defer cancel()
39 go func() {
40 for {
41 if _, _, err := conn.NextReader(); err != nil {
42 l.Error("failed to read", "err", err)
43 cancel()
44 return
45 }
46 }
47 }()
48
49 defaultCursor := time.Now().UnixNano()
50 cursorStr := r.URL.Query().Get("cursor")
51 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
52 if err != nil {
53 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
54 }
55 if cursor == 0 {
56 cursor = defaultCursor
57 }
58
59 // complete backfill first before going to live data
60 l.Debug("going through backfill", "cursor", cursor)
61 if err := s.streamPipelines(conn, &cursor); err != nil {
62 l.Error("failed to backfill", "err", err)
63 return
64 }
65
66 for {
67 // wait for new data or timeout
68 select {
69 case <-ctx.Done():
70 l.Debug("stopping stream: client closed connection")
71 return
72 case <-ch:
73 // we have been notified of new data
74 l.Debug("going through live data", "cursor", cursor)
75 if err := s.streamPipelines(conn, &cursor); err != nil {
76 l.Error("failed to stream", "err", err)
77 return
78 }
79 case <-time.After(30 * time.Second):
80 // send a keep-alive
81 l.Debug("sent keepalive")
82 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
83 l.Error("failed to write control", "err", err)
84 }
85 }
86 }
87}
88
89func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
90 l := s.l.With("handler", "Logs")
91
92 knot := chi.URLParam(r, "knot")
93 if knot == "" {
94 http.Error(w, "knot required", http.StatusBadRequest)
95 return
96 }
97
98 rkey := chi.URLParam(r, "rkey")
99 if rkey == "" {
100 http.Error(w, "rkey required", http.StatusBadRequest)
101 return
102 }
103
104 name := chi.URLParam(r, "name")
105 if name == "" {
106 http.Error(w, "name required", http.StatusBadRequest)
107 return
108 }
109
110 wid := models.WorkflowId{
111 PipelineId: models.PipelineId{
112 Knot: knot,
113 Rkey: rkey,
114 },
115 Name: name,
116 }
117
118 l = l.With("knot", knot, "rkey", rkey, "name", name)
119
120 conn, err := upgrader.Upgrade(w, r, nil)
121 if err != nil {
122 l.Error("websocket upgrade failed", "err", err)
123 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
124 return
125 }
126 defer conn.Close()
127 l.Debug("upgraded http to wss")
128
129 ctx, cancel := context.WithCancel(r.Context())
130 defer cancel()
131
132 go func() {
133 for {
134 if _, _, err := conn.NextReader(); err != nil {
135 l.Debug("client disconnected", "err", err)
136 cancel()
137 return
138 }
139 }
140 }()
141
142 if err := s.streamLogs(ctx, conn, wid); err != nil {
143 l.Error("streamLogs failed", "err", err)
144 }
145 l.Debug("logs connection closed")
146}
147
148func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
149 l := s.l.With("workflow_id", wid.String())
150
151 stdoutCh, stderrCh, ok := s.eng.LogChannels(wid)
152 if !ok {
153 return fmt.Errorf("workflow_id %q not found", wid.String())
154 }
155
156 done := make(chan struct{})
157
158 go func() {
159 for {
160 select {
161 case line, ok := <-stdoutCh:
162 if !ok {
163 done <- struct{}{}
164 return
165 }
166 msg := map[string]string{"type": "stdout", "data": line}
167 if err := conn.WriteJSON(msg); err != nil {
168 l.Error("write stdout failed", "err", err)
169 done <- struct{}{}
170 return
171 }
172 case <-ctx.Done():
173 done <- struct{}{}
174 return
175 }
176 }
177 }()
178
179 go func() {
180 for {
181 select {
182 case line, ok := <-stderrCh:
183 if !ok {
184 done <- struct{}{}
185 return
186 }
187 msg := map[string]string{"type": "stderr", "data": line}
188 if err := conn.WriteJSON(msg); err != nil {
189 l.Error("write stderr failed", "err", err)
190 done <- struct{}{}
191 return
192 }
193 case <-ctx.Done():
194 done <- struct{}{}
195 return
196 }
197 }
198 }()
199
200 select {
201 case <-done:
202 case <-ctx.Done():
203 }
204
205 return nil
206}
207
208func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
209 ops, err := s.db.GetEvents(*cursor)
210 if err != nil {
211 s.l.Debug("err", "err", err)
212 return err
213 }
214 s.l.Debug("ops", "ops", ops)
215
216 for _, op := range ops {
217 if err := conn.WriteJSON(op); err != nil {
218 s.l.Debug("err", "err", err)
219 return err
220 }
221 *cursor = op.Created
222 }
223
224 return nil
225}