forked from tangled.org/core
this repo has no description
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 "tangled.sh/tangled.sh/core/appview/oauth" 15 "tangled.sh/tangled.sh/core/appview/pages" 16 "tangled.sh/tangled.sh/core/appview/reporesolver" 17 "tangled.sh/tangled.sh/core/eventconsumer" 18 "tangled.sh/tangled.sh/core/idresolver" 19 "tangled.sh/tangled.sh/core/log" 20 "tangled.sh/tangled.sh/core/rbac" 21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25) 26 27type Pipelines struct { 28 repoResolver *reporesolver.RepoResolver 29 idResolver *idresolver.Resolver 30 config *config.Config 31 oauth *oauth.OAuth 32 pages *pages.Pages 33 spindlestream *eventconsumer.Consumer 34 db *db.DB 35 enforcer *rbac.Enforcer 36 logger *slog.Logger 37} 38 39func New( 40 oauth *oauth.OAuth, 41 repoResolver *reporesolver.RepoResolver, 42 pages *pages.Pages, 43 spindlestream *eventconsumer.Consumer, 44 idResolver *idresolver.Resolver, 45 db *db.DB, 46 config *config.Config, 47 enforcer *rbac.Enforcer, 48) *Pipelines { 49 logger := log.New("pipelines") 50 51 return &Pipelines{oauth: oauth, 52 repoResolver: repoResolver, 53 pages: pages, 54 idResolver: idResolver, 55 config: config, 56 spindlestream: spindlestream, 57 db: db, 58 enforcer: enforcer, 59 logger: logger, 60 } 61} 62 63func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 64 user := p.oauth.GetUser(r) 65 l := p.logger.With("handler", "Index") 66 67 f, err := p.repoResolver.Resolve(r) 68 if err != nil { 69 l.Error("failed to get repo and knot", "err", err) 70 return 71 } 72 73 repoInfo := f.RepoInfo(user) 74 75 ps, err := db.GetPipelineStatuses( 76 p.db, 77 db.FilterEq("repo_owner", repoInfo.OwnerDid), 78 db.FilterEq("repo_name", repoInfo.Name), 79 db.FilterEq("knot", repoInfo.Knot), 80 ) 81 if err != nil { 82 l.Error("failed to query db", "err", err) 83 return 84 } 85 86 p.pages.Pipelines(w, pages.PipelinesParams{ 87 LoggedInUser: user, 88 RepoInfo: repoInfo, 89 Pipelines: ps, 90 }) 91} 92 93func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 94 user := p.oauth.GetUser(r) 95 l := p.logger.With("handler", "Workflow") 96 97 f, err := p.repoResolver.Resolve(r) 98 if err != nil { 99 l.Error("failed to get repo and knot", "err", err) 100 return 101 } 102 103 repoInfo := f.RepoInfo(user) 104 105 pipelineId := chi.URLParam(r, "pipeline") 106 if pipelineId == "" { 107 l.Error("empty pipeline ID") 108 return 109 } 110 111 workflow := chi.URLParam(r, "workflow") 112 if workflow == "" { 113 l.Error("empty workflow name") 114 return 115 } 116 117 ps, err := db.GetPipelineStatuses( 118 p.db, 119 db.FilterEq("repo_owner", repoInfo.OwnerDid), 120 db.FilterEq("repo_name", repoInfo.Name), 121 db.FilterEq("knot", repoInfo.Knot), 122 db.FilterEq("id", pipelineId), 123 ) 124 if err != nil { 125 l.Error("failed to query db", "err", err) 126 return 127 } 128 129 if len(ps) != 1 { 130 l.Error("invalid number of pipelines", "len", len(ps)) 131 return 132 } 133 134 singlePipeline := ps[0] 135 136 p.pages.Workflow(w, pages.WorkflowParams{ 137 LoggedInUser: user, 138 RepoInfo: repoInfo, 139 Pipeline: singlePipeline, 140 Workflow: workflow, 141 }) 142} 143 144var upgrader = websocket.Upgrader{ 145 ReadBufferSize: 1024, 146 WriteBufferSize: 1024, 147} 148 149func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 150 l := p.logger.With("handler", "logs") 151 152 clientConn, err := upgrader.Upgrade(w, r, nil) 153 if err != nil { 154 l.Error("websocket upgrade failed", "err", err) 155 return 156 } 157 defer func() { 158 _ = clientConn.WriteControl( 159 websocket.CloseMessage, 160 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 161 time.Now().Add(time.Second), 162 ) 163 clientConn.Close() 164 }() 165 166 ctx, cancel := context.WithCancel(r.Context()) 167 defer cancel() 168 169 user := p.oauth.GetUser(r) 170 f, err := p.repoResolver.Resolve(r) 171 if err != nil { 172 l.Error("failed to get repo and knot", "err", err) 173 http.Error(w, "bad repo/knot", http.StatusBadRequest) 174 return 175 } 176 177 repoInfo := f.RepoInfo(user) 178 179 pipelineId := chi.URLParam(r, "pipeline") 180 workflow := chi.URLParam(r, "workflow") 181 if pipelineId == "" || workflow == "" { 182 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 183 return 184 } 185 186 ps, err := db.GetPipelineStatuses( 187 p.db, 188 db.FilterEq("repo_owner", repoInfo.OwnerDid), 189 db.FilterEq("repo_name", repoInfo.Name), 190 db.FilterEq("knot", repoInfo.Knot), 191 db.FilterEq("id", pipelineId), 192 ) 193 if err != nil || len(ps) != 1 { 194 l.Error("pipeline query failed", "err", err, "count", len(ps)) 195 http.Error(w, "pipeline not found", http.StatusNotFound) 196 return 197 } 198 199 singlePipeline := ps[0] 200 spindle := repoInfo.Spindle 201 knot := repoInfo.Knot 202 rkey := singlePipeline.Rkey 203 204 if spindle == "" || knot == "" || rkey == "" { 205 http.Error(w, "invalid repo info", http.StatusBadRequest) 206 return 207 } 208 209 scheme := "wss" 210 if p.config.Core.Dev { 211 scheme = "ws" 212 } 213 214 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 215 l = l.With("url", url) 216 l.Info("logs endpoint hit") 217 218 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 219 if err != nil { 220 l.Error("websocket dial failed", "err", err) 221 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 222 return 223 } 224 defer spindleConn.Close() 225 226 // create a channel for incoming messages 227 evChan := make(chan logEvent, 100) 228 // start a goroutine to read from spindle 229 go readLogs(spindleConn, evChan) 230 231 stepIdx := 0 232 var fragment bytes.Buffer 233 for { 234 select { 235 case <-ctx.Done(): 236 l.Info("client disconnected") 237 return 238 239 case ev, ok := <-evChan: 240 if !ok { 241 continue 242 } 243 244 if ev.err != nil && ev.isCloseError() { 245 l.Debug("graceful shutdown, tail complete", "err", err) 246 return 247 } 248 if ev.err != nil { 249 l.Error("error reading from spindle", "err", err) 250 return 251 } 252 253 var logLine spindlemodel.LogLine 254 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 255 l.Error("failed to parse logline", "err", err) 256 continue 257 } 258 259 fragment.Reset() 260 261 switch logLine.Kind { 262 case spindlemodel.LogKindControl: 263 // control messages create a new step block 264 stepIdx++ 265 collapsed := false 266 if logLine.StepKind == spindlemodel.StepKindSystem { 267 collapsed = true 268 } 269 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 270 Id: stepIdx, 271 Name: logLine.Content, 272 Command: logLine.StepCommand, 273 Collapsed: collapsed, 274 }) 275 case spindlemodel.LogKindData: 276 // data messages simply insert new log lines into current step 277 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 278 Id: stepIdx, 279 Content: logLine.Content, 280 }) 281 } 282 if err != nil { 283 l.Error("failed to render log line", "err", err) 284 return 285 } 286 287 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 288 l.Error("error writing to client", "err", err) 289 return 290 } 291 292 case <-time.After(30 * time.Second): 293 l.Debug("sent keepalive") 294 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 295 l.Error("failed to write control", "err", err) 296 return 297 } 298 } 299 } 300} 301 302// either a message or an error 303type logEvent struct { 304 msg []byte 305 err error 306} 307 308func (ev *logEvent) isCloseError() bool { 309 return websocket.IsCloseError( 310 ev.err, 311 websocket.CloseNormalClosure, 312 websocket.CloseGoingAway, 313 websocket.CloseAbnormalClosure, 314 ) 315} 316 317// read logs from spindle and pass through to chan 318func readLogs(conn *websocket.Conn, ch chan logEvent) { 319 defer close(ch) 320 321 for { 322 if conn == nil { 323 return 324 } 325 326 _, msg, err := conn.ReadMessage() 327 if err != nil { 328 ch <- logEvent{err: err} 329 return 330 } 331 ch <- logEvent{msg: msg} 332 } 333}