forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/orm"
20 "tangled.org/core/rbac"
21 spindlemodel "tangled.org/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25)
26
27type Pipelines struct {
28 repoResolver *reporesolver.RepoResolver
29 idResolver *idresolver.Resolver
30 config *config.Config
31 oauth *oauth.OAuth
32 pages *pages.Pages
33 spindlestream *eventconsumer.Consumer
34 db *db.DB
35 enforcer *rbac.Enforcer
36 logger *slog.Logger
37}
38
39func (p *Pipelines) Router() http.Handler {
40 r := chi.NewRouter()
41 r.Get("/", p.Index)
42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
44
45 return r
46}
47
48func New(
49 oauth *oauth.OAuth,
50 repoResolver *reporesolver.RepoResolver,
51 pages *pages.Pages,
52 spindlestream *eventconsumer.Consumer,
53 idResolver *idresolver.Resolver,
54 db *db.DB,
55 config *config.Config,
56 enforcer *rbac.Enforcer,
57 logger *slog.Logger,
58) *Pipelines {
59 return &Pipelines{
60 oauth: oauth,
61 repoResolver: repoResolver,
62 pages: pages,
63 idResolver: idResolver,
64 config: config,
65 spindlestream: spindlestream,
66 db: db,
67 enforcer: enforcer,
68 logger: logger,
69 }
70}
71
72func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
73 user := p.oauth.GetUser(r)
74 l := p.logger.With("handler", "Index")
75
76 f, err := p.repoResolver.Resolve(r)
77 if err != nil {
78 l.Error("failed to get repo and knot", "err", err)
79 return
80 }
81
82 ps, err := db.GetPipelineStatuses(
83 p.db,
84 30,
85 orm.FilterEq("repo_owner", f.Did),
86 orm.FilterEq("repo_name", f.Name),
87 orm.FilterEq("knot", f.Knot),
88 )
89 if err != nil {
90 l.Error("failed to query db", "err", err)
91 return
92 }
93
94 p.pages.Pipelines(w, pages.PipelinesParams{
95 LoggedInUser: user,
96 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
97 Pipelines: ps,
98 })
99}
100
101func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
102 user := p.oauth.GetUser(r)
103 l := p.logger.With("handler", "Workflow")
104
105 f, err := p.repoResolver.Resolve(r)
106 if err != nil {
107 l.Error("failed to get repo and knot", "err", err)
108 return
109 }
110
111 pipelineId := chi.URLParam(r, "pipeline")
112 if pipelineId == "" {
113 l.Error("empty pipeline ID")
114 return
115 }
116
117 workflow := chi.URLParam(r, "workflow")
118 if workflow == "" {
119 l.Error("empty workflow name")
120 return
121 }
122
123 ps, err := db.GetPipelineStatuses(
124 p.db,
125 1,
126 orm.FilterEq("repo_owner", f.Did),
127 orm.FilterEq("repo_name", f.Name),
128 orm.FilterEq("knot", f.Knot),
129 orm.FilterEq("id", pipelineId),
130 )
131 if err != nil {
132 l.Error("failed to query db", "err", err)
133 return
134 }
135
136 if len(ps) != 1 {
137 l.Error("invalid number of pipelines", "len", len(ps))
138 return
139 }
140
141 singlePipeline := ps[0]
142
143 p.pages.Workflow(w, pages.WorkflowParams{
144 LoggedInUser: user,
145 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
146 Pipeline: singlePipeline,
147 Workflow: workflow,
148 })
149}
150
151var upgrader = websocket.Upgrader{
152 ReadBufferSize: 1024,
153 WriteBufferSize: 1024,
154}
155
156func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
157 l := p.logger.With("handler", "logs")
158
159 clientConn, err := upgrader.Upgrade(w, r, nil)
160 if err != nil {
161 l.Error("websocket upgrade failed", "err", err)
162 return
163 }
164 defer func() {
165 _ = clientConn.WriteControl(
166 websocket.CloseMessage,
167 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
168 time.Now().Add(time.Second),
169 )
170 clientConn.Close()
171 }()
172
173 ctx, cancel := context.WithCancel(r.Context())
174 defer cancel()
175
176 f, err := p.repoResolver.Resolve(r)
177 if err != nil {
178 l.Error("failed to get repo and knot", "err", err)
179 http.Error(w, "bad repo/knot", http.StatusBadRequest)
180 return
181 }
182
183 pipelineId := chi.URLParam(r, "pipeline")
184 workflow := chi.URLParam(r, "workflow")
185 if pipelineId == "" || workflow == "" {
186 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
187 return
188 }
189
190 ps, err := db.GetPipelineStatuses(
191 p.db,
192 1,
193 orm.FilterEq("repo_owner", f.Did),
194 orm.FilterEq("repo_name", f.Name),
195 orm.FilterEq("knot", f.Knot),
196 orm.FilterEq("id", pipelineId),
197 )
198 if err != nil || len(ps) != 1 {
199 l.Error("pipeline query failed", "err", err, "count", len(ps))
200 http.Error(w, "pipeline not found", http.StatusNotFound)
201 return
202 }
203
204 singlePipeline := ps[0]
205 spindle := f.Spindle
206 knot := f.Knot
207 rkey := singlePipeline.Rkey
208
209 if spindle == "" || knot == "" || rkey == "" {
210 http.Error(w, "invalid repo info", http.StatusBadRequest)
211 return
212 }
213
214 scheme := "wss"
215 if p.config.Core.Dev {
216 scheme = "ws"
217 }
218
219 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
220 l = l.With("url", url)
221 l.Info("logs endpoint hit")
222
223 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
224 if err != nil {
225 l.Error("websocket dial failed", "err", err)
226 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
227 return
228 }
229 defer spindleConn.Close()
230
231 // create a channel for incoming messages
232 evChan := make(chan logEvent, 100)
233 // start a goroutine to read from spindle
234 go readLogs(spindleConn, evChan)
235
236 stepStartTimes := make(map[int]time.Time)
237 var fragment bytes.Buffer
238 for {
239 select {
240 case <-ctx.Done():
241 l.Info("client disconnected")
242 return
243
244 case ev, ok := <-evChan:
245 if !ok {
246 continue
247 }
248
249 if ev.err != nil && ev.isCloseError() {
250 l.Debug("graceful shutdown, tail complete", "err", err)
251 return
252 }
253 if ev.err != nil {
254 l.Error("error reading from spindle", "err", err)
255 return
256 }
257
258 var logLine spindlemodel.LogLine
259 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
260 l.Error("failed to parse logline", "err", err)
261 continue
262 }
263
264 fragment.Reset()
265
266 switch logLine.Kind {
267 case spindlemodel.LogKindControl:
268 switch logLine.StepStatus {
269 case spindlemodel.StepStatusStart:
270 stepStartTimes[logLine.StepId] = logLine.Time
271 collapsed := false
272 if logLine.StepKind == spindlemodel.StepKindSystem {
273 collapsed = true
274 }
275 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
276 Id: logLine.StepId,
277 Name: logLine.Content,
278 Command: logLine.StepCommand,
279 Collapsed: collapsed,
280 StartTime: logLine.Time,
281 })
282 case spindlemodel.StepStatusEnd:
283 startTime := stepStartTimes[logLine.StepId]
284 endTime := logLine.Time
285 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
286 Id: logLine.StepId,
287 StartTime: startTime,
288 EndTime: endTime,
289 })
290 }
291
292 case spindlemodel.LogKindData:
293 // data messages simply insert new log lines into current step
294 err = p.pages.LogLine(&fragment, pages.LogLineParams{
295 Id: logLine.StepId,
296 Content: logLine.Content,
297 })
298 }
299 if err != nil {
300 l.Error("failed to render log line", "err", err)
301 return
302 }
303
304 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
305 l.Error("error writing to client", "err", err)
306 return
307 }
308
309 case <-time.After(30 * time.Second):
310 l.Debug("sent keepalive")
311 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
312 l.Error("failed to write control", "err", err)
313 return
314 }
315 }
316 }
317}
318
319// either a message or an error
320type logEvent struct {
321 msg []byte
322 err error
323}
324
325func (ev *logEvent) isCloseError() bool {
326 return websocket.IsCloseError(
327 ev.err,
328 websocket.CloseNormalClosure,
329 websocket.CloseGoingAway,
330 websocket.CloseAbnormalClosure,
331 )
332}
333
334// read logs from spindle and pass through to chan
335func readLogs(conn *websocket.Conn, ch chan logEvent) {
336 defer close(ch)
337
338 for {
339 if conn == nil {
340 return
341 }
342
343 _, msg, err := conn.ReadMessage()
344 if err != nil {
345 ch <- logEvent{err: err}
346 return
347 }
348 ch <- logEvent{msg: msg}
349 }
350}