1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "os"
10 "strconv"
11 "time"
12
13 "tangled.org/core/log"
14 "tangled.org/core/spindle/models"
15
16 "github.com/go-chi/chi/v5"
17 "github.com/gorilla/websocket"
18 "github.com/hpcloud/tail"
19)
20
21var upgrader = websocket.Upgrader{
22 ReadBufferSize: 1024,
23 WriteBufferSize: 1024,
24}
25
// Events streams pipeline events to the client over a websocket.
//
// The optional "cursor" query parameter is a UnixNano timestamp. The events
// returned by s.db.GetEvents for that cursor are sent first (backfill); after
// that the handler waits for notifications on its s.n subscription and sends
// whatever new events are available. A missing, unparsable or zero cursor
// means "start from now". A ping is written every 30 seconds as a keep-alive,
// and streaming stops when the client disconnects or any write fails.
func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
	l := log.SubLogger(s.l, "eventstream")

	l.Debug("received new connection")

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already sent an HTTP error response to the client;
		// writing another status here would be a superfluous WriteHeader.
		l.Error("websocket upgrade failed", "err", err)
		return
	}
	defer conn.Close()
	l.Debug("upgraded http to wss")

	ch := s.n.Subscribe()
	defer s.n.Unsubscribe(ch)

	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	// Keep reading so control frames (close, ping, pong) are processed and a
	// client disconnect cancels ctx. The goroutine exits once conn is closed.
	go func() {
		for {
			if _, _, err := conn.NextReader(); err != nil {
				l.Debug("client disconnected", "err", err)
				cancel()
				return
			}
		}
	}()

	cursor := time.Now().UnixNano()
	if cursorStr := r.URL.Query().Get("cursor"); cursorStr != "" {
		// ParseInt returns a non-zero value together with ErrRange, so the
		// parsed value is only trusted when err is nil.
		parsed, err := strconv.ParseInt(cursorStr, 10, 64)
		switch {
		case err != nil:
			l.Error("invalid cursor", "invalidCursor", cursorStr, "default", cursor)
		case parsed != 0:
			cursor = parsed
		}
	}

	// complete backfill first before going to live data
	l.Debug("going through backfill", "cursor", cursor)
	if err := s.streamPipelines(conn, &cursor); err != nil {
		l.Error("failed to backfill", "err", err)
		return
	}

	// One ticker for the lifetime of the connection instead of allocating a
	// fresh time.After timer on every loop iteration.
	keepAlive := time.NewTicker(30 * time.Second)
	defer keepAlive.Stop()

	for {
		select {
		case <-ctx.Done():
			l.Debug("stopping stream: client closed connection")
			return
		case <-ch:
			// we have been notified of new data
			l.Debug("going through live data", "cursor", cursor)
			if err := s.streamPipelines(conn, &cursor); err != nil {
				l.Error("failed to stream", "err", err)
				return
			}
		case <-keepAlive.C:
			if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
				// The connection can no longer be written to; stop rather
				// than keep looping on a dead socket.
				l.Error("failed to write control", "err", err)
				return
			}
		}
	}
}
93
// Logs serves the log stream of one workflow over a websocket. The workflow
// is identified by the "knot", "rkey" and "name" URL parameters; a request
// missing any of them gets 400 Bad Request before any upgrade is attempted.
//
// Once upgraded, the log is streamed by streamLogsFromDisk until it returns
// (log complete, an error, or the client disconnecting). A normal-closure
// close frame is then attempted before the connection is closed.
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
	wid, err := getWorkflowID(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Attach both attributes in a single call so neither one is lost.
	l := s.l.With("handler", "Logs", "wid", wid)

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response; calling
		// http.Error as well would be a superfluous WriteHeader.
		l.Error("websocket upgrade failed", "err", err)
		return
	}
	defer func() {
		// Best effort: the peer may already be gone, in which case there is
		// nothing useful to do with the error.
		_ = conn.WriteControl(
			websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
			time.Now().Add(time.Second),
		)
		conn.Close()
	}()
	l.Debug("upgraded http to wss")

	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	// Keep reading so control frames are processed and a client disconnect
	// cancels ctx, which stops streamLogsFromDisk.
	go func() {
		for {
			if _, _, err := conn.NextReader(); err != nil {
				l.Debug("client disconnected", "err", err)
				cancel()
				return
			}
		}
	}()

	if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
		l.Info("log stream ended", "err", err)
	}

	l.Info("logs connection closed")
}
139
// streamLogsFromDisk writes the log of workflow wid to conn, one websocket
// text message per line of the log file.
//
// If the workflow's status is failed with a recorded error and no log file
// exists, two synthesized log lines are sent instead: a control line for step
// 0 and then a data line carrying the error text. Otherwise the file is tailed
// from the start. Unfinished workflows are followed (and reopened) until ctx
// is cancelled. A ping is written every 30 seconds as a keep-alive.
//
// It returns nil when a finished workflow's tail has delivered all its lines
// (its Lines channel closed), and a non-nil error on context cancellation,
// status lookup failure, tail errors, or websocket write failures.
func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
	status, err := s.db.GetStatus(wid)
	if err != nil {
		return err
	}
	isFinished := models.StatusKind(status.Status).IsFinish()

	filePath := models.LogFilePath(s.cfg.Server.LogDir, wid)

	if status.Status == models.StatusKindFailed.String() && status.Error != nil {
		if _, err := os.Stat(filePath); os.IsNotExist(err) {
			msgs := []models.LogLine{
				{
					Kind:     models.LogKindControl,
					Content:  "",
					StepId:   0,
					StepKind: models.StepKindUser,
				},
				{
					Kind:    models.LogKindData,
					Content: *status.Error,
				},
			}

			for _, msg := range msgs {
				b, err := json.Marshal(msg)
				if err != nil {
					return err
				}

				if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
					return fmt.Errorf("failed to write to websocket: %w", err)
				}
			}

			return nil
		}
	}

	config := tail.Config{
		// Only keep watching the file while the workflow can still write to it.
		Follow:    !isFinished,
		ReOpen:    !isFinished,
		MustExist: false,
		Location: &tail.SeekInfo{
			Offset: 0,
			Whence: io.SeekStart,
		},
		// Logger: tail.DiscardingLogger,
	}

	t, err := tail.TailFile(filePath, config)
	if err != nil {
		return fmt.Errorf("failed to tail log file: %w", err)
	}
	defer t.Stop()

	// A single ticker instead of time.After inside the loop: time.After would
	// allocate a new 30s timer for every log line received.
	keepAlive := time.NewTicker(30 * time.Second)
	defer keepAlive.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case line, ok := <-t.Lines:
			if !ok || line == nil {
				if isFinished {
					// Not following: the whole file has been sent, which is
					// the normal end of the stream rather than an error.
					return nil
				}
				return fmt.Errorf("tail channel closed unexpectedly")
			}

			if line.Err != nil {
				return fmt.Errorf("error tailing log file: %w", line.Err)
			}

			if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
				return fmt.Errorf("failed to write to websocket: %w", err)
			}
		case <-keepAlive.C:
			if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
				return fmt.Errorf("failed to write control: %w", err)
			}
		}
	}
}
224
225func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
226 events, err := s.db.GetEvents(*cursor)
227 if err != nil {
228 s.l.Debug("err", "err", err)
229 return err
230 }
231
232 for _, event := range events {
233 // first extract the inner json into a map
234 var eventJson map[string]any
235 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
236 if err != nil {
237 s.l.Error("failed to unmarshal event", "err", err)
238 return err
239 }
240
241 jsonMsg, err := json.Marshal(map[string]any{
242 "rkey": event.Rkey,
243 "nsid": event.Nsid,
244 "event": eventJson,
245 })
246 if err != nil {
247 s.l.Error("failed to marshal record", "err", err)
248 return err
249 }
250
251 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
252 s.l.Debug("err", "err", err)
253 return err
254 }
255 *cursor = event.Created
256 }
257
258 return nil
259}
260
261func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
262 knot := chi.URLParam(r, "knot")
263 rkey := chi.URLParam(r, "rkey")
264 name := chi.URLParam(r, "name")
265
266 if knot == "" || rkey == "" || name == "" {
267 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
268 }
269
270 return models.WorkflowId{
271 PipelineId: models.PipelineId{
272 Knot: knot,
273 Rkey: rkey,
274 },
275 Name: name,
276 }, nil
277}