forked from tangled.org/core
this repo has no description
1package pipelines 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 "tangled.sh/tangled.sh/core/appview/idresolver" 15 "tangled.sh/tangled.sh/core/appview/oauth" 16 "tangled.sh/tangled.sh/core/appview/pages" 17 "tangled.sh/tangled.sh/core/appview/reporesolver" 18 "tangled.sh/tangled.sh/core/eventconsumer" 19 "tangled.sh/tangled.sh/core/log" 20 "tangled.sh/tangled.sh/core/rbac" 21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 "github.com/posthog/posthog-go" 26) 27 28type Pipelines struct { 29 repoResolver *reporesolver.RepoResolver 30 idResolver *idresolver.Resolver 31 config *config.Config 32 oauth *oauth.OAuth 33 pages *pages.Pages 34 spindlestream *eventconsumer.Consumer 35 db *db.DB 36 enforcer *rbac.Enforcer 37 posthog posthog.Client 38 logger *slog.Logger 39} 40 41func New( 42 oauth *oauth.OAuth, 43 repoResolver *reporesolver.RepoResolver, 44 pages *pages.Pages, 45 spindlestream *eventconsumer.Consumer, 46 idResolver *idresolver.Resolver, 47 db *db.DB, 48 config *config.Config, 49 posthog posthog.Client, 50 enforcer *rbac.Enforcer, 51) *Pipelines { 52 logger := log.New("pipelines") 53 54 return &Pipelines{oauth: oauth, 55 repoResolver: repoResolver, 56 pages: pages, 57 idResolver: idResolver, 58 config: config, 59 spindlestream: spindlestream, 60 db: db, 61 posthog: posthog, 62 enforcer: enforcer, 63 logger: logger, 64 } 65} 66 67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 68 user := p.oauth.GetUser(r) 69 l := p.logger.With("handler", "Index") 70 71 f, err := p.repoResolver.Resolve(r) 72 if err != nil { 73 l.Error("failed to get repo and knot", "err", err) 74 return 75 } 76 77 repoInfo := f.RepoInfo(user) 78 79 ps, err := db.GetPipelineStatuses( 80 p.db, 81 db.FilterEq("repo_owner", repoInfo.OwnerDid), 82 db.FilterEq("repo_name", repoInfo.Name), 83 db.FilterEq("knot", repoInfo.Knot), 84 ) 85 if err != nil { 86 l.Error("failed to query db", "err", err) 87 return 88 } 89 90 p.pages.Pipelines(w, pages.PipelinesParams{ 91 LoggedInUser: user, 92 RepoInfo: repoInfo, 93 Pipelines: ps, 94 }) 95} 96 97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 98 user := p.oauth.GetUser(r) 99 l := p.logger.With("handler", "Workflow") 100 101 f, err := p.repoResolver.Resolve(r) 102 if err != nil { 103 l.Error("failed to get repo and knot", "err", err) 104 return 105 } 106 107 repoInfo := f.RepoInfo(user) 108 109 pipelineId := chi.URLParam(r, "pipeline") 110 if pipelineId == "" { 111 l.Error("empty pipeline ID") 112 return 113 } 114 115 workflow := chi.URLParam(r, "workflow") 116 if workflow == "" { 117 l.Error("empty workflow name") 118 return 119 } 120 121 ps, err := db.GetPipelineStatuses( 122 p.db, 123 db.FilterEq("repo_owner", repoInfo.OwnerDid), 124 db.FilterEq("repo_name", repoInfo.Name), 125 db.FilterEq("knot", repoInfo.Knot), 126 db.FilterEq("id", pipelineId), 127 ) 128 if err != nil { 129 l.Error("failed to query db", "err", err) 130 return 131 } 132 133 if len(ps) != 1 { 134 l.Error("invalid number of pipelines", "len", len(ps)) 135 return 136 } 137 138 singlePipeline := ps[0] 139 140 p.pages.Workflow(w, pages.WorkflowParams{ 141 LoggedInUser: user, 142 RepoInfo: repoInfo, 143 Pipeline: singlePipeline, 144 Workflow: workflow, 145 }) 146} 147 148var upgrader = websocket.Upgrader{ 149 ReadBufferSize: 1024, 150 WriteBufferSize: 1024, 151} 152 153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 154 l := p.logger.With("handler", "logs") 155 156 clientConn, err := upgrader.Upgrade(w, r, nil) 157 if err != nil { 158 l.Error("websocket upgrade failed", "err", err) 159 return 160 } 161 defer func() { 162 _ = clientConn.WriteControl( 163 websocket.CloseMessage, 164 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 165 time.Now().Add(time.Second), 166 ) 167 clientConn.Close() 168 }() 169 170 ctx, cancel := context.WithCancel(r.Context()) 171 defer cancel() 172 go func() { 173 for { 174 if _, _, err := clientConn.NextReader(); err != nil { 175 l.Error("failed to read", "err", err) 176 cancel() 177 return 178 } 179 } 180 }() 181 182 user := p.oauth.GetUser(r) 183 f, err := p.repoResolver.Resolve(r) 184 if err != nil { 185 l.Error("failed to get repo and knot", "err", err) 186 http.Error(w, "bad repo/knot", http.StatusBadRequest) 187 return 188 } 189 190 repoInfo := f.RepoInfo(user) 191 192 pipelineId := chi.URLParam(r, "pipeline") 193 workflow := chi.URLParam(r, "workflow") 194 if pipelineId == "" || workflow == "" { 195 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 196 return 197 } 198 199 ps, err := db.GetPipelineStatuses( 200 p.db, 201 db.FilterEq("repo_owner", repoInfo.OwnerDid), 202 db.FilterEq("repo_name", repoInfo.Name), 203 db.FilterEq("knot", repoInfo.Knot), 204 db.FilterEq("id", pipelineId), 205 ) 206 if err != nil || len(ps) != 1 { 207 l.Error("pipeline query failed", "err", err, "count", len(ps)) 208 http.Error(w, "pipeline not found", http.StatusNotFound) 209 return 210 } 211 212 singlePipeline := ps[0] 213 spindle := repoInfo.Spindle 214 knot := repoInfo.Knot 215 rkey := singlePipeline.Rkey 216 217 if spindle == "" || knot == "" || rkey == "" { 218 http.Error(w, "invalid repo info", http.StatusBadRequest) 219 return 220 } 221 222 scheme := "wss" 223 if p.config.Core.Dev { 224 scheme = "ws" 225 } 226 227 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 228 l = l.With("url", url) 229 l.Info("logs endpoint hit") 230 231 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 232 if err != nil { 233 l.Error("websocket dial failed", "err", err) 234 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 235 return 236 } 237 defer spindleConn.Close() 238 239 // create a channel for incoming messages 240 msgChan := make(chan []byte, 10) 241 errChan := make(chan error, 1) 242 243 // start a goroutine to read from spindle 244 go func() { 245 defer close(msgChan) 246 defer close(errChan) 247 248 for { 249 _, msg, err := spindleConn.ReadMessage() 250 if err != nil { 251 if websocket.IsCloseError(err, 252 websocket.CloseNormalClosure, 253 websocket.CloseGoingAway, 254 websocket.CloseAbnormalClosure) { 255 errChan <- nil // signal graceful end 256 } else { 257 errChan <- err 258 } 259 return 260 } 261 msgChan <- msg 262 } 263 }() 264 265 for { 266 select { 267 case <-ctx.Done(): 268 l.Info("client disconnected") 269 return 270 case err := <-errChan: 271 if err != nil { 272 l.Error("error reading from spindle", "err", err) 273 } 274 275 if err == nil { 276 l.Info("log tail complete") 277 } 278 279 return 280 case msg := <-msgChan: 281 var logLine spindlemodel.LogLine 282 if err = json.Unmarshal(msg, &logLine); err != nil { 283 l.Error("failed to parse logline", "err", err) 284 continue 285 } 286 287 html := fmt.Appendf(nil, ` 288 <div id="lines" hx-swap-oob="beforeend"> 289 <p>%s: %s</p> 290 </div> 291 `, logLine.Stream, logLine.Data) 292 293 if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil { 294 l.Error("error writing to client", "err", err) 295 return 296 } 297 case <-time.After(30 * time.Second): 298 l.Debug("sent keepalive") 299 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 300 l.Error("failed to write control", "err", err) 301 } 302 } 303 } 304}