forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/rbac" 20 spindlemodel "tangled.org/core/spindle/models" 21 22 "github.com/go-chi/chi/v5" 23 "github.com/gorilla/websocket" 24) 25 26type Pipelines struct { 27 repoResolver *reporesolver.RepoResolver 28 idResolver *idresolver.Resolver 29 config *config.Config 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 spindlestream *eventconsumer.Consumer 33 db *db.DB 34 enforcer *rbac.Enforcer 35 logger *slog.Logger 36} 37 38func (p *Pipelines) Router() http.Handler { 39 r := chi.NewRouter() 40 r.Get("/", p.Index) 41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 43 44 return r 45} 46 47func New( 48 oauth *oauth.OAuth, 49 repoResolver *reporesolver.RepoResolver, 50 pages *pages.Pages, 51 spindlestream *eventconsumer.Consumer, 52 idResolver *idresolver.Resolver, 53 db *db.DB, 54 config *config.Config, 55 enforcer *rbac.Enforcer, 56 logger *slog.Logger, 57) *Pipelines { 58 return &Pipelines{ 59 oauth: oauth, 60 repoResolver: repoResolver, 61 pages: pages, 62 idResolver: idResolver, 63 config: config, 64 spindlestream: spindlestream, 65 db: db, 66 enforcer: enforcer, 67 logger: logger, 68 } 69} 70 71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 72 user := p.oauth.GetUser(r) 73 l := p.logger.With("handler", "Index") 74 75 f, err := p.repoResolver.Resolve(r) 76 if err != nil { 77 l.Error("failed to get repo and knot", "err", err) 78 return 79 } 80 81 repoInfo := f.RepoInfo(user) 82 83 ps, err := db.GetPipelineStatuses( 84 p.db, 85 db.FilterEq("repo_owner", repoInfo.OwnerDid), 86 db.FilterEq("repo_name", repoInfo.Name), 87 db.FilterEq("knot", repoInfo.Knot), 88 ) 89 if err != nil { 90 l.Error("failed to query db", "err", err) 91 return 92 } 93 94 p.pages.Pipelines(w, pages.PipelinesParams{ 95 LoggedInUser: user, 96 RepoInfo: repoInfo, 97 Pipelines: ps, 98 }) 99} 100 101func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 102 user := p.oauth.GetUser(r) 103 l := p.logger.With("handler", "Workflow") 104 105 f, err := p.repoResolver.Resolve(r) 106 if err != nil { 107 l.Error("failed to get repo and knot", "err", err) 108 return 109 } 110 111 repoInfo := f.RepoInfo(user) 112 113 pipelineId := chi.URLParam(r, "pipeline") 114 if pipelineId == "" { 115 l.Error("empty pipeline ID") 116 return 117 } 118 119 workflow := chi.URLParam(r, "workflow") 120 if workflow == "" { 121 l.Error("empty workflow name") 122 return 123 } 124 125 ps, err := db.GetPipelineStatuses( 126 p.db, 127 db.FilterEq("repo_owner", repoInfo.OwnerDid), 128 db.FilterEq("repo_name", repoInfo.Name), 129 db.FilterEq("knot", repoInfo.Knot), 130 db.FilterEq("id", pipelineId), 131 ) 132 if err != nil { 133 l.Error("failed to query db", "err", err) 134 return 135 } 136 137 if len(ps) != 1 { 138 l.Error("invalid number of pipelines", "len", len(ps)) 139 return 140 } 141 142 singlePipeline := ps[0] 143 144 p.pages.Workflow(w, pages.WorkflowParams{ 145 LoggedInUser: user, 146 RepoInfo: repoInfo, 147 Pipeline: singlePipeline, 148 Workflow: workflow, 149 }) 150} 151 152var upgrader = websocket.Upgrader{ 153 ReadBufferSize: 1024, 154 WriteBufferSize: 1024, 155} 156 157func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 158 l := p.logger.With("handler", "logs") 159 160 clientConn, err := upgrader.Upgrade(w, r, nil) 161 if err != nil { 162 l.Error("websocket upgrade failed", "err", err) 163 return 164 } 165 defer func() { 166 _ = clientConn.WriteControl( 167 websocket.CloseMessage, 168 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 169 time.Now().Add(time.Second), 170 ) 171 clientConn.Close() 172 }() 173 174 ctx, cancel := context.WithCancel(r.Context()) 175 defer cancel() 176 177 user := p.oauth.GetUser(r) 178 f, err := p.repoResolver.Resolve(r) 179 if err != nil { 180 l.Error("failed to get repo and knot", "err", err) 181 http.Error(w, "bad repo/knot", http.StatusBadRequest) 182 return 183 } 184 185 repoInfo := f.RepoInfo(user) 186 187 pipelineId := chi.URLParam(r, "pipeline") 188 workflow := chi.URLParam(r, "workflow") 189 if pipelineId == "" || workflow == "" { 190 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 191 return 192 } 193 194 ps, err := db.GetPipelineStatuses( 195 p.db, 196 db.FilterEq("repo_owner", repoInfo.OwnerDid), 197 db.FilterEq("repo_name", repoInfo.Name), 198 db.FilterEq("knot", repoInfo.Knot), 199 db.FilterEq("id", pipelineId), 200 ) 201 if err != nil || len(ps) != 1 { 202 l.Error("pipeline query failed", "err", err, "count", len(ps)) 203 http.Error(w, "pipeline not found", http.StatusNotFound) 204 return 205 } 206 207 singlePipeline := ps[0] 208 spindle := repoInfo.Spindle 209 knot := repoInfo.Knot 210 rkey := singlePipeline.Rkey 211 212 if spindle == "" || knot == "" || rkey == "" { 213 http.Error(w, "invalid repo info", http.StatusBadRequest) 214 return 215 } 216 217 scheme := "wss" 218 if p.config.Core.Dev { 219 scheme = "ws" 220 } 221 222 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 223 l = l.With("url", url) 224 l.Info("logs endpoint hit") 225 226 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 227 if err != nil { 228 l.Error("websocket dial failed", "err", err) 229 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 230 return 231 } 232 defer spindleConn.Close() 233 234 // create a channel for incoming messages 235 evChan := make(chan logEvent, 100) 236 // start a goroutine to read from spindle 237 go readLogs(spindleConn, evChan) 238 239 stepStartTimes := make(map[int]time.Time) 240 var fragment bytes.Buffer 241 for { 242 select { 243 case <-ctx.Done(): 244 l.Info("client disconnected") 245 return 246 247 case ev, ok := <-evChan: 248 if !ok { 249 continue 250 } 251 252 if ev.err != nil && ev.isCloseError() { 253 l.Debug("graceful shutdown, tail complete", "err", err) 254 return 255 } 256 if ev.err != nil { 257 l.Error("error reading from spindle", "err", err) 258 return 259 } 260 261 var logLine spindlemodel.LogLine 262 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 263 l.Error("failed to parse logline", "err", err) 264 continue 265 } 266 267 fragment.Reset() 268 269 switch logLine.Kind { 270 case spindlemodel.LogKindControl: 271 switch logLine.StepStatus { 272 case spindlemodel.StepStatusStart: 273 stepStartTimes[logLine.StepId] = logLine.Time 274 collapsed := false 275 if logLine.StepKind == spindlemodel.StepKindSystem { 276 collapsed = true 277 } 278 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 279 Id: logLine.StepId, 280 Name: logLine.Content, 281 Command: logLine.StepCommand, 282 Collapsed: collapsed, 283 StartTime: logLine.Time, 284 }) 285 case spindlemodel.StepStatusEnd: 286 startTime := stepStartTimes[logLine.StepId] 287 endTime := logLine.Time 288 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 289 Id: logLine.StepId, 290 StartTime: startTime, 291 EndTime: endTime, 292 }) 293 } 294 295 case spindlemodel.LogKindData: 296 // data messages simply insert new log lines into current step 297 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 298 Id: logLine.StepId, 299 Content: logLine.Content, 300 }) 301 } 302 if err != nil { 303 l.Error("failed to render log line", "err", err) 304 return 305 } 306 307 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 308 l.Error("error writing to client", "err", err) 309 return 310 } 311 312 case <-time.After(30 * time.Second): 313 l.Debug("sent keepalive") 314 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 315 l.Error("failed to write control", "err", err) 316 return 317 } 318 } 319 } 320} 321 322// either a message or an error 323type logEvent struct { 324 msg []byte 325 err error 326} 327 328func (ev *logEvent) isCloseError() bool { 329 return websocket.IsCloseError( 330 ev.err, 331 websocket.CloseNormalClosure, 332 websocket.CloseGoingAway, 333 websocket.CloseAbnormalClosure, 334 ) 335} 336 337// read logs from spindle and pass through to chan 338func readLogs(conn *websocket.Conn, ch chan logEvent) { 339 defer close(ch) 340 341 for { 342 if conn == nil { 343 return 344 } 345 346 _, msg, err := conn.ReadMessage() 347 if err != nil { 348 ch <- logEvent{err: err} 349 return 350 } 351 ch <- logEvent{msg: msg} 352 } 353}