forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/rbac"
20 spindlemodel "tangled.org/core/spindle/models"
21
22 "github.com/go-chi/chi/v5"
23 "github.com/gorilla/websocket"
24)
25
26type Pipelines struct {
27 repoResolver *reporesolver.RepoResolver
28 idResolver *idresolver.Resolver
29 config *config.Config
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 spindlestream *eventconsumer.Consumer
33 db *db.DB
34 enforcer *rbac.Enforcer
35 logger *slog.Logger
36}
37
38func (p *Pipelines) Router() http.Handler {
39 r := chi.NewRouter()
40 r.Get("/", p.Index)
41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
43
44 return r
45}
46
47func New(
48 oauth *oauth.OAuth,
49 repoResolver *reporesolver.RepoResolver,
50 pages *pages.Pages,
51 spindlestream *eventconsumer.Consumer,
52 idResolver *idresolver.Resolver,
53 db *db.DB,
54 config *config.Config,
55 enforcer *rbac.Enforcer,
56 logger *slog.Logger,
57) *Pipelines {
58 return &Pipelines{
59 oauth: oauth,
60 repoResolver: repoResolver,
61 pages: pages,
62 idResolver: idResolver,
63 config: config,
64 spindlestream: spindlestream,
65 db: db,
66 enforcer: enforcer,
67 logger: logger,
68 }
69}
70
71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
72 user := p.oauth.GetUser(r)
73 l := p.logger.With("handler", "Index")
74
75 f, err := p.repoResolver.Resolve(r)
76 if err != nil {
77 l.Error("failed to get repo and knot", "err", err)
78 return
79 }
80
81 ps, err := db.GetPipelineStatuses(
82 p.db,
83 30,
84 db.FilterEq("repo_owner", f.Did),
85 db.FilterEq("repo_name", f.Name),
86 db.FilterEq("knot", f.Knot),
87 )
88 if err != nil {
89 l.Error("failed to query db", "err", err)
90 return
91 }
92
93 p.pages.Pipelines(w, pages.PipelinesParams{
94 LoggedInUser: user,
95 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
96 Pipelines: ps,
97 })
98}
99
100func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
101 user := p.oauth.GetUser(r)
102 l := p.logger.With("handler", "Workflow")
103
104 f, err := p.repoResolver.Resolve(r)
105 if err != nil {
106 l.Error("failed to get repo and knot", "err", err)
107 return
108 }
109
110 pipelineId := chi.URLParam(r, "pipeline")
111 if pipelineId == "" {
112 l.Error("empty pipeline ID")
113 return
114 }
115
116 workflow := chi.URLParam(r, "workflow")
117 if workflow == "" {
118 l.Error("empty workflow name")
119 return
120 }
121
122 ps, err := db.GetPipelineStatuses(
123 p.db,
124 1,
125 db.FilterEq("repo_owner", f.Did),
126 db.FilterEq("repo_name", f.Name),
127 db.FilterEq("knot", f.Knot),
128 db.FilterEq("id", pipelineId),
129 )
130 if err != nil {
131 l.Error("failed to query db", "err", err)
132 return
133 }
134
135 if len(ps) != 1 {
136 l.Error("invalid number of pipelines", "len", len(ps))
137 return
138 }
139
140 singlePipeline := ps[0]
141
142 p.pages.Workflow(w, pages.WorkflowParams{
143 LoggedInUser: user,
144 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
145 Pipeline: singlePipeline,
146 Workflow: workflow,
147 })
148}
149
150var upgrader = websocket.Upgrader{
151 ReadBufferSize: 1024,
152 WriteBufferSize: 1024,
153}
154
155func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
156 l := p.logger.With("handler", "logs")
157
158 clientConn, err := upgrader.Upgrade(w, r, nil)
159 if err != nil {
160 l.Error("websocket upgrade failed", "err", err)
161 return
162 }
163 defer func() {
164 _ = clientConn.WriteControl(
165 websocket.CloseMessage,
166 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
167 time.Now().Add(time.Second),
168 )
169 clientConn.Close()
170 }()
171
172 ctx, cancel := context.WithCancel(r.Context())
173 defer cancel()
174
175 f, err := p.repoResolver.Resolve(r)
176 if err != nil {
177 l.Error("failed to get repo and knot", "err", err)
178 http.Error(w, "bad repo/knot", http.StatusBadRequest)
179 return
180 }
181
182 pipelineId := chi.URLParam(r, "pipeline")
183 workflow := chi.URLParam(r, "workflow")
184 if pipelineId == "" || workflow == "" {
185 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
186 return
187 }
188
189 ps, err := db.GetPipelineStatuses(
190 p.db,
191 1,
192 db.FilterEq("repo_owner", f.Did),
193 db.FilterEq("repo_name", f.Name),
194 db.FilterEq("knot", f.Knot),
195 db.FilterEq("id", pipelineId),
196 )
197 if err != nil || len(ps) != 1 {
198 l.Error("pipeline query failed", "err", err, "count", len(ps))
199 http.Error(w, "pipeline not found", http.StatusNotFound)
200 return
201 }
202
203 singlePipeline := ps[0]
204 spindle := f.Spindle
205 knot := f.Knot
206 rkey := singlePipeline.Rkey
207
208 if spindle == "" || knot == "" || rkey == "" {
209 http.Error(w, "invalid repo info", http.StatusBadRequest)
210 return
211 }
212
213 scheme := "wss"
214 if p.config.Core.Dev {
215 scheme = "ws"
216 }
217
218 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
219 l = l.With("url", url)
220 l.Info("logs endpoint hit")
221
222 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
223 if err != nil {
224 l.Error("websocket dial failed", "err", err)
225 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
226 return
227 }
228 defer spindleConn.Close()
229
230 // create a channel for incoming messages
231 evChan := make(chan logEvent, 100)
232 // start a goroutine to read from spindle
233 go readLogs(spindleConn, evChan)
234
235 stepStartTimes := make(map[int]time.Time)
236 var fragment bytes.Buffer
237 for {
238 select {
239 case <-ctx.Done():
240 l.Info("client disconnected")
241 return
242
243 case ev, ok := <-evChan:
244 if !ok {
245 continue
246 }
247
248 if ev.err != nil && ev.isCloseError() {
249 l.Debug("graceful shutdown, tail complete", "err", err)
250 return
251 }
252 if ev.err != nil {
253 l.Error("error reading from spindle", "err", err)
254 return
255 }
256
257 var logLine spindlemodel.LogLine
258 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
259 l.Error("failed to parse logline", "err", err)
260 continue
261 }
262
263 fragment.Reset()
264
265 switch logLine.Kind {
266 case spindlemodel.LogKindControl:
267 switch logLine.StepStatus {
268 case spindlemodel.StepStatusStart:
269 stepStartTimes[logLine.StepId] = logLine.Time
270 collapsed := false
271 if logLine.StepKind == spindlemodel.StepKindSystem {
272 collapsed = true
273 }
274 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
275 Id: logLine.StepId,
276 Name: logLine.Content,
277 Command: logLine.StepCommand,
278 Collapsed: collapsed,
279 StartTime: logLine.Time,
280 })
281 case spindlemodel.StepStatusEnd:
282 startTime := stepStartTimes[logLine.StepId]
283 endTime := logLine.Time
284 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
285 Id: logLine.StepId,
286 StartTime: startTime,
287 EndTime: endTime,
288 })
289 }
290
291 case spindlemodel.LogKindData:
292 // data messages simply insert new log lines into current step
293 err = p.pages.LogLine(&fragment, pages.LogLineParams{
294 Id: logLine.StepId,
295 Content: logLine.Content,
296 })
297 }
298 if err != nil {
299 l.Error("failed to render log line", "err", err)
300 return
301 }
302
303 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
304 l.Error("error writing to client", "err", err)
305 return
306 }
307
308 case <-time.After(30 * time.Second):
309 l.Debug("sent keepalive")
310 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
311 l.Error("failed to write control", "err", err)
312 return
313 }
314 }
315 }
316}
317
// logEvent is what the spindle reader hands to the Logs handler: exactly one
// of the two fields is set per event.
type logEvent struct {
	// msg is the raw payload of one websocket message.
	msg []byte
	// err is the error returned by a failed read.
	err error
}
323
324func (ev *logEvent) isCloseError() bool {
325 return websocket.IsCloseError(
326 ev.err,
327 websocket.CloseNormalClosure,
328 websocket.CloseGoingAway,
329 websocket.CloseAbnormalClosure,
330 )
331}
332
333// read logs from spindle and pass through to chan
334func readLogs(conn *websocket.Conn, ch chan logEvent) {
335 defer close(ch)
336
337 for {
338 if conn == nil {
339 return
340 }
341
342 _, msg, err := conn.ReadMessage()
343 if err != nil {
344 ch <- logEvent{err: err}
345 return
346 }
347 ch <- logEvent{msg: msg}
348 }
349}