1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "os"
10 "strconv"
11 "time"
12
13 "tangled.sh/tangled.sh/core/spindle/models"
14
15 "github.com/go-chi/chi/v5"
16 "github.com/gorilla/websocket"
17 "github.com/hpcloud/tail"
18)
19
20var upgrader = websocket.Upgrader{
21 ReadBufferSize: 1024,
22 WriteBufferSize: 1024,
23}
24
25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
26 l := s.l.With("handler", "Events")
27 l.Debug("received new connection")
28
29 conn, err := upgrader.Upgrade(w, r, nil)
30 if err != nil {
31 l.Error("websocket upgrade failed", "err", err)
32 w.WriteHeader(http.StatusInternalServerError)
33 return
34 }
35 defer conn.Close()
36 l.Debug("upgraded http to wss")
37
38 ch := s.n.Subscribe()
39 defer s.n.Unsubscribe(ch)
40
41 ctx, cancel := context.WithCancel(r.Context())
42 defer cancel()
43 go func() {
44 for {
45 if _, _, err := conn.NextReader(); err != nil {
46 l.Error("failed to read", "err", err)
47 cancel()
48 return
49 }
50 }
51 }()
52
53 defaultCursor := time.Now().UnixNano()
54 cursorStr := r.URL.Query().Get("cursor")
55 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
56 if err != nil {
57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
58 }
59 if cursor == 0 {
60 cursor = defaultCursor
61 }
62
63 // complete backfill first before going to live data
64 l.Debug("going through backfill", "cursor", cursor)
65 if err := s.streamPipelines(conn, &cursor); err != nil {
66 l.Error("failed to backfill", "err", err)
67 return
68 }
69
70 for {
71 // wait for new data or timeout
72 select {
73 case <-ctx.Done():
74 l.Debug("stopping stream: client closed connection")
75 return
76 case <-ch:
77 // we have been notified of new data
78 l.Debug("going through live data", "cursor", cursor)
79 if err := s.streamPipelines(conn, &cursor); err != nil {
80 l.Error("failed to stream", "err", err)
81 return
82 }
83 case <-time.After(30 * time.Second):
84 // send a keep-alive
85 l.Debug("sent keepalive")
86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
87 l.Error("failed to write control", "err", err)
88 }
89 }
90 }
91}
92
93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
94 wid, err := getWorkflowID(r)
95 if err != nil {
96 http.Error(w, err.Error(), http.StatusBadRequest)
97 return
98 }
99
100 l := s.l.With("handler", "Logs")
101 l = s.l.With("wid", wid)
102
103 conn, err := upgrader.Upgrade(w, r, nil)
104 if err != nil {
105 l.Error("websocket upgrade failed", "err", err)
106 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
107 return
108 }
109 defer func() {
110 _ = conn.WriteControl(
111 websocket.CloseMessage,
112 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
113 time.Now().Add(time.Second),
114 )
115 conn.Close()
116 }()
117 l.Debug("upgraded http to wss")
118
119 ctx, cancel := context.WithCancel(r.Context())
120 defer cancel()
121
122 go func() {
123 for {
124 if _, _, err := conn.NextReader(); err != nil {
125 l.Debug("client disconnected", "err", err)
126 cancel()
127 return
128 }
129 }
130 }()
131
132 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
133 l.Info("log stream ended", "err", err)
134 }
135
136 l.Info("logs connection closed")
137}
138
139func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
140 status, err := s.db.GetStatus(wid)
141 if err != nil {
142 return err
143 }
144 isFinished := models.StatusKind(status.Status).IsFinish()
145
146 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid)
147
148 if status.Status == models.StatusKindFailed.String() && status.Error != nil {
149 if _, err := os.Stat(filePath); os.IsNotExist(err) {
150 msgs := []models.LogLine{
151 {
152 Kind: models.LogKindControl,
153 Content: "",
154 StepId: 0,
155 StepKind: models.StepKindUser,
156 },
157 {
158 Kind: models.LogKindData,
159 Content: *status.Error,
160 },
161 }
162
163 for _, msg := range msgs {
164 b, err := json.Marshal(msg)
165 if err != nil {
166 return err
167 }
168
169 if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
170 return fmt.Errorf("failed to write to websocket: %w", err)
171 }
172 }
173
174 return nil
175 }
176 }
177
178 config := tail.Config{
179 Follow: !isFinished,
180 ReOpen: !isFinished,
181 MustExist: false,
182 Location: &tail.SeekInfo{
183 Offset: 0,
184 Whence: io.SeekStart,
185 },
186 // Logger: tail.DiscardingLogger,
187 }
188
189 t, err := tail.TailFile(filePath, config)
190 if err != nil {
191 return fmt.Errorf("failed to tail log file: %w", err)
192 }
193 defer t.Stop()
194
195 for {
196 select {
197 case <-ctx.Done():
198 return ctx.Err()
199 case line := <-t.Lines:
200 if line == nil && isFinished {
201 return fmt.Errorf("tail completed")
202 }
203
204 if line == nil {
205 return fmt.Errorf("tail channel closed unexpectedly")
206 }
207
208 if line.Err != nil {
209 return fmt.Errorf("error tailing log file: %w", line.Err)
210 }
211
212 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
213 return fmt.Errorf("failed to write to websocket: %w", err)
214 }
215 }
216 }
217}
218
219func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
220 events, err := s.db.GetEvents(*cursor)
221 if err != nil {
222 s.l.Debug("err", "err", err)
223 return err
224 }
225 s.l.Debug("ops", "ops", events)
226
227 for _, event := range events {
228 // first extract the inner json into a map
229 var eventJson map[string]any
230 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
231 if err != nil {
232 s.l.Error("failed to unmarshal event", "err", err)
233 return err
234 }
235
236 jsonMsg, err := json.Marshal(map[string]any{
237 "rkey": event.Rkey,
238 "nsid": event.Nsid,
239 "event": eventJson,
240 })
241 if err != nil {
242 s.l.Error("failed to marshal record", "err", err)
243 return err
244 }
245
246 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
247 s.l.Debug("err", "err", err)
248 return err
249 }
250 *cursor = event.Created
251 }
252
253 return nil
254}
255
256func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
257 knot := chi.URLParam(r, "knot")
258 rkey := chi.URLParam(r, "rkey")
259 name := chi.URLParam(r, "name")
260
261 if knot == "" || rkey == "" || name == "" {
262 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
263 }
264
265 return models.WorkflowId{
266 PipelineId: models.PipelineId{
267 Knot: knot,
268 Rkey: rkey,
269 },
270 Name: name,
271 }, nil
272}