forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/rbac" 20 spindlemodel "tangled.org/core/spindle/models" 21 22 "github.com/go-chi/chi/v5" 23 "github.com/gorilla/websocket" 24) 25 26type Pipelines struct { 27 repoResolver *reporesolver.RepoResolver 28 idResolver *idresolver.Resolver 29 config *config.Config 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 spindlestream *eventconsumer.Consumer 33 db *db.DB 34 enforcer *rbac.Enforcer 35 logger *slog.Logger 36} 37 38func (p *Pipelines) Router() http.Handler { 39 r := chi.NewRouter() 40 r.Get("/", p.Index) 41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 43 44 return r 45} 46 47func New( 48 oauth *oauth.OAuth, 49 repoResolver *reporesolver.RepoResolver, 50 pages *pages.Pages, 51 spindlestream *eventconsumer.Consumer, 52 idResolver *idresolver.Resolver, 53 db *db.DB, 54 config *config.Config, 55 enforcer *rbac.Enforcer, 56 logger *slog.Logger, 57) *Pipelines { 58 return &Pipelines{ 59 oauth: oauth, 60 repoResolver: repoResolver, 61 pages: pages, 62 idResolver: idResolver, 63 config: config, 64 spindlestream: spindlestream, 65 db: db, 66 enforcer: enforcer, 67 logger: logger, 68 } 69} 70 71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 72 user := p.oauth.GetUser(r) 73 l := p.logger.With("handler", "Index") 74 75 f, err := p.repoResolver.Resolve(r) 76 if err != nil { 77 l.Error("failed to get repo and knot", "err", err) 78 return 79 } 80 81 repoInfo := f.RepoInfo(user) 82 83 ps, err := db.GetPipelineStatuses( 84 p.db, 85 30, 86 db.FilterEq("repo_owner", repoInfo.OwnerDid), 87 db.FilterEq("repo_name", repoInfo.Name), 88 db.FilterEq("knot", repoInfo.Knot), 89 ) 90 if err != nil { 91 l.Error("failed to query db", "err", err) 92 return 93 } 94 95 p.pages.Pipelines(w, pages.PipelinesParams{ 96 LoggedInUser: user, 97 RepoInfo: repoInfo, 98 Pipelines: ps, 99 }) 100} 101 102func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 103 user := p.oauth.GetUser(r) 104 l := p.logger.With("handler", "Workflow") 105 106 f, err := p.repoResolver.Resolve(r) 107 if err != nil { 108 l.Error("failed to get repo and knot", "err", err) 109 return 110 } 111 112 repoInfo := f.RepoInfo(user) 113 114 pipelineId := chi.URLParam(r, "pipeline") 115 if pipelineId == "" { 116 l.Error("empty pipeline ID") 117 return 118 } 119 120 workflow := chi.URLParam(r, "workflow") 121 if workflow == "" { 122 l.Error("empty workflow name") 123 return 124 } 125 126 ps, err := db.GetPipelineStatuses( 127 p.db, 128 1, 129 db.FilterEq("repo_owner", repoInfo.OwnerDid), 130 db.FilterEq("repo_name", repoInfo.Name), 131 db.FilterEq("knot", repoInfo.Knot), 132 db.FilterEq("id", pipelineId), 133 ) 134 if err != nil { 135 l.Error("failed to query db", "err", err) 136 return 137 } 138 139 if len(ps) != 1 { 140 l.Error("invalid number of pipelines", "len", len(ps)) 141 return 142 } 143 144 singlePipeline := ps[0] 145 146 p.pages.Workflow(w, pages.WorkflowParams{ 147 LoggedInUser: user, 148 RepoInfo: repoInfo, 149 Pipeline: singlePipeline, 150 Workflow: workflow, 151 }) 152} 153 154var upgrader = websocket.Upgrader{ 155 ReadBufferSize: 1024, 156 WriteBufferSize: 1024, 157} 158 159func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 160 l := p.logger.With("handler", "logs") 161 162 clientConn, err := upgrader.Upgrade(w, r, nil) 163 if err != nil { 164 l.Error("websocket upgrade failed", "err", err) 165 return 166 } 167 defer func() { 168 _ = clientConn.WriteControl( 169 websocket.CloseMessage, 170 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 171 time.Now().Add(time.Second), 172 ) 173 clientConn.Close() 174 }() 175 176 ctx, cancel := context.WithCancel(r.Context()) 177 defer cancel() 178 179 user := p.oauth.GetUser(r) 180 f, err := p.repoResolver.Resolve(r) 181 if err != nil { 182 l.Error("failed to get repo and knot", "err", err) 183 http.Error(w, "bad repo/knot", http.StatusBadRequest) 184 return 185 } 186 187 repoInfo := f.RepoInfo(user) 188 189 pipelineId := chi.URLParam(r, "pipeline") 190 workflow := chi.URLParam(r, "workflow") 191 if pipelineId == "" || workflow == "" { 192 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 193 return 194 } 195 196 ps, err := db.GetPipelineStatuses( 197 p.db, 198 1, 199 db.FilterEq("repo_owner", repoInfo.OwnerDid), 200 db.FilterEq("repo_name", repoInfo.Name), 201 db.FilterEq("knot", repoInfo.Knot), 202 db.FilterEq("id", pipelineId), 203 ) 204 if err != nil || len(ps) != 1 { 205 l.Error("pipeline query failed", "err", err, "count", len(ps)) 206 http.Error(w, "pipeline not found", http.StatusNotFound) 207 return 208 } 209 210 singlePipeline := ps[0] 211 spindle := repoInfo.Spindle 212 knot := repoInfo.Knot 213 rkey := singlePipeline.Rkey 214 215 if spindle == "" || knot == "" || rkey == "" { 216 http.Error(w, "invalid repo info", http.StatusBadRequest) 217 return 218 } 219 220 scheme := "wss" 221 if p.config.Core.Dev { 222 scheme = "ws" 223 } 224 225 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 226 l = l.With("url", url) 227 l.Info("logs endpoint hit") 228 229 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 230 if err != nil { 231 l.Error("websocket dial failed", "err", err) 232 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 233 return 234 } 235 defer spindleConn.Close() 236 237 // create a channel for incoming messages 238 evChan := make(chan logEvent, 100) 239 // start a goroutine to read from spindle 240 go readLogs(spindleConn, evChan) 241 242 stepStartTimes := make(map[int]time.Time) 243 var fragment bytes.Buffer 244 for { 245 select { 246 case <-ctx.Done(): 247 l.Info("client disconnected") 248 return 249 250 case ev, ok := <-evChan: 251 if !ok { 252 continue 253 } 254 255 if ev.err != nil && ev.isCloseError() { 256 l.Debug("graceful shutdown, tail complete", "err", err) 257 return 258 } 259 if ev.err != nil { 260 l.Error("error reading from spindle", "err", err) 261 return 262 } 263 264 var logLine spindlemodel.LogLine 265 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 266 l.Error("failed to parse logline", "err", err) 267 continue 268 } 269 270 fragment.Reset() 271 272 switch logLine.Kind { 273 case spindlemodel.LogKindControl: 274 switch logLine.StepStatus { 275 case spindlemodel.StepStatusStart: 276 stepStartTimes[logLine.StepId] = logLine.Time 277 collapsed := false 278 if logLine.StepKind == spindlemodel.StepKindSystem { 279 collapsed = true 280 } 281 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 282 Id: logLine.StepId, 283 Name: logLine.Content, 284 Command: logLine.StepCommand, 285 Collapsed: collapsed, 286 StartTime: logLine.Time, 287 }) 288 case spindlemodel.StepStatusEnd: 289 startTime := stepStartTimes[logLine.StepId] 290 endTime := logLine.Time 291 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 292 Id: logLine.StepId, 293 StartTime: startTime, 294 EndTime: endTime, 295 }) 296 } 297 298 case spindlemodel.LogKindData: 299 // data messages simply insert new log lines into current step 300 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 301 Id: logLine.StepId, 302 Content: logLine.Content, 303 }) 304 } 305 if err != nil { 306 l.Error("failed to render log line", "err", err) 307 return 308 } 309 310 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 311 l.Error("error writing to client", "err", err) 312 return 313 } 314 315 case <-time.After(30 * time.Second): 316 l.Debug("sent keepalive") 317 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 318 l.Error("failed to write control", "err", err) 319 return 320 } 321 } 322 } 323} 324 325// either a message or an error 326type logEvent struct { 327 msg []byte 328 err error 329} 330 331func (ev *logEvent) isCloseError() bool { 332 return websocket.IsCloseError( 333 ev.err, 334 websocket.CloseNormalClosure, 335 websocket.CloseGoingAway, 336 websocket.CloseAbnormalClosure, 337 ) 338} 339 340// read logs from spindle and pass through to chan 341func readLogs(conn *websocket.Conn, ch chan logEvent) { 342 defer close(ch) 343 344 for { 345 if conn == nil { 346 return 347 } 348 349 _, msg, err := conn.ReadMessage() 350 if err != nil { 351 ch <- logEvent{err: err} 352 return 353 } 354 ch <- logEvent{msg: msg} 355 } 356}