1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 "tangled.sh/tangled.sh/core/appview/oauth"
15 "tangled.sh/tangled.sh/core/appview/pages"
16 "tangled.sh/tangled.sh/core/appview/reporesolver"
17 "tangled.sh/tangled.sh/core/eventconsumer"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/log"
20 "tangled.sh/tangled.sh/core/rbac"
21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25)
26
27type Pipelines struct {
28 repoResolver *reporesolver.RepoResolver
29 idResolver *idresolver.Resolver
30 config *config.Config
31 oauth *oauth.OAuth
32 pages *pages.Pages
33 spindlestream *eventconsumer.Consumer
34 db *db.DB
35 enforcer *rbac.Enforcer
36 logger *slog.Logger
37}
38
39func New(
40 oauth *oauth.OAuth,
41 repoResolver *reporesolver.RepoResolver,
42 pages *pages.Pages,
43 spindlestream *eventconsumer.Consumer,
44 idResolver *idresolver.Resolver,
45 db *db.DB,
46 config *config.Config,
47 enforcer *rbac.Enforcer,
48) *Pipelines {
49 logger := log.New("pipelines")
50
51 return &Pipelines{oauth: oauth,
52 repoResolver: repoResolver,
53 pages: pages,
54 idResolver: idResolver,
55 config: config,
56 spindlestream: spindlestream,
57 db: db,
58 enforcer: enforcer,
59 logger: logger,
60 }
61}
62
63func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
64 user := p.oauth.GetUser(r)
65 l := p.logger.With("handler", "Index")
66
67 f, err := p.repoResolver.Resolve(r)
68 if err != nil {
69 l.Error("failed to get repo and knot", "err", err)
70 return
71 }
72
73 repoInfo := f.RepoInfo(user)
74
75 ps, err := db.GetPipelineStatuses(
76 p.db,
77 db.FilterEq("repo_owner", repoInfo.OwnerDid),
78 db.FilterEq("repo_name", repoInfo.Name),
79 db.FilterEq("knot", repoInfo.Knot),
80 )
81 if err != nil {
82 l.Error("failed to query db", "err", err)
83 return
84 }
85
86 p.pages.Pipelines(w, pages.PipelinesParams{
87 LoggedInUser: user,
88 RepoInfo: repoInfo,
89 Pipelines: ps,
90 })
91}
92
93func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
94 user := p.oauth.GetUser(r)
95 l := p.logger.With("handler", "Workflow")
96
97 f, err := p.repoResolver.Resolve(r)
98 if err != nil {
99 l.Error("failed to get repo and knot", "err", err)
100 return
101 }
102
103 repoInfo := f.RepoInfo(user)
104
105 pipelineId := chi.URLParam(r, "pipeline")
106 if pipelineId == "" {
107 l.Error("empty pipeline ID")
108 return
109 }
110
111 workflow := chi.URLParam(r, "workflow")
112 if workflow == "" {
113 l.Error("empty workflow name")
114 return
115 }
116
117 ps, err := db.GetPipelineStatuses(
118 p.db,
119 db.FilterEq("repo_owner", repoInfo.OwnerDid),
120 db.FilterEq("repo_name", repoInfo.Name),
121 db.FilterEq("knot", repoInfo.Knot),
122 db.FilterEq("id", pipelineId),
123 )
124 if err != nil {
125 l.Error("failed to query db", "err", err)
126 return
127 }
128
129 if len(ps) != 1 {
130 l.Error("invalid number of pipelines", "len", len(ps))
131 return
132 }
133
134 singlePipeline := ps[0]
135
136 p.pages.Workflow(w, pages.WorkflowParams{
137 LoggedInUser: user,
138 RepoInfo: repoInfo,
139 Pipeline: singlePipeline,
140 Workflow: workflow,
141 })
142}
143
144var upgrader = websocket.Upgrader{
145 ReadBufferSize: 1024,
146 WriteBufferSize: 1024,
147}
148
149func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
150 l := p.logger.With("handler", "logs")
151
152 clientConn, err := upgrader.Upgrade(w, r, nil)
153 if err != nil {
154 l.Error("websocket upgrade failed", "err", err)
155 return
156 }
157 defer func() {
158 _ = clientConn.WriteControl(
159 websocket.CloseMessage,
160 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
161 time.Now().Add(time.Second),
162 )
163 clientConn.Close()
164 }()
165
166 ctx, cancel := context.WithCancel(r.Context())
167 defer cancel()
168
169 user := p.oauth.GetUser(r)
170 f, err := p.repoResolver.Resolve(r)
171 if err != nil {
172 l.Error("failed to get repo and knot", "err", err)
173 http.Error(w, "bad repo/knot", http.StatusBadRequest)
174 return
175 }
176
177 repoInfo := f.RepoInfo(user)
178
179 pipelineId := chi.URLParam(r, "pipeline")
180 workflow := chi.URLParam(r, "workflow")
181 if pipelineId == "" || workflow == "" {
182 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
183 return
184 }
185
186 ps, err := db.GetPipelineStatuses(
187 p.db,
188 db.FilterEq("repo_owner", repoInfo.OwnerDid),
189 db.FilterEq("repo_name", repoInfo.Name),
190 db.FilterEq("knot", repoInfo.Knot),
191 db.FilterEq("id", pipelineId),
192 )
193 if err != nil || len(ps) != 1 {
194 l.Error("pipeline query failed", "err", err, "count", len(ps))
195 http.Error(w, "pipeline not found", http.StatusNotFound)
196 return
197 }
198
199 singlePipeline := ps[0]
200 spindle := repoInfo.Spindle
201 knot := repoInfo.Knot
202 rkey := singlePipeline.Rkey
203
204 if spindle == "" || knot == "" || rkey == "" {
205 http.Error(w, "invalid repo info", http.StatusBadRequest)
206 return
207 }
208
209 scheme := "wss"
210 if p.config.Core.Dev {
211 scheme = "ws"
212 }
213
214 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
215 l = l.With("url", url)
216 l.Info("logs endpoint hit")
217
218 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
219 if err != nil {
220 l.Error("websocket dial failed", "err", err)
221 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
222 return
223 }
224 defer spindleConn.Close()
225
226 // create a channel for incoming messages
227 evChan := make(chan logEvent, 100)
228 // start a goroutine to read from spindle
229 go readLogs(spindleConn, evChan)
230
231 stepIdx := 0
232 var fragment bytes.Buffer
233 for {
234 select {
235 case <-ctx.Done():
236 l.Info("client disconnected")
237 return
238
239 case ev, ok := <-evChan:
240 if !ok {
241 continue
242 }
243
244 if ev.err != nil && ev.isCloseError() {
245 l.Debug("graceful shutdown, tail complete", "err", err)
246 return
247 }
248 if ev.err != nil {
249 l.Error("error reading from spindle", "err", err)
250 return
251 }
252
253 var logLine spindlemodel.LogLine
254 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
255 l.Error("failed to parse logline", "err", err)
256 continue
257 }
258
259 fragment.Reset()
260
261 switch logLine.Kind {
262 case spindlemodel.LogKindControl:
263 // control messages create a new step block
264 stepIdx++
265 collapsed := false
266 if logLine.StepKind == spindlemodel.StepKindSystem {
267 collapsed = true
268 }
269 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
270 Id: stepIdx,
271 Name: logLine.Content,
272 Command: logLine.StepCommand,
273 Collapsed: collapsed,
274 })
275 case spindlemodel.LogKindData:
276 // data messages simply insert new log lines into current step
277 err = p.pages.LogLine(&fragment, pages.LogLineParams{
278 Id: stepIdx,
279 Content: logLine.Content,
280 })
281 }
282 if err != nil {
283 l.Error("failed to render log line", "err", err)
284 return
285 }
286
287 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
288 l.Error("error writing to client", "err", err)
289 return
290 }
291
292 case <-time.After(30 * time.Second):
293 l.Debug("sent keepalive")
294 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
295 l.Error("failed to write control", "err", err)
296 return
297 }
298 }
299 }
300}
301
// logEvent is one item read from the spindle websocket: either a raw message
// (msg set, err nil) or the read error that ended the stream (err set).
type logEvent struct {
	err error  // non-nil when the read failed
	msg []byte // raw message payload on success
}
307
308func (ev *logEvent) isCloseError() bool {
309 return websocket.IsCloseError(
310 ev.err,
311 websocket.CloseNormalClosure,
312 websocket.CloseGoingAway,
313 websocket.CloseAbnormalClosure,
314 )
315}
316
317// read logs from spindle and pass through to chan
318func readLogs(conn *websocket.Conn, ch chan logEvent) {
319 defer close(ch)
320
321 for {
322 if conn == nil {
323 return
324 }
325
326 _, msg, err := conn.ReadMessage()
327 if err != nil {
328 ch <- logEvent{err: err}
329 return
330 }
331 ch <- logEvent{msg: msg}
332 }
333}