forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 8.1 kB view raw
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/orm" 20 "tangled.org/core/rbac" 21 spindlemodel "tangled.org/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25) 26 27type Pipelines struct { 28 repoResolver *reporesolver.RepoResolver 29 idResolver *idresolver.Resolver 30 config *config.Config 31 oauth *oauth.OAuth 32 pages *pages.Pages 33 spindlestream *eventconsumer.Consumer 34 db *db.DB 35 enforcer *rbac.Enforcer 36 logger *slog.Logger 37} 38 39func (p *Pipelines) Router() http.Handler { 40 r := chi.NewRouter() 41 r.Get("/", p.Index) 42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 44 45 return r 46} 47 48func New( 49 oauth *oauth.OAuth, 50 repoResolver *reporesolver.RepoResolver, 51 pages *pages.Pages, 52 spindlestream *eventconsumer.Consumer, 53 idResolver *idresolver.Resolver, 54 db *db.DB, 55 config *config.Config, 56 enforcer *rbac.Enforcer, 57 logger *slog.Logger, 58) *Pipelines { 59 return &Pipelines{ 60 oauth: oauth, 61 repoResolver: repoResolver, 62 pages: pages, 63 idResolver: idResolver, 64 config: config, 65 spindlestream: spindlestream, 66 db: db, 67 enforcer: enforcer, 68 logger: logger, 69 } 70} 71 72func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 73 user := p.oauth.GetUser(r) 74 l := p.logger.With("handler", "Index") 75 76 f, err := p.repoResolver.Resolve(r) 77 if err != nil { 78 l.Error("failed to get repo and knot", "err", err) 79 return 80 } 81 82 ps, err := db.GetPipelineStatuses( 83 p.db, 84 30, 85 orm.FilterEq("repo_owner", f.Did), 86 orm.FilterEq("repo_name", f.Name), 87 orm.FilterEq("knot", f.Knot), 88 ) 89 if err != nil { 90 l.Error("failed to query db", "err", err) 91 return 92 } 93 94 p.pages.Pipelines(w, pages.PipelinesParams{ 95 LoggedInUser: user, 96 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 97 Pipelines: ps, 98 }) 99} 100 101func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 102 user := p.oauth.GetUser(r) 103 l := p.logger.With("handler", "Workflow") 104 105 f, err := p.repoResolver.Resolve(r) 106 if err != nil { 107 l.Error("failed to get repo and knot", "err", err) 108 return 109 } 110 111 pipelineId := chi.URLParam(r, "pipeline") 112 if pipelineId == "" { 113 l.Error("empty pipeline ID") 114 return 115 } 116 117 workflow := chi.URLParam(r, "workflow") 118 if workflow == "" { 119 l.Error("empty workflow name") 120 return 121 } 122 123 ps, err := db.GetPipelineStatuses( 124 p.db, 125 1, 126 orm.FilterEq("repo_owner", f.Did), 127 orm.FilterEq("repo_name", f.Name), 128 orm.FilterEq("knot", f.Knot), 129 orm.FilterEq("id", pipelineId), 130 ) 131 if err != nil { 132 l.Error("failed to query db", "err", err) 133 return 134 } 135 136 if len(ps) != 1 { 137 l.Error("invalid number of pipelines", "len", len(ps)) 138 return 139 } 140 141 singlePipeline := ps[0] 142 143 p.pages.Workflow(w, pages.WorkflowParams{ 144 LoggedInUser: user, 145 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 146 Pipeline: singlePipeline, 147 Workflow: workflow, 148 }) 149} 150 151var upgrader = websocket.Upgrader{ 152 ReadBufferSize: 1024, 153 WriteBufferSize: 1024, 154} 155 156func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 157 l := p.logger.With("handler", "logs") 158 159 clientConn, err := upgrader.Upgrade(w, r, nil) 160 if err != nil { 161 l.Error("websocket upgrade failed", "err", err) 162 return 163 } 164 defer func() { 165 _ = clientConn.WriteControl( 166 websocket.CloseMessage, 167 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 168 time.Now().Add(time.Second), 169 ) 170 clientConn.Close() 171 }() 172 173 ctx, cancel := context.WithCancel(r.Context()) 174 defer cancel() 175 176 f, err := p.repoResolver.Resolve(r) 177 if err != nil { 178 l.Error("failed to get repo and knot", "err", err) 179 http.Error(w, "bad repo/knot", http.StatusBadRequest) 180 return 181 } 182 183 pipelineId := chi.URLParam(r, "pipeline") 184 workflow := chi.URLParam(r, "workflow") 185 if pipelineId == "" || workflow == "" { 186 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 187 return 188 } 189 190 ps, err := db.GetPipelineStatuses( 191 p.db, 192 1, 193 orm.FilterEq("repo_owner", f.Did), 194 orm.FilterEq("repo_name", f.Name), 195 orm.FilterEq("knot", f.Knot), 196 orm.FilterEq("id", pipelineId), 197 ) 198 if err != nil || len(ps) != 1 { 199 l.Error("pipeline query failed", "err", err, "count", len(ps)) 200 http.Error(w, "pipeline not found", http.StatusNotFound) 201 return 202 } 203 204 singlePipeline := ps[0] 205 spindle := f.Spindle 206 knot := f.Knot 207 rkey := singlePipeline.Rkey 208 209 if spindle == "" || knot == "" || rkey == "" { 210 http.Error(w, "invalid repo info", http.StatusBadRequest) 211 return 212 } 213 214 scheme := "wss" 215 if p.config.Core.Dev { 216 scheme = "ws" 217 } 218 219 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 220 l = l.With("url", url) 221 l.Info("logs endpoint hit") 222 223 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 224 if err != nil { 225 l.Error("websocket dial failed", "err", err) 226 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 227 return 228 } 229 defer spindleConn.Close() 230 231 // create a channel for incoming messages 232 evChan := make(chan logEvent, 100) 233 // start a goroutine to read from spindle 234 go readLogs(spindleConn, evChan) 235 236 stepStartTimes := make(map[int]time.Time) 237 var fragment bytes.Buffer 238 for { 239 select { 240 case <-ctx.Done(): 241 l.Info("client disconnected") 242 return 243 244 case ev, ok := <-evChan: 245 if !ok { 246 continue 247 } 248 249 if ev.err != nil && ev.isCloseError() { 250 l.Debug("graceful shutdown, tail complete", "err", err) 251 return 252 } 253 if ev.err != nil { 254 l.Error("error reading from spindle", "err", err) 255 return 256 } 257 258 var logLine spindlemodel.LogLine 259 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 260 l.Error("failed to parse logline", "err", err) 261 continue 262 } 263 264 fragment.Reset() 265 266 switch logLine.Kind { 267 case spindlemodel.LogKindControl: 268 switch logLine.StepStatus { 269 case spindlemodel.StepStatusStart: 270 stepStartTimes[logLine.StepId] = logLine.Time 271 collapsed := false 272 if logLine.StepKind == spindlemodel.StepKindSystem { 273 collapsed = true 274 } 275 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 276 Id: logLine.StepId, 277 Name: logLine.Content, 278 Command: logLine.StepCommand, 279 Collapsed: collapsed, 280 StartTime: logLine.Time, 281 }) 282 case spindlemodel.StepStatusEnd: 283 startTime := stepStartTimes[logLine.StepId] 284 endTime := logLine.Time 285 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 286 Id: logLine.StepId, 287 StartTime: startTime, 288 EndTime: endTime, 289 }) 290 } 291 292 case spindlemodel.LogKindData: 293 // data messages simply insert new log lines into current step 294 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 295 Id: logLine.StepId, 296 Content: logLine.Content, 297 }) 298 } 299 if err != nil { 300 l.Error("failed to render log line", "err", err) 301 return 302 } 303 304 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 305 l.Error("error writing to client", "err", err) 306 return 307 } 308 309 case <-time.After(30 * time.Second): 310 l.Debug("sent keepalive") 311 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 312 l.Error("failed to write control", "err", err) 313 return 314 } 315 } 316 } 317} 318 319// either a message or an error 320type logEvent struct { 321 msg []byte 322 err error 323} 324 325func (ev *logEvent) isCloseError() bool { 326 return websocket.IsCloseError( 327 ev.err, 328 websocket.CloseNormalClosure, 329 websocket.CloseGoingAway, 330 websocket.CloseAbnormalClosure, 331 ) 332} 333 334// read logs from spindle and pass through to chan 335func readLogs(conn *websocket.Conn, ch chan logEvent) { 336 defer close(ch) 337 338 for { 339 if conn == nil { 340 return 341 } 342 343 _, msg, err := conn.ReadMessage() 344 if err != nil { 345 ch <- logEvent{err: err} 346 return 347 } 348 ch <- logEvent{msg: msg} 349 } 350}