forked from tangled.org/core
this repo has no description
at test-ci 7.9 kB view raw
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 "tangled.sh/tangled.sh/core/appview/idresolver" 15 "tangled.sh/tangled.sh/core/appview/oauth" 16 "tangled.sh/tangled.sh/core/appview/pages" 17 "tangled.sh/tangled.sh/core/appview/reporesolver" 18 "tangled.sh/tangled.sh/core/eventconsumer" 19 "tangled.sh/tangled.sh/core/log" 20 "tangled.sh/tangled.sh/core/rbac" 21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 "github.com/posthog/posthog-go" 26) 27 28type Pipelines struct { 29 repoResolver *reporesolver.RepoResolver 30 idResolver *idresolver.Resolver 31 config *config.Config 32 oauth *oauth.OAuth 33 pages *pages.Pages 34 spindlestream *eventconsumer.Consumer 35 db *db.DB 36 enforcer *rbac.Enforcer 37 posthog posthog.Client 38 logger *slog.Logger 39} 40 41func New( 42 oauth *oauth.OAuth, 43 repoResolver *reporesolver.RepoResolver, 44 pages *pages.Pages, 45 spindlestream *eventconsumer.Consumer, 46 idResolver *idresolver.Resolver, 47 db *db.DB, 48 config *config.Config, 49 posthog posthog.Client, 50 enforcer *rbac.Enforcer, 51) *Pipelines { 52 logger := log.New("pipelines") 53 54 return &Pipelines{oauth: oauth, 55 repoResolver: repoResolver, 56 pages: pages, 57 idResolver: idResolver, 58 config: config, 59 spindlestream: spindlestream, 60 db: db, 61 posthog: posthog, 62 enforcer: enforcer, 63 logger: logger, 64 } 65} 66 67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 68 user := p.oauth.GetUser(r) 69 l := p.logger.With("handler", "Index") 70 71 f, err := p.repoResolver.Resolve(r) 72 if err != nil { 73 l.Error("failed to get repo and knot", "err", err) 74 return 75 } 76 77 repoInfo := f.RepoInfo(user) 78 79 ps, err := db.GetPipelineStatuses( 80 p.db, 81 db.FilterEq("repo_owner", repoInfo.OwnerDid), 82 db.FilterEq("repo_name", repoInfo.Name), 83 db.FilterEq("knot", repoInfo.Knot), 84 ) 85 if err != nil { 86 l.Error("failed to query db", "err", err) 87 return 88 } 89 90 p.pages.Pipelines(w, pages.PipelinesParams{ 91 LoggedInUser: user, 92 RepoInfo: repoInfo, 93 Pipelines: ps, 94 }) 95} 96 97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 98 user := p.oauth.GetUser(r) 99 l := p.logger.With("handler", "Workflow") 100 101 f, err := p.repoResolver.Resolve(r) 102 if err != nil { 103 l.Error("failed to get repo and knot", "err", err) 104 return 105 } 106 107 repoInfo := f.RepoInfo(user) 108 109 pipelineId := chi.URLParam(r, "pipeline") 110 if pipelineId == "" { 111 l.Error("empty pipeline ID") 112 return 113 } 114 115 workflow := chi.URLParam(r, "workflow") 116 if workflow == "" { 117 l.Error("empty workflow name") 118 return 119 } 120 121 ps, err := db.GetPipelineStatuses( 122 p.db, 123 db.FilterEq("repo_owner", repoInfo.OwnerDid), 124 db.FilterEq("repo_name", repoInfo.Name), 125 db.FilterEq("knot", repoInfo.Knot), 126 db.FilterEq("id", pipelineId), 127 ) 128 if err != nil { 129 l.Error("failed to query db", "err", err) 130 return 131 } 132 133 if len(ps) != 1 { 134 l.Error("invalid number of pipelines", "len", len(ps)) 135 return 136 } 137 138 singlePipeline := ps[0] 139 140 p.pages.Workflow(w, pages.WorkflowParams{ 141 LoggedInUser: user, 142 RepoInfo: repoInfo, 143 Pipeline: singlePipeline, 144 Workflow: workflow, 145 }) 146} 147 148var upgrader = websocket.Upgrader{ 149 ReadBufferSize: 1024, 150 WriteBufferSize: 1024, 151} 152 153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 154 l := p.logger.With("handler", "logs") 155 156 clientConn, err := upgrader.Upgrade(w, r, nil) 157 if err != nil { 158 l.Error("websocket upgrade failed", "err", err) 159 return 160 } 161 defer func() { 162 _ = clientConn.WriteControl( 163 websocket.CloseMessage, 164 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 165 time.Now().Add(time.Second), 166 ) 167 clientConn.Close() 168 }() 169 170 ctx, cancel := context.WithCancel(r.Context()) 171 defer cancel() 172 173 user := p.oauth.GetUser(r) 174 f, err := p.repoResolver.Resolve(r) 175 if err != nil { 176 l.Error("failed to get repo and knot", "err", err) 177 http.Error(w, "bad repo/knot", http.StatusBadRequest) 178 return 179 } 180 181 repoInfo := f.RepoInfo(user) 182 183 pipelineId := chi.URLParam(r, "pipeline") 184 workflow := chi.URLParam(r, "workflow") 185 if pipelineId == "" || workflow == "" { 186 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 187 return 188 } 189 190 ps, err := db.GetPipelineStatuses( 191 p.db, 192 db.FilterEq("repo_owner", repoInfo.OwnerDid), 193 db.FilterEq("repo_name", repoInfo.Name), 194 db.FilterEq("knot", repoInfo.Knot), 195 db.FilterEq("id", pipelineId), 196 ) 197 if err != nil || len(ps) != 1 { 198 l.Error("pipeline query failed", "err", err, "count", len(ps)) 199 http.Error(w, "pipeline not found", http.StatusNotFound) 200 return 201 } 202 203 singlePipeline := ps[0] 204 spindle := repoInfo.Spindle 205 knot := repoInfo.Knot 206 rkey := singlePipeline.Rkey 207 208 if spindle == "" || knot == "" || rkey == "" { 209 http.Error(w, "invalid repo info", http.StatusBadRequest) 210 return 211 } 212 213 scheme := "wss" 214 if p.config.Core.Dev { 215 scheme = "ws" 216 } 217 218 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 219 l = l.With("url", url) 220 l.Info("logs endpoint hit") 221 222 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 223 if err != nil { 224 l.Error("websocket dial failed", "err", err) 225 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 226 return 227 } 228 defer spindleConn.Close() 229 230 // create a channel for incoming messages 231 evChan := make(chan logEvent, 100) 232 // start a goroutine to read from spindle 233 go readLogs(spindleConn, evChan) 234 235 stepIdx := 0 236 var fragment bytes.Buffer 237 for { 238 select { 239 case <-ctx.Done(): 240 l.Info("client disconnected") 241 return 242 243 case ev, ok := <-evChan: 244 if !ok { 245 continue 246 } 247 248 if ev.err != nil && ev.isCloseError() { 249 l.Debug("graceful shutdown, tail complete", "err", err) 250 return 251 } 252 if ev.err != nil { 253 l.Error("error reading from spindle", "err", err) 254 return 255 } 256 257 var logLine spindlemodel.LogLine 258 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 259 l.Error("failed to parse logline", "err", err) 260 continue 261 } 262 263 fragment.Reset() 264 265 switch logLine.Kind { 266 case spindlemodel.LogKindControl: 267 // control messages create a new step block 268 stepIdx++ 269 collapsed := false 270 if logLine.StepKind == spindlemodel.StepKindSystem { 271 collapsed = true 272 } 273 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 274 Id: stepIdx, 275 Name: logLine.Content, 276 Command: logLine.StepCommand, 277 Collapsed: collapsed, 278 }) 279 case spindlemodel.LogKindData: 280 // data messages simply insert new log lines into current step 281 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 282 Id: stepIdx, 283 Content: logLine.Content, 284 }) 285 } 286 if err != nil { 287 l.Error("failed to render log line", "err", err) 288 return 289 } 290 291 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 292 l.Error("error writing to client", "err", err) 293 return 294 } 295 296 case <-time.After(30 * time.Second): 297 l.Debug("sent keepalive") 298 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 299 l.Error("failed to write control", "err", err) 300 return 301 } 302 } 303 } 304} 305 306// either a message or an error 307type logEvent struct { 308 msg []byte 309 err error 310} 311 312func (ev *logEvent) isCloseError() bool { 313 return websocket.IsCloseError( 314 ev.err, 315 websocket.CloseNormalClosure, 316 websocket.CloseGoingAway, 317 websocket.CloseAbnormalClosure, 318 ) 319} 320 321// read logs from spindle and pass through to chan 322func readLogs(conn *websocket.Conn, ch chan logEvent) { 323 defer close(ch) 324 325 for { 326 if conn == nil { 327 return 328 } 329 330 _, msg, err := conn.ReadMessage() 331 if err != nil { 332 ch <- logEvent{err: err} 333 return 334 } 335 ch <- logEvent{msg: msg} 336 } 337}