forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/rbac" 20 spindlemodel "tangled.org/core/spindle/models" 21 22 "github.com/go-chi/chi/v5" 23 "github.com/gorilla/websocket" 24) 25 26type Pipelines struct { 27 repoResolver *reporesolver.RepoResolver 28 idResolver *idresolver.Resolver 29 config *config.Config 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 spindlestream *eventconsumer.Consumer 33 db *db.DB 34 enforcer *rbac.Enforcer 35 logger *slog.Logger 36} 37 38func (p *Pipelines) Router() http.Handler { 39 r := chi.NewRouter() 40 r.Get("/", p.Index) 41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 43 44 return r 45} 46 47func New( 48 oauth *oauth.OAuth, 49 repoResolver *reporesolver.RepoResolver, 50 pages *pages.Pages, 51 spindlestream *eventconsumer.Consumer, 52 idResolver *idresolver.Resolver, 53 db *db.DB, 54 config *config.Config, 55 enforcer *rbac.Enforcer, 56 logger *slog.Logger, 57) *Pipelines { 58 return &Pipelines{ 59 oauth: oauth, 60 repoResolver: repoResolver, 61 pages: pages, 62 idResolver: idResolver, 63 config: config, 64 spindlestream: spindlestream, 65 db: db, 66 enforcer: enforcer, 67 logger: logger, 68 } 69} 70 71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 72 user := p.oauth.GetUser(r) 73 l := p.logger.With("handler", "Index") 74 75 f, err := p.repoResolver.Resolve(r) 76 if err != nil { 77 l.Error("failed to get repo and knot", "err", err) 78 return 79 } 80 81 ps, err := db.GetPipelineStatuses( 82 p.db, 83 30, 84 db.FilterEq("repo_owner", f.Did), 85 db.FilterEq("repo_name", f.Name), 86 db.FilterEq("knot", f.Knot), 87 ) 88 if err != nil { 89 l.Error("failed to query db", "err", err) 90 return 91 } 92 93 p.pages.Pipelines(w, pages.PipelinesParams{ 94 LoggedInUser: user, 95 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 96 Pipelines: ps, 97 }) 98} 99 100func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 101 user := p.oauth.GetUser(r) 102 l := p.logger.With("handler", "Workflow") 103 104 f, err := p.repoResolver.Resolve(r) 105 if err != nil { 106 l.Error("failed to get repo and knot", "err", err) 107 return 108 } 109 110 pipelineId := chi.URLParam(r, "pipeline") 111 if pipelineId == "" { 112 l.Error("empty pipeline ID") 113 return 114 } 115 116 workflow := chi.URLParam(r, "workflow") 117 if workflow == "" { 118 l.Error("empty workflow name") 119 return 120 } 121 122 ps, err := db.GetPipelineStatuses( 123 p.db, 124 1, 125 db.FilterEq("repo_owner", f.Did), 126 db.FilterEq("repo_name", f.Name), 127 db.FilterEq("knot", f.Knot), 128 db.FilterEq("id", pipelineId), 129 ) 130 if err != nil { 131 l.Error("failed to query db", "err", err) 132 return 133 } 134 135 if len(ps) != 1 { 136 l.Error("invalid number of pipelines", "len", len(ps)) 137 return 138 } 139 140 singlePipeline := ps[0] 141 142 p.pages.Workflow(w, pages.WorkflowParams{ 143 LoggedInUser: user, 144 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 145 Pipeline: singlePipeline, 146 Workflow: workflow, 147 }) 148} 149 150var upgrader = websocket.Upgrader{ 151 ReadBufferSize: 1024, 152 WriteBufferSize: 1024, 153} 154 155func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 156 l := p.logger.With("handler", "logs") 157 158 clientConn, err := upgrader.Upgrade(w, r, nil) 159 if err != nil { 160 l.Error("websocket upgrade failed", "err", err) 161 return 162 } 163 defer func() { 164 _ = clientConn.WriteControl( 165 websocket.CloseMessage, 166 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 167 time.Now().Add(time.Second), 168 ) 169 clientConn.Close() 170 }() 171 172 ctx, cancel := context.WithCancel(r.Context()) 173 defer cancel() 174 175 f, err := p.repoResolver.Resolve(r) 176 if err != nil { 177 l.Error("failed to get repo and knot", "err", err) 178 http.Error(w, "bad repo/knot", http.StatusBadRequest) 179 return 180 } 181 182 pipelineId := chi.URLParam(r, "pipeline") 183 workflow := chi.URLParam(r, "workflow") 184 if pipelineId == "" || workflow == "" { 185 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 186 return 187 } 188 189 ps, err := db.GetPipelineStatuses( 190 p.db, 191 1, 192 db.FilterEq("repo_owner", f.Did), 193 db.FilterEq("repo_name", f.Name), 194 db.FilterEq("knot", f.Knot), 195 db.FilterEq("id", pipelineId), 196 ) 197 if err != nil || len(ps) != 1 { 198 l.Error("pipeline query failed", "err", err, "count", len(ps)) 199 http.Error(w, "pipeline not found", http.StatusNotFound) 200 return 201 } 202 203 singlePipeline := ps[0] 204 spindle := f.Spindle 205 knot := f.Knot 206 rkey := singlePipeline.Rkey 207 208 if spindle == "" || knot == "" || rkey == "" { 209 http.Error(w, "invalid repo info", http.StatusBadRequest) 210 return 211 } 212 213 scheme := "wss" 214 if p.config.Core.Dev { 215 scheme = "ws" 216 } 217 218 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 219 l = l.With("url", url) 220 l.Info("logs endpoint hit") 221 222 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 223 if err != nil { 224 l.Error("websocket dial failed", "err", err) 225 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 226 return 227 } 228 defer spindleConn.Close() 229 230 // create a channel for incoming messages 231 evChan := make(chan logEvent, 100) 232 // start a goroutine to read from spindle 233 go readLogs(spindleConn, evChan) 234 235 stepStartTimes := make(map[int]time.Time) 236 var fragment bytes.Buffer 237 for { 238 select { 239 case <-ctx.Done(): 240 l.Info("client disconnected") 241 return 242 243 case ev, ok := <-evChan: 244 if !ok { 245 continue 246 } 247 248 if ev.err != nil && ev.isCloseError() { 249 l.Debug("graceful shutdown, tail complete", "err", err) 250 return 251 } 252 if ev.err != nil { 253 l.Error("error reading from spindle", "err", err) 254 return 255 } 256 257 var logLine spindlemodel.LogLine 258 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 259 l.Error("failed to parse logline", "err", err) 260 continue 261 } 262 263 fragment.Reset() 264 265 switch logLine.Kind { 266 case spindlemodel.LogKindControl: 267 switch logLine.StepStatus { 268 case spindlemodel.StepStatusStart: 269 stepStartTimes[logLine.StepId] = logLine.Time 270 collapsed := false 271 if logLine.StepKind == spindlemodel.StepKindSystem { 272 collapsed = true 273 } 274 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 275 Id: logLine.StepId, 276 Name: logLine.Content, 277 Command: logLine.StepCommand, 278 Collapsed: collapsed, 279 StartTime: logLine.Time, 280 }) 281 case spindlemodel.StepStatusEnd: 282 startTime := stepStartTimes[logLine.StepId] 283 endTime := logLine.Time 284 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 285 Id: logLine.StepId, 286 StartTime: startTime, 287 EndTime: endTime, 288 }) 289 } 290 291 case spindlemodel.LogKindData: 292 // data messages simply insert new log lines into current step 293 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 294 Id: logLine.StepId, 295 Content: logLine.Content, 296 }) 297 } 298 if err != nil { 299 l.Error("failed to render log line", "err", err) 300 return 301 } 302 303 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 304 l.Error("error writing to client", "err", err) 305 return 306 } 307 308 case <-time.After(30 * time.Second): 309 l.Debug("sent keepalive") 310 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 311 l.Error("failed to write control", "err", err) 312 return 313 } 314 } 315 } 316} 317 318// either a message or an error 319type logEvent struct { 320 msg []byte 321 err error 322} 323 324func (ev *logEvent) isCloseError() bool { 325 return websocket.IsCloseError( 326 ev.err, 327 websocket.CloseNormalClosure, 328 websocket.CloseGoingAway, 329 websocket.CloseAbnormalClosure, 330 ) 331} 332 333// read logs from spindle and pass through to chan 334func readLogs(conn *websocket.Conn, ch chan logEvent) { 335 defer close(ch) 336 337 for { 338 if conn == nil { 339 return 340 } 341 342 _, msg, err := conn.ReadMessage() 343 if err != nil { 344 ch <- logEvent{err: err} 345 return 346 } 347 ch <- logEvent{msg: msg} 348 } 349}