1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/rbac"
20 spindlemodel "tangled.org/core/spindle/models"
21
22 "github.com/go-chi/chi/v5"
23 "github.com/gorilla/websocket"
24)
25
26type Pipelines struct {
27 repoResolver *reporesolver.RepoResolver
28 idResolver *idresolver.Resolver
29 config *config.Config
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 spindlestream *eventconsumer.Consumer
33 db *db.DB
34 enforcer *rbac.Enforcer
35 logger *slog.Logger
36}
37
38func (p *Pipelines) Router() http.Handler {
39 r := chi.NewRouter()
40 r.Get("/", p.Index)
41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
43
44 return r
45}
46
47func New(
48 oauth *oauth.OAuth,
49 repoResolver *reporesolver.RepoResolver,
50 pages *pages.Pages,
51 spindlestream *eventconsumer.Consumer,
52 idResolver *idresolver.Resolver,
53 db *db.DB,
54 config *config.Config,
55 enforcer *rbac.Enforcer,
56 logger *slog.Logger,
57) *Pipelines {
58 return &Pipelines{
59 oauth: oauth,
60 repoResolver: repoResolver,
61 pages: pages,
62 idResolver: idResolver,
63 config: config,
64 spindlestream: spindlestream,
65 db: db,
66 enforcer: enforcer,
67 logger: logger,
68 }
69}
70
71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
72 user := p.oauth.GetUser(r)
73 l := p.logger.With("handler", "Index")
74
75 f, err := p.repoResolver.Resolve(r)
76 if err != nil {
77 l.Error("failed to get repo and knot", "err", err)
78 return
79 }
80
81 repoInfo := f.RepoInfo(user)
82
83 ps, err := db.GetPipelineStatuses(
84 p.db,
85 30,
86 db.FilterEq("repo_owner", repoInfo.OwnerDid),
87 db.FilterEq("repo_name", repoInfo.Name),
88 db.FilterEq("knot", repoInfo.Knot),
89 )
90 if err != nil {
91 l.Error("failed to query db", "err", err)
92 return
93 }
94
95 p.pages.Pipelines(w, pages.PipelinesParams{
96 LoggedInUser: user,
97 RepoInfo: repoInfo,
98 Pipelines: ps,
99 })
100}
101
102func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
103 user := p.oauth.GetUser(r)
104 l := p.logger.With("handler", "Workflow")
105
106 f, err := p.repoResolver.Resolve(r)
107 if err != nil {
108 l.Error("failed to get repo and knot", "err", err)
109 return
110 }
111
112 repoInfo := f.RepoInfo(user)
113
114 pipelineId := chi.URLParam(r, "pipeline")
115 if pipelineId == "" {
116 l.Error("empty pipeline ID")
117 return
118 }
119
120 workflow := chi.URLParam(r, "workflow")
121 if workflow == "" {
122 l.Error("empty workflow name")
123 return
124 }
125
126 ps, err := db.GetPipelineStatuses(
127 p.db,
128 1,
129 db.FilterEq("repo_owner", repoInfo.OwnerDid),
130 db.FilterEq("repo_name", repoInfo.Name),
131 db.FilterEq("knot", repoInfo.Knot),
132 db.FilterEq("id", pipelineId),
133 )
134 if err != nil {
135 l.Error("failed to query db", "err", err)
136 return
137 }
138
139 if len(ps) != 1 {
140 l.Error("invalid number of pipelines", "len", len(ps))
141 return
142 }
143
144 singlePipeline := ps[0]
145
146 p.pages.Workflow(w, pages.WorkflowParams{
147 LoggedInUser: user,
148 RepoInfo: repoInfo,
149 Pipeline: singlePipeline,
150 Workflow: workflow,
151 })
152}
153
154var upgrader = websocket.Upgrader{
155 ReadBufferSize: 1024,
156 WriteBufferSize: 1024,
157}
158
159func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
160 l := p.logger.With("handler", "logs")
161
162 clientConn, err := upgrader.Upgrade(w, r, nil)
163 if err != nil {
164 l.Error("websocket upgrade failed", "err", err)
165 return
166 }
167 defer func() {
168 _ = clientConn.WriteControl(
169 websocket.CloseMessage,
170 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
171 time.Now().Add(time.Second),
172 )
173 clientConn.Close()
174 }()
175
176 ctx, cancel := context.WithCancel(r.Context())
177 defer cancel()
178
179 user := p.oauth.GetUser(r)
180 f, err := p.repoResolver.Resolve(r)
181 if err != nil {
182 l.Error("failed to get repo and knot", "err", err)
183 http.Error(w, "bad repo/knot", http.StatusBadRequest)
184 return
185 }
186
187 repoInfo := f.RepoInfo(user)
188
189 pipelineId := chi.URLParam(r, "pipeline")
190 workflow := chi.URLParam(r, "workflow")
191 if pipelineId == "" || workflow == "" {
192 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
193 return
194 }
195
196 ps, err := db.GetPipelineStatuses(
197 p.db,
198 1,
199 db.FilterEq("repo_owner", repoInfo.OwnerDid),
200 db.FilterEq("repo_name", repoInfo.Name),
201 db.FilterEq("knot", repoInfo.Knot),
202 db.FilterEq("id", pipelineId),
203 )
204 if err != nil || len(ps) != 1 {
205 l.Error("pipeline query failed", "err", err, "count", len(ps))
206 http.Error(w, "pipeline not found", http.StatusNotFound)
207 return
208 }
209
210 singlePipeline := ps[0]
211 spindle := repoInfo.Spindle
212 knot := repoInfo.Knot
213 rkey := singlePipeline.Rkey
214
215 if spindle == "" || knot == "" || rkey == "" {
216 http.Error(w, "invalid repo info", http.StatusBadRequest)
217 return
218 }
219
220 scheme := "wss"
221 if p.config.Core.Dev {
222 scheme = "ws"
223 }
224
225 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
226 l = l.With("url", url)
227 l.Info("logs endpoint hit")
228
229 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
230 if err != nil {
231 l.Error("websocket dial failed", "err", err)
232 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
233 return
234 }
235 defer spindleConn.Close()
236
237 // create a channel for incoming messages
238 evChan := make(chan logEvent, 100)
239 // start a goroutine to read from spindle
240 go readLogs(spindleConn, evChan)
241
242 stepStartTimes := make(map[int]time.Time)
243 var fragment bytes.Buffer
244 for {
245 select {
246 case <-ctx.Done():
247 l.Info("client disconnected")
248 return
249
250 case ev, ok := <-evChan:
251 if !ok {
252 continue
253 }
254
255 if ev.err != nil && ev.isCloseError() {
256 l.Debug("graceful shutdown, tail complete", "err", err)
257 return
258 }
259 if ev.err != nil {
260 l.Error("error reading from spindle", "err", err)
261 return
262 }
263
264 var logLine spindlemodel.LogLine
265 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
266 l.Error("failed to parse logline", "err", err)
267 continue
268 }
269
270 fragment.Reset()
271
272 switch logLine.Kind {
273 case spindlemodel.LogKindControl:
274 switch logLine.StepStatus {
275 case spindlemodel.StepStatusStart:
276 stepStartTimes[logLine.StepId] = logLine.Time
277 collapsed := false
278 if logLine.StepKind == spindlemodel.StepKindSystem {
279 collapsed = true
280 }
281 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
282 Id: logLine.StepId,
283 Name: logLine.Content,
284 Command: logLine.StepCommand,
285 Collapsed: collapsed,
286 StartTime: logLine.Time,
287 })
288 case spindlemodel.StepStatusEnd:
289 startTime := stepStartTimes[logLine.StepId]
290 endTime := logLine.Time
291 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
292 Id: logLine.StepId,
293 StartTime: startTime,
294 EndTime: endTime,
295 })
296 }
297
298 case spindlemodel.LogKindData:
299 // data messages simply insert new log lines into current step
300 err = p.pages.LogLine(&fragment, pages.LogLineParams{
301 Id: logLine.StepId,
302 Content: logLine.Content,
303 })
304 }
305 if err != nil {
306 l.Error("failed to render log line", "err", err)
307 return
308 }
309
310 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
311 l.Error("error writing to client", "err", err)
312 return
313 }
314
315 case <-time.After(30 * time.Second):
316 l.Debug("sent keepalive")
317 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
318 l.Error("failed to write control", "err", err)
319 return
320 }
321 }
322 }
323}
324
325// either a message or an error
326type logEvent struct {
327 msg []byte
328 err error
329}
330
331func (ev *logEvent) isCloseError() bool {
332 return websocket.IsCloseError(
333 ev.err,
334 websocket.CloseNormalClosure,
335 websocket.CloseGoingAway,
336 websocket.CloseAbnormalClosure,
337 )
338}
339
340// read logs from spindle and pass through to chan
341func readLogs(conn *websocket.Conn, ch chan logEvent) {
342 defer close(ch)
343
344 for {
345 if conn == nil {
346 return
347 }
348
349 _, msg, err := conn.ReadMessage()
350 if err != nil {
351 ch <- logEvent{err: err}
352 return
353 }
354 ch <- logEvent{msg: msg}
355 }
356}