forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/rbac" 20 spindlemodel "tangled.org/core/spindle/models" 21 22 "github.com/go-chi/chi/v5" 23 "github.com/gorilla/websocket" 24) 25 26type Pipelines struct { 27 repoResolver *reporesolver.RepoResolver 28 idResolver *idresolver.Resolver 29 config *config.Config 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 spindlestream *eventconsumer.Consumer 33 db *db.DB 34 enforcer *rbac.Enforcer 35 logger *slog.Logger 36} 37 38func New( 39 oauth *oauth.OAuth, 40 repoResolver *reporesolver.RepoResolver, 41 pages *pages.Pages, 42 spindlestream *eventconsumer.Consumer, 43 idResolver *idresolver.Resolver, 44 db *db.DB, 45 config *config.Config, 46 enforcer *rbac.Enforcer, 47 logger *slog.Logger, 48) *Pipelines { 49 return &Pipelines{ 50 oauth: oauth, 51 repoResolver: repoResolver, 52 pages: pages, 53 idResolver: idResolver, 54 config: config, 55 spindlestream: spindlestream, 56 db: db, 57 enforcer: enforcer, 58 logger: logger, 59 } 60} 61 62func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 63 user := p.oauth.GetUser(r) 64 l := p.logger.With("handler", "Index") 65 66 f, err := p.repoResolver.Resolve(r) 67 if err != nil { 68 l.Error("failed to get repo and knot", "err", err) 69 return 70 } 71 72 repoInfo := f.RepoInfo(user) 73 74 ps, err := db.GetPipelineStatuses( 75 p.db, 76 db.FilterEq("repo_owner", repoInfo.OwnerDid), 77 db.FilterEq("repo_name", repoInfo.Name), 78 db.FilterEq("knot", repoInfo.Knot), 79 ) 80 if err != nil { 81 l.Error("failed to query db", "err", err) 82 return 83 } 84 85 p.pages.Pipelines(w, pages.PipelinesParams{ 86 LoggedInUser: user, 87 RepoInfo: repoInfo, 88 Pipelines: ps, 89 }) 90} 91 92func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 93 user := p.oauth.GetUser(r) 94 l := p.logger.With("handler", "Workflow") 95 96 f, err := p.repoResolver.Resolve(r) 97 if err != nil { 98 l.Error("failed to get repo and knot", "err", err) 99 return 100 } 101 102 repoInfo := f.RepoInfo(user) 103 104 pipelineId := chi.URLParam(r, "pipeline") 105 if pipelineId == "" { 106 l.Error("empty pipeline ID") 107 return 108 } 109 110 workflow := chi.URLParam(r, "workflow") 111 if workflow == "" { 112 l.Error("empty workflow name") 113 return 114 } 115 116 ps, err := db.GetPipelineStatuses( 117 p.db, 118 db.FilterEq("repo_owner", repoInfo.OwnerDid), 119 db.FilterEq("repo_name", repoInfo.Name), 120 db.FilterEq("knot", repoInfo.Knot), 121 db.FilterEq("id", pipelineId), 122 ) 123 if err != nil { 124 l.Error("failed to query db", "err", err) 125 return 126 } 127 128 if len(ps) != 1 { 129 l.Error("invalid number of pipelines", "len", len(ps)) 130 return 131 } 132 133 singlePipeline := ps[0] 134 135 p.pages.Workflow(w, pages.WorkflowParams{ 136 LoggedInUser: user, 137 RepoInfo: repoInfo, 138 Pipeline: singlePipeline, 139 Workflow: workflow, 140 }) 141} 142 143var upgrader = websocket.Upgrader{ 144 ReadBufferSize: 1024, 145 WriteBufferSize: 1024, 146} 147 148func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 149 l := p.logger.With("handler", "logs") 150 151 clientConn, err := upgrader.Upgrade(w, r, nil) 152 if err != nil { 153 l.Error("websocket upgrade failed", "err", err) 154 return 155 } 156 defer func() { 157 _ = clientConn.WriteControl( 158 websocket.CloseMessage, 159 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 160 time.Now().Add(time.Second), 161 ) 162 clientConn.Close() 163 }() 164 165 ctx, cancel := context.WithCancel(r.Context()) 166 defer cancel() 167 168 user := p.oauth.GetUser(r) 169 f, err := p.repoResolver.Resolve(r) 170 if err != nil { 171 l.Error("failed to get repo and knot", "err", err) 172 http.Error(w, "bad repo/knot", http.StatusBadRequest) 173 return 174 } 175 176 repoInfo := f.RepoInfo(user) 177 178 pipelineId := chi.URLParam(r, "pipeline") 179 workflow := chi.URLParam(r, "workflow") 180 if pipelineId == "" || workflow == "" { 181 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 182 return 183 } 184 185 ps, err := db.GetPipelineStatuses( 186 p.db, 187 db.FilterEq("repo_owner", repoInfo.OwnerDid), 188 db.FilterEq("repo_name", repoInfo.Name), 189 db.FilterEq("knot", repoInfo.Knot), 190 db.FilterEq("id", pipelineId), 191 ) 192 if err != nil || len(ps) != 1 { 193 l.Error("pipeline query failed", "err", err, "count", len(ps)) 194 http.Error(w, "pipeline not found", http.StatusNotFound) 195 return 196 } 197 198 singlePipeline := ps[0] 199 spindle := repoInfo.Spindle 200 knot := repoInfo.Knot 201 rkey := singlePipeline.Rkey 202 203 if spindle == "" || knot == "" || rkey == "" { 204 http.Error(w, "invalid repo info", http.StatusBadRequest) 205 return 206 } 207 208 scheme := "wss" 209 if p.config.Core.Dev { 210 scheme = "ws" 211 } 212 213 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 214 l = l.With("url", url) 215 l.Info("logs endpoint hit") 216 217 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 218 if err != nil { 219 l.Error("websocket dial failed", "err", err) 220 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 221 return 222 } 223 defer spindleConn.Close() 224 225 // create a channel for incoming messages 226 evChan := make(chan logEvent, 100) 227 // start a goroutine to read from spindle 228 go readLogs(spindleConn, evChan) 229 230 stepIdx := 0 231 var fragment bytes.Buffer 232 for { 233 select { 234 case <-ctx.Done(): 235 l.Info("client disconnected") 236 return 237 238 case ev, ok := <-evChan: 239 if !ok { 240 continue 241 } 242 243 if ev.err != nil && ev.isCloseError() { 244 l.Debug("graceful shutdown, tail complete", "err", err) 245 return 246 } 247 if ev.err != nil { 248 l.Error("error reading from spindle", "err", err) 249 return 250 } 251 252 var logLine spindlemodel.LogLine 253 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 254 l.Error("failed to parse logline", "err", err) 255 continue 256 } 257 258 fragment.Reset() 259 260 switch logLine.Kind { 261 case spindlemodel.LogKindControl: 262 // control messages create a new step block 263 stepIdx++ 264 collapsed := false 265 if logLine.StepKind == spindlemodel.StepKindSystem { 266 collapsed = true 267 } 268 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 269 Id: stepIdx, 270 Name: logLine.Content, 271 Command: logLine.StepCommand, 272 Collapsed: collapsed, 273 }) 274 case spindlemodel.LogKindData: 275 // data messages simply insert new log lines into current step 276 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 277 Id: stepIdx, 278 Content: logLine.Content, 279 }) 280 } 281 if err != nil { 282 l.Error("failed to render log line", "err", err) 283 return 284 } 285 286 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 287 l.Error("error writing to client", "err", err) 288 return 289 } 290 291 case <-time.After(30 * time.Second): 292 l.Debug("sent keepalive") 293 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 294 l.Error("failed to write control", "err", err) 295 return 296 } 297 } 298 } 299} 300 301// either a message or an error 302type logEvent struct { 303 msg []byte 304 err error 305} 306 307func (ev *logEvent) isCloseError() bool { 308 return websocket.IsCloseError( 309 ev.err, 310 websocket.CloseNormalClosure, 311 websocket.CloseGoingAway, 312 websocket.CloseAbnormalClosure, 313 ) 314} 315 316// read logs from spindle and pass through to chan 317func readLogs(conn *websocket.Conn, ch chan logEvent) { 318 defer close(ch) 319 320 for { 321 if conn == nil { 322 return 323 } 324 325 _, msg, err := conn.ReadMessage() 326 if err != nil { 327 ch <- logEvent{err: err} 328 return 329 } 330 ch <- logEvent{msg: msg} 331 } 332}