forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" 17 "tangled.org/core/eventconsumer" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/log" 20 "tangled.org/core/rbac" 21 spindlemodel "tangled.org/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25) 26 27type Pipelines struct { 28 repoResolver *reporesolver.RepoResolver 29 idResolver *idresolver.Resolver 30 config *config.Config 31 oauth *oauth.OAuth 32 pages *pages.Pages 33 spindlestream *eventconsumer.Consumer 34 db *db.DB 35 enforcer *rbac.Enforcer 36 logger *slog.Logger 37} 38 39func New( 40 oauth *oauth.OAuth, 41 repoResolver *reporesolver.RepoResolver, 42 pages *pages.Pages, 43 spindlestream *eventconsumer.Consumer, 44 idResolver *idresolver.Resolver, 45 db *db.DB, 46 config *config.Config, 47 enforcer *rbac.Enforcer, 48) *Pipelines { 49 logger := log.New("pipelines") 50 51 return &Pipelines{ 52 oauth: oauth, 53 repoResolver: repoResolver, 54 pages: pages, 55 idResolver: idResolver, 56 config: config, 57 spindlestream: spindlestream, 58 db: db, 59 enforcer: enforcer, 60 logger: logger, 61 } 62} 63 64func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 65 user := p.oauth.GetUser(r) 66 l := p.logger.With("handler", "Index") 67 68 f, err := p.repoResolver.Resolve(r) 69 if err != nil { 70 l.Error("failed to get repo and knot", "err", err) 71 return 72 } 73 74 repoInfo := f.RepoInfo(user) 75 76 ps, err := db.GetPipelineStatuses( 77 p.db, 78 db.FilterEq("repo_owner", repoInfo.OwnerDid), 79 db.FilterEq("repo_name", repoInfo.Name), 80 db.FilterEq("knot", repoInfo.Knot), 81 ) 82 if err != nil { 83 l.Error("failed to query db", "err", err) 84 return 85 } 86 87 p.pages.Pipelines(w, pages.PipelinesParams{ 88 LoggedInUser: user, 89 RepoInfo: repoInfo, 90 Pipelines: ps, 91 }) 92} 93 94func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 95 user := p.oauth.GetUser(r) 96 l := p.logger.With("handler", "Workflow") 97 98 f, err := p.repoResolver.Resolve(r) 99 if err != nil { 100 l.Error("failed to get repo and knot", "err", err) 101 return 102 } 103 104 repoInfo := f.RepoInfo(user) 105 106 pipelineId := chi.URLParam(r, "pipeline") 107 if pipelineId == "" { 108 l.Error("empty pipeline ID") 109 return 110 } 111 112 workflow := chi.URLParam(r, "workflow") 113 if workflow == "" { 114 l.Error("empty workflow name") 115 return 116 } 117 118 ps, err := db.GetPipelineStatuses( 119 p.db, 120 db.FilterEq("repo_owner", repoInfo.OwnerDid), 121 db.FilterEq("repo_name", repoInfo.Name), 122 db.FilterEq("knot", repoInfo.Knot), 123 db.FilterEq("id", pipelineId), 124 ) 125 if err != nil { 126 l.Error("failed to query db", "err", err) 127 return 128 } 129 130 if len(ps) != 1 { 131 l.Error("invalid number of pipelines", "len", len(ps)) 132 return 133 } 134 135 singlePipeline := ps[0] 136 137 p.pages.Workflow(w, pages.WorkflowParams{ 138 LoggedInUser: user, 139 RepoInfo: repoInfo, 140 Pipeline: singlePipeline, 141 Workflow: workflow, 142 }) 143} 144 145var upgrader = websocket.Upgrader{ 146 ReadBufferSize: 1024, 147 WriteBufferSize: 1024, 148} 149 150func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 151 l := p.logger.With("handler", "logs") 152 153 clientConn, err := upgrader.Upgrade(w, r, nil) 154 if err != nil { 155 l.Error("websocket upgrade failed", "err", err) 156 return 157 } 158 defer func() { 159 _ = clientConn.WriteControl( 160 websocket.CloseMessage, 161 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 162 time.Now().Add(time.Second), 163 ) 164 clientConn.Close() 165 }() 166 167 ctx, cancel := context.WithCancel(r.Context()) 168 defer cancel() 169 170 user := p.oauth.GetUser(r) 171 f, err := p.repoResolver.Resolve(r) 172 if err != nil { 173 l.Error("failed to get repo and knot", "err", err) 174 http.Error(w, "bad repo/knot", http.StatusBadRequest) 175 return 176 } 177 178 repoInfo := f.RepoInfo(user) 179 180 pipelineId := chi.URLParam(r, "pipeline") 181 workflow := chi.URLParam(r, "workflow") 182 if pipelineId == "" || workflow == "" { 183 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 184 return 185 } 186 187 ps, err := db.GetPipelineStatuses( 188 p.db, 189 db.FilterEq("repo_owner", repoInfo.OwnerDid), 190 db.FilterEq("repo_name", repoInfo.Name), 191 db.FilterEq("knot", repoInfo.Knot), 192 db.FilterEq("id", pipelineId), 193 ) 194 if err != nil || len(ps) != 1 { 195 l.Error("pipeline query failed", "err", err, "count", len(ps)) 196 http.Error(w, "pipeline not found", http.StatusNotFound) 197 return 198 } 199 200 singlePipeline := ps[0] 201 spindle := repoInfo.Spindle 202 knot := repoInfo.Knot 203 rkey := singlePipeline.Rkey 204 205 if spindle == "" || knot == "" || rkey == "" { 206 http.Error(w, "invalid repo info", http.StatusBadRequest) 207 return 208 } 209 210 scheme := "wss" 211 if p.config.Core.Dev { 212 scheme = "ws" 213 } 214 215 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 216 l = l.With("url", url) 217 l.Info("logs endpoint hit") 218 219 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 220 if err != nil { 221 l.Error("websocket dial failed", "err", err) 222 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 223 return 224 } 225 defer spindleConn.Close() 226 227 // create a channel for incoming messages 228 evChan := make(chan logEvent, 100) 229 // start a goroutine to read from spindle 230 go readLogs(spindleConn, evChan) 231 232 stepIdx := 0 233 var fragment bytes.Buffer 234 for { 235 select { 236 case <-ctx.Done(): 237 l.Info("client disconnected") 238 return 239 240 case ev, ok := <-evChan: 241 if !ok { 242 continue 243 } 244 245 if ev.err != nil && ev.isCloseError() { 246 l.Debug("graceful shutdown, tail complete", "err", err) 247 return 248 } 249 if ev.err != nil { 250 l.Error("error reading from spindle", "err", err) 251 return 252 } 253 254 var logLine spindlemodel.LogLine 255 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 256 l.Error("failed to parse logline", "err", err) 257 continue 258 } 259 260 fragment.Reset() 261 262 switch logLine.Kind { 263 case spindlemodel.LogKindControl: 264 // control messages create a new step block 265 stepIdx++ 266 collapsed := false 267 if logLine.StepKind == spindlemodel.StepKindSystem { 268 collapsed = true 269 } 270 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 271 Id: stepIdx, 272 Name: logLine.Content, 273 Command: logLine.StepCommand, 274 Collapsed: collapsed, 275 }) 276 case spindlemodel.LogKindData: 277 // data messages simply insert new log lines into current step 278 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 279 Id: stepIdx, 280 Content: logLine.Content, 281 }) 282 } 283 if err != nil { 284 l.Error("failed to render log line", "err", err) 285 return 286 } 287 288 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 289 l.Error("error writing to client", "err", err) 290 return 291 } 292 293 case <-time.After(30 * time.Second): 294 l.Debug("sent keepalive") 295 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 296 l.Error("failed to write control", "err", err) 297 return 298 } 299 } 300 } 301} 302 303// either a message or an error 304type logEvent struct { 305 msg []byte 306 err error 307} 308 309func (ev *logEvent) isCloseError() bool { 310 return websocket.IsCloseError( 311 ev.err, 312 websocket.CloseNormalClosure, 313 websocket.CloseGoingAway, 314 websocket.CloseAbnormalClosure, 315 ) 316} 317 318// read logs from spindle and pass through to chan 319func readLogs(conn *websocket.Conn, ch chan logEvent) { 320 defer close(ch) 321 322 for { 323 if conn == nil { 324 return 325 } 326 327 _, msg, err := conn.ReadMessage() 328 if err != nil { 329 ch <- logEvent{err: err} 330 return 331 } 332 ch <- logEvent{msg: msg} 333 } 334}