forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "html" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.sh/tangled.sh/core/appview/config" 14 "tangled.sh/tangled.sh/core/appview/db" 15 "tangled.sh/tangled.sh/core/appview/idresolver" 16 "tangled.sh/tangled.sh/core/appview/oauth" 17 "tangled.sh/tangled.sh/core/appview/pages" 18 "tangled.sh/tangled.sh/core/appview/reporesolver" 19 "tangled.sh/tangled.sh/core/eventconsumer" 20 "tangled.sh/tangled.sh/core/log" 21 "tangled.sh/tangled.sh/core/rbac" 22 spindlemodel "tangled.sh/tangled.sh/core/spindle/models" 23 24 "github.com/go-chi/chi/v5" 25 "github.com/gorilla/websocket" 26 "github.com/posthog/posthog-go" 27) 28 29type Pipelines struct { 30 repoResolver *reporesolver.RepoResolver 31 idResolver *idresolver.Resolver 32 config *config.Config 33 oauth *oauth.OAuth 34 pages *pages.Pages 35 spindlestream *eventconsumer.Consumer 36 db *db.DB 37 enforcer *rbac.Enforcer 38 posthog posthog.Client 39 logger *slog.Logger 40} 41 42func New( 43 oauth *oauth.OAuth, 44 repoResolver *reporesolver.RepoResolver, 45 pages *pages.Pages, 46 spindlestream *eventconsumer.Consumer, 47 idResolver *idresolver.Resolver, 48 db *db.DB, 49 config *config.Config, 50 posthog posthog.Client, 51 enforcer *rbac.Enforcer, 52) *Pipelines { 53 logger := log.New("pipelines") 54 55 return &Pipelines{oauth: oauth, 56 repoResolver: repoResolver, 57 pages: pages, 58 idResolver: idResolver, 59 config: config, 60 spindlestream: spindlestream, 61 db: db, 62 posthog: posthog, 63 enforcer: enforcer, 64 logger: logger, 65 } 66} 67 68func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 69 user := p.oauth.GetUser(r) 70 l := p.logger.With("handler", "Index") 71 72 f, err := p.repoResolver.Resolve(r) 73 if err != nil { 74 l.Error("failed to get repo and knot", "err", err) 75 return 76 } 77 78 repoInfo := f.RepoInfo(user) 79 80 ps, err := db.GetPipelineStatuses( 81 p.db, 82 db.FilterEq("repo_owner", repoInfo.OwnerDid), 83 db.FilterEq("repo_name", repoInfo.Name), 84 db.FilterEq("knot", repoInfo.Knot), 85 ) 86 if err != nil { 87 l.Error("failed to query db", "err", err) 88 return 89 } 90 91 p.pages.Pipelines(w, pages.PipelinesParams{ 92 LoggedInUser: user, 93 RepoInfo: repoInfo, 94 Pipelines: ps, 95 }) 96} 97 98func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 99 user := p.oauth.GetUser(r) 100 l := p.logger.With("handler", "Workflow") 101 102 f, err := p.repoResolver.Resolve(r) 103 if err != nil { 104 l.Error("failed to get repo and knot", "err", err) 105 return 106 } 107 108 repoInfo := f.RepoInfo(user) 109 110 pipelineId := chi.URLParam(r, "pipeline") 111 if pipelineId == "" { 112 l.Error("empty pipeline ID") 113 return 114 } 115 116 workflow := chi.URLParam(r, "workflow") 117 if workflow == "" { 118 l.Error("empty workflow name") 119 return 120 } 121 122 ps, err := db.GetPipelineStatuses( 123 p.db, 124 db.FilterEq("repo_owner", repoInfo.OwnerDid), 125 db.FilterEq("repo_name", repoInfo.Name), 126 db.FilterEq("knot", repoInfo.Knot), 127 db.FilterEq("id", pipelineId), 128 ) 129 if err != nil { 130 l.Error("failed to query db", "err", err) 131 return 132 } 133 134 if len(ps) != 1 { 135 l.Error("invalid number of pipelines", "len", len(ps)) 136 return 137 } 138 139 singlePipeline := ps[0] 140 141 p.pages.Workflow(w, pages.WorkflowParams{ 142 LoggedInUser: user, 143 RepoInfo: repoInfo, 144 Pipeline: singlePipeline, 145 Workflow: workflow, 146 }) 147} 148 149var upgrader = websocket.Upgrader{ 150 ReadBufferSize: 1024, 151 WriteBufferSize: 1024, 152} 153 154func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 155 l := p.logger.With("handler", "logs") 156 157 clientConn, err := upgrader.Upgrade(w, r, nil) 158 if err != nil { 159 l.Error("websocket upgrade failed", "err", err) 160 return 161 } 162 defer func() { 163 _ = clientConn.WriteControl( 164 websocket.CloseMessage, 165 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 166 time.Now().Add(time.Second), 167 ) 168 clientConn.Close() 169 }() 170 171 ctx, cancel := context.WithCancel(r.Context()) 172 defer cancel() 173 go func() { 174 for { 175 if _, _, err := clientConn.NextReader(); err != nil { 176 l.Error("failed to read", "err", err) 177 cancel() 178 return 179 } 180 } 181 }() 182 183 user := p.oauth.GetUser(r) 184 f, err := p.repoResolver.Resolve(r) 185 if err != nil { 186 l.Error("failed to get repo and knot", "err", err) 187 http.Error(w, "bad repo/knot", http.StatusBadRequest) 188 return 189 } 190 191 repoInfo := f.RepoInfo(user) 192 193 pipelineId := chi.URLParam(r, "pipeline") 194 workflow := chi.URLParam(r, "workflow") 195 if pipelineId == "" || workflow == "" { 196 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 197 return 198 } 199 200 ps, err := db.GetPipelineStatuses( 201 p.db, 202 db.FilterEq("repo_owner", repoInfo.OwnerDid), 203 db.FilterEq("repo_name", repoInfo.Name), 204 db.FilterEq("knot", repoInfo.Knot), 205 db.FilterEq("id", pipelineId), 206 ) 207 if err != nil || len(ps) != 1 { 208 l.Error("pipeline query failed", "err", err, "count", len(ps)) 209 http.Error(w, "pipeline not found", http.StatusNotFound) 210 return 211 } 212 213 singlePipeline := ps[0] 214 spindle := repoInfo.Spindle 215 knot := repoInfo.Knot 216 rkey := singlePipeline.Rkey 217 218 if spindle == "" || knot == "" || rkey == "" { 219 http.Error(w, "invalid repo info", http.StatusBadRequest) 220 return 221 } 222 223 scheme := "wss" 224 if p.config.Core.Dev { 225 scheme = "ws" 226 } 227 228 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 229 l = l.With("url", url) 230 l.Info("logs endpoint hit") 231 232 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 233 if err != nil { 234 l.Error("websocket dial failed", "err", err) 235 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 236 return 237 } 238 defer spindleConn.Close() 239 240 // create a channel for incoming messages 241 msgChan := make(chan []byte, 10) 242 errChan := make(chan error, 1) 243 244 // start a goroutine to read from spindle 245 go func() { 246 defer close(msgChan) 247 defer close(errChan) 248 249 for { 250 _, msg, err := spindleConn.ReadMessage() 251 if err != nil { 252 if websocket.IsCloseError(err, 253 websocket.CloseNormalClosure, 254 websocket.CloseGoingAway, 255 websocket.CloseAbnormalClosure) { 256 errChan <- nil // signal graceful end 257 } else { 258 errChan <- err 259 } 260 return 261 } 262 msgChan <- msg 263 } 264 }() 265 266 stepIdx := 0 267 for { 268 select { 269 case <-ctx.Done(): 270 l.Info("client disconnected") 271 return 272 case err := <-errChan: 273 if err != nil { 274 l.Error("error reading from spindle", "err", err) 275 } 276 277 if err == nil { 278 l.Info("log tail complete") 279 } 280 281 return 282 case msg := <-msgChan: 283 var logLine spindlemodel.LogLine 284 if err = json.Unmarshal(msg, &logLine); err != nil { 285 l.Error("failed to parse logline", "err", err) 286 continue 287 } 288 289 var fragment []byte 290 switch logLine.Kind { 291 case spindlemodel.LogKindControl: 292 // control messages create a new step block 293 stepIdx++ 294 fragment = fmt.Appendf(nil, ` 295 <div id="lines" hx-swap-oob="beforeend"> 296 <details id="step-%d" open> 297 <summary>%s</summary> 298 <div id="step-body-%d"></div> 299 </details> 300 </div> 301 `, stepIdx, logLine.Content, stepIdx) 302 case spindlemodel.LogKindData: 303 // data messages simply insert new log lines into current step 304 escaped := html.EscapeString(logLine.Content) 305 fragment = fmt.Appendf(nil, ` 306 <div id="step-body-%d" hx-swap-oob="beforeend"> 307 <p>%s</p> 308 </div> 309 `, stepIdx, escaped) 310 } 311 312 if err = clientConn.WriteMessage(websocket.TextMessage, fragment); err != nil { 313 l.Error("error writing to client", "err", err) 314 return 315 } 316 case <-time.After(30 * time.Second): 317 l.Debug("sent keepalive") 318 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 319 l.Error("failed to write control", "err", err) 320 } 321 } 322 } 323}