forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "net/http" 8 "strconv" 9 "time" 10 11 "tangled.sh/tangled.sh/core/spindle/engine" 12 "tangled.sh/tangled.sh/core/spindle/models" 13 14 "github.com/go-chi/chi/v5" 15 "github.com/gorilla/websocket" 16 "github.com/hpcloud/tail" 17) 18 19var upgrader = websocket.Upgrader{ 20 ReadBufferSize: 1024, 21 WriteBufferSize: 1024, 22} 23 24func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 25 l := s.l.With("handler", "Events") 26 l.Debug("received new connection") 27 28 conn, err := upgrader.Upgrade(w, r, nil) 29 if err != nil { 30 l.Error("websocket upgrade failed", "err", err) 31 w.WriteHeader(http.StatusInternalServerError) 32 return 33 } 34 defer conn.Close() 35 l.Debug("upgraded http to wss") 36 37 ch := s.n.Subscribe() 38 defer s.n.Unsubscribe(ch) 39 40 ctx, cancel := context.WithCancel(r.Context()) 41 defer cancel() 42 go func() { 43 for { 44 if _, _, err := conn.NextReader(); err != nil { 45 l.Error("failed to read", "err", err) 46 cancel() 47 return 48 } 49 } 50 }() 51 52 defaultCursor := time.Now().UnixNano() 53 cursorStr := r.URL.Query().Get("cursor") 54 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 55 if err != nil { 56 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 57 } 58 if cursor == 0 { 59 cursor = defaultCursor 60 } 61 62 // complete backfill first before going to live data 63 l.Debug("going through backfill", "cursor", cursor) 64 if err := s.streamPipelines(conn, &cursor); err != nil { 65 l.Error("failed to backfill", "err", err) 66 return 67 } 68 69 for { 70 // wait for new data or timeout 71 select { 72 case <-ctx.Done(): 73 l.Debug("stopping stream: client closed connection") 74 return 75 case <-ch: 76 // we have been notified of new data 77 l.Debug("going through live data", "cursor", cursor) 78 if err := s.streamPipelines(conn, &cursor); err != nil { 79 l.Error("failed to stream", "err", err) 80 return 81 } 82 case <-time.After(30 * time.Second): 83 // send a keep-alive 84 l.Debug("sent keepalive") 85 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 86 l.Error("failed to write control", "err", err) 87 } 88 } 89 } 90} 91 92func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 93 wid, err := getWorkflowID(r) 94 if err != nil { 95 http.Error(w, err.Error(), http.StatusBadRequest) 96 return 97 } 98 99 l := s.l.With("handler", "Logs") 100 l = s.l.With("wid", wid) 101 102 conn, err := upgrader.Upgrade(w, r, nil) 103 if err != nil { 104 l.Error("websocket upgrade failed", "err", err) 105 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 106 return 107 } 108 defer conn.Close() 109 l.Debug("upgraded http to wss") 110 111 ctx, cancel := context.WithCancel(r.Context()) 112 defer cancel() 113 114 go func() { 115 for { 116 if _, _, err := conn.NextReader(); err != nil { 117 l.Debug("client disconnected", "err", err) 118 cancel() 119 return 120 } 121 } 122 }() 123 124 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 125 l.Error("log stream failed", "err", err) 126 } 127 l.Debug("logs connection closed") 128} 129 130func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 131 filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid) 132 133 config := tail.Config{ 134 Follow: true, 135 ReOpen: true, 136 MustExist: false, 137 Location: &tail.SeekInfo{Offset: 0, Whence: 0}, 138 Logger: tail.DiscardingLogger, 139 } 140 141 t, err := tail.TailFile(filePath, config) 142 if err != nil { 143 return fmt.Errorf("failed to tail log file: %w", err) 144 } 145 defer t.Stop() 146 147 for { 148 select { 149 case <-ctx.Done(): 150 return ctx.Err() 151 case line := <-t.Lines: 152 if line == nil { 153 return fmt.Errorf("tail channel closed unexpectedly") 154 } 155 156 if line.Err != nil { 157 return fmt.Errorf("error tailing log file: %w", line.Err) 158 } 159 160 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 161 return fmt.Errorf("failed to write to websocket: %w", err) 162 } 163 } 164 } 165} 166 167func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 168 events, err := s.db.GetEvents(*cursor) 169 if err != nil { 170 s.l.Debug("err", "err", err) 171 return err 172 } 173 s.l.Debug("ops", "ops", events) 174 175 for _, event := range events { 176 // first extract the inner json into a map 177 var eventJson map[string]any 178 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 179 if err != nil { 180 s.l.Error("failed to unmarshal event", "err", err) 181 return err 182 } 183 184 jsonMsg, err := json.Marshal(map[string]any{ 185 "rkey": event.Rkey, 186 "nsid": event.Nsid, 187 "event": eventJson, 188 }) 189 if err != nil { 190 s.l.Error("failed to marshal record", "err", err) 191 return err 192 } 193 194 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 195 s.l.Debug("err", "err", err) 196 return err 197 } 198 *cursor = event.Created 199 } 200 201 return nil 202} 203 204func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 205 knot := chi.URLParam(r, "knot") 206 rkey := chi.URLParam(r, "rkey") 207 name := chi.URLParam(r, "name") 208 209 if knot == "" || rkey == "" || name == "" { 210 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 211 } 212 213 return models.WorkflowId{ 214 PipelineId: models.PipelineId{ 215 Knot: knot, 216 Rkey: rkey, 217 }, 218 Name: name, 219 }, nil 220}