1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/rbac"
20 spindlemodel "tangled.org/core/spindle/models"
21
22 "github.com/go-chi/chi/v5"
23 "github.com/gorilla/websocket"
24)
25
26type Pipelines struct {
27 repoResolver *reporesolver.RepoResolver
28 idResolver *idresolver.Resolver
29 config *config.Config
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 spindlestream *eventconsumer.Consumer
33 db *db.DB
34 enforcer *rbac.Enforcer
35 logger *slog.Logger
36}
37
38func (p *Pipelines) Router() http.Handler {
39 r := chi.NewRouter()
40 r.Get("/", p.Index)
41 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
42 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
43
44 return r
45}
46
47func New(
48 oauth *oauth.OAuth,
49 repoResolver *reporesolver.RepoResolver,
50 pages *pages.Pages,
51 spindlestream *eventconsumer.Consumer,
52 idResolver *idresolver.Resolver,
53 db *db.DB,
54 config *config.Config,
55 enforcer *rbac.Enforcer,
56 logger *slog.Logger,
57) *Pipelines {
58 return &Pipelines{
59 oauth: oauth,
60 repoResolver: repoResolver,
61 pages: pages,
62 idResolver: idResolver,
63 config: config,
64 spindlestream: spindlestream,
65 db: db,
66 enforcer: enforcer,
67 logger: logger,
68 }
69}
70
71func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
72 user := p.oauth.GetUser(r)
73 l := p.logger.With("handler", "Index")
74
75 f, err := p.repoResolver.Resolve(r)
76 if err != nil {
77 l.Error("failed to get repo and knot", "err", err)
78 return
79 }
80
81 repoInfo := f.RepoInfo(user)
82
83 ps, err := db.GetPipelineStatuses(
84 p.db,
85 db.FilterEq("repo_owner", repoInfo.OwnerDid),
86 db.FilterEq("repo_name", repoInfo.Name),
87 db.FilterEq("knot", repoInfo.Knot),
88 )
89 if err != nil {
90 l.Error("failed to query db", "err", err)
91 return
92 }
93
94 p.pages.Pipelines(w, pages.PipelinesParams{
95 LoggedInUser: user,
96 RepoInfo: repoInfo,
97 Pipelines: ps,
98 })
99}
100
101func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
102 user := p.oauth.GetUser(r)
103 l := p.logger.With("handler", "Workflow")
104
105 f, err := p.repoResolver.Resolve(r)
106 if err != nil {
107 l.Error("failed to get repo and knot", "err", err)
108 return
109 }
110
111 repoInfo := f.RepoInfo(user)
112
113 pipelineId := chi.URLParam(r, "pipeline")
114 if pipelineId == "" {
115 l.Error("empty pipeline ID")
116 return
117 }
118
119 workflow := chi.URLParam(r, "workflow")
120 if workflow == "" {
121 l.Error("empty workflow name")
122 return
123 }
124
125 ps, err := db.GetPipelineStatuses(
126 p.db,
127 db.FilterEq("repo_owner", repoInfo.OwnerDid),
128 db.FilterEq("repo_name", repoInfo.Name),
129 db.FilterEq("knot", repoInfo.Knot),
130 db.FilterEq("id", pipelineId),
131 )
132 if err != nil {
133 l.Error("failed to query db", "err", err)
134 return
135 }
136
137 if len(ps) != 1 {
138 l.Error("invalid number of pipelines", "len", len(ps))
139 return
140 }
141
142 singlePipeline := ps[0]
143
144 p.pages.Workflow(w, pages.WorkflowParams{
145 LoggedInUser: user,
146 RepoInfo: repoInfo,
147 Pipeline: singlePipeline,
148 Workflow: workflow,
149 })
150}
151
152var upgrader = websocket.Upgrader{
153 ReadBufferSize: 1024,
154 WriteBufferSize: 1024,
155}
156
157func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
158 l := p.logger.With("handler", "logs")
159
160 clientConn, err := upgrader.Upgrade(w, r, nil)
161 if err != nil {
162 l.Error("websocket upgrade failed", "err", err)
163 return
164 }
165 defer func() {
166 _ = clientConn.WriteControl(
167 websocket.CloseMessage,
168 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
169 time.Now().Add(time.Second),
170 )
171 clientConn.Close()
172 }()
173
174 ctx, cancel := context.WithCancel(r.Context())
175 defer cancel()
176
177 user := p.oauth.GetUser(r)
178 f, err := p.repoResolver.Resolve(r)
179 if err != nil {
180 l.Error("failed to get repo and knot", "err", err)
181 http.Error(w, "bad repo/knot", http.StatusBadRequest)
182 return
183 }
184
185 repoInfo := f.RepoInfo(user)
186
187 pipelineId := chi.URLParam(r, "pipeline")
188 workflow := chi.URLParam(r, "workflow")
189 if pipelineId == "" || workflow == "" {
190 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
191 return
192 }
193
194 ps, err := db.GetPipelineStatuses(
195 p.db,
196 db.FilterEq("repo_owner", repoInfo.OwnerDid),
197 db.FilterEq("repo_name", repoInfo.Name),
198 db.FilterEq("knot", repoInfo.Knot),
199 db.FilterEq("id", pipelineId),
200 )
201 if err != nil || len(ps) != 1 {
202 l.Error("pipeline query failed", "err", err, "count", len(ps))
203 http.Error(w, "pipeline not found", http.StatusNotFound)
204 return
205 }
206
207 singlePipeline := ps[0]
208 spindle := repoInfo.Spindle
209 knot := repoInfo.Knot
210 rkey := singlePipeline.Rkey
211
212 if spindle == "" || knot == "" || rkey == "" {
213 http.Error(w, "invalid repo info", http.StatusBadRequest)
214 return
215 }
216
217 scheme := "wss"
218 if p.config.Core.Dev {
219 scheme = "ws"
220 }
221
222 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
223 l = l.With("url", url)
224 l.Info("logs endpoint hit")
225
226 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
227 if err != nil {
228 l.Error("websocket dial failed", "err", err)
229 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
230 return
231 }
232 defer spindleConn.Close()
233
234 // create a channel for incoming messages
235 evChan := make(chan logEvent, 100)
236 // start a goroutine to read from spindle
237 go readLogs(spindleConn, evChan)
238
239 stepStartTimes := make(map[int]time.Time)
240 var fragment bytes.Buffer
241 for {
242 select {
243 case <-ctx.Done():
244 l.Info("client disconnected")
245 return
246
247 case ev, ok := <-evChan:
248 if !ok {
249 continue
250 }
251
252 if ev.err != nil && ev.isCloseError() {
253 l.Debug("graceful shutdown, tail complete", "err", err)
254 return
255 }
256 if ev.err != nil {
257 l.Error("error reading from spindle", "err", err)
258 return
259 }
260
261 var logLine spindlemodel.LogLine
262 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
263 l.Error("failed to parse logline", "err", err)
264 continue
265 }
266
267 fragment.Reset()
268
269 switch logLine.Kind {
270 case spindlemodel.LogKindControl:
271 switch logLine.StepStatus {
272 case spindlemodel.StepStatusStart:
273 stepStartTimes[logLine.StepId] = logLine.Time
274 collapsed := false
275 if logLine.StepKind == spindlemodel.StepKindSystem {
276 collapsed = true
277 }
278 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
279 Id: logLine.StepId,
280 Name: logLine.Content,
281 Command: logLine.StepCommand,
282 Collapsed: collapsed,
283 StartTime: logLine.Time,
284 })
285 case spindlemodel.StepStatusEnd:
286 startTime := stepStartTimes[logLine.StepId]
287 endTime := logLine.Time
288 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
289 Id: logLine.StepId,
290 StartTime: startTime,
291 EndTime: endTime,
292 })
293 }
294
295 case spindlemodel.LogKindData:
296 // data messages simply insert new log lines into current step
297 err = p.pages.LogLine(&fragment, pages.LogLineParams{
298 Id: logLine.StepId,
299 Content: logLine.Content,
300 })
301 }
302 if err != nil {
303 l.Error("failed to render log line", "err", err)
304 return
305 }
306
307 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
308 l.Error("error writing to client", "err", err)
309 return
310 }
311
312 case <-time.After(30 * time.Second):
313 l.Debug("sent keepalive")
314 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
315 l.Error("failed to write control", "err", err)
316 return
317 }
318 }
319 }
320}
321
322// either a message or an error
323type logEvent struct {
324 msg []byte
325 err error
326}
327
328func (ev *logEvent) isCloseError() bool {
329 return websocket.IsCloseError(
330 ev.err,
331 websocket.CloseNormalClosure,
332 websocket.CloseGoingAway,
333 websocket.CloseAbnormalClosure,
334 )
335}
336
337// read logs from spindle and pass through to chan
338func readLogs(conn *websocket.Conn, ch chan logEvent) {
339 defer close(ch)
340
341 for {
342 if conn == nil {
343 return
344 }
345
346 _, msg, err := conn.ReadMessage()
347 if err != nil {
348 ch <- logEvent{err: err}
349 return
350 }
351 ch <- logEvent{msg: msg}
352 }
353}