1package pipelines
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "html"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.sh/tangled.sh/core/appview/config"
14 "tangled.sh/tangled.sh/core/appview/db"
15 "tangled.sh/tangled.sh/core/appview/idresolver"
16 "tangled.sh/tangled.sh/core/appview/oauth"
17 "tangled.sh/tangled.sh/core/appview/pages"
18 "tangled.sh/tangled.sh/core/appview/reporesolver"
19 "tangled.sh/tangled.sh/core/eventconsumer"
20 "tangled.sh/tangled.sh/core/log"
21 "tangled.sh/tangled.sh/core/rbac"
22 spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
23
24 "github.com/go-chi/chi/v5"
25 "github.com/gorilla/websocket"
26 "github.com/posthog/posthog-go"
27)
28
29type Pipelines struct {
30 repoResolver *reporesolver.RepoResolver
31 idResolver *idresolver.Resolver
32 config *config.Config
33 oauth *oauth.OAuth
34 pages *pages.Pages
35 spindlestream *eventconsumer.Consumer
36 db *db.DB
37 enforcer *rbac.Enforcer
38 posthog posthog.Client
39 logger *slog.Logger
40}
41
42func New(
43 oauth *oauth.OAuth,
44 repoResolver *reporesolver.RepoResolver,
45 pages *pages.Pages,
46 spindlestream *eventconsumer.Consumer,
47 idResolver *idresolver.Resolver,
48 db *db.DB,
49 config *config.Config,
50 posthog posthog.Client,
51 enforcer *rbac.Enforcer,
52) *Pipelines {
53 logger := log.New("pipelines")
54
55 return &Pipelines{oauth: oauth,
56 repoResolver: repoResolver,
57 pages: pages,
58 idResolver: idResolver,
59 config: config,
60 spindlestream: spindlestream,
61 db: db,
62 posthog: posthog,
63 enforcer: enforcer,
64 logger: logger,
65 }
66}
67
68func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
69 user := p.oauth.GetUser(r)
70 l := p.logger.With("handler", "Index")
71
72 f, err := p.repoResolver.Resolve(r)
73 if err != nil {
74 l.Error("failed to get repo and knot", "err", err)
75 return
76 }
77
78 repoInfo := f.RepoInfo(user)
79
80 ps, err := db.GetPipelineStatuses(
81 p.db,
82 db.FilterEq("repo_owner", repoInfo.OwnerDid),
83 db.FilterEq("repo_name", repoInfo.Name),
84 db.FilterEq("knot", repoInfo.Knot),
85 )
86 if err != nil {
87 l.Error("failed to query db", "err", err)
88 return
89 }
90
91 p.pages.Pipelines(w, pages.PipelinesParams{
92 LoggedInUser: user,
93 RepoInfo: repoInfo,
94 Pipelines: ps,
95 })
96}
97
98func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
99 user := p.oauth.GetUser(r)
100 l := p.logger.With("handler", "Workflow")
101
102 f, err := p.repoResolver.Resolve(r)
103 if err != nil {
104 l.Error("failed to get repo and knot", "err", err)
105 return
106 }
107
108 repoInfo := f.RepoInfo(user)
109
110 pipelineId := chi.URLParam(r, "pipeline")
111 if pipelineId == "" {
112 l.Error("empty pipeline ID")
113 return
114 }
115
116 workflow := chi.URLParam(r, "workflow")
117 if workflow == "" {
118 l.Error("empty workflow name")
119 return
120 }
121
122 ps, err := db.GetPipelineStatuses(
123 p.db,
124 db.FilterEq("repo_owner", repoInfo.OwnerDid),
125 db.FilterEq("repo_name", repoInfo.Name),
126 db.FilterEq("knot", repoInfo.Knot),
127 db.FilterEq("id", pipelineId),
128 )
129 if err != nil {
130 l.Error("failed to query db", "err", err)
131 return
132 }
133
134 if len(ps) != 1 {
135 l.Error("invalid number of pipelines", "len", len(ps))
136 return
137 }
138
139 singlePipeline := ps[0]
140
141 p.pages.Workflow(w, pages.WorkflowParams{
142 LoggedInUser: user,
143 RepoInfo: repoInfo,
144 Pipeline: singlePipeline,
145 Workflow: workflow,
146 })
147}
148
149var upgrader = websocket.Upgrader{
150 ReadBufferSize: 1024,
151 WriteBufferSize: 1024,
152}
153
154func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
155 l := p.logger.With("handler", "logs")
156
157 clientConn, err := upgrader.Upgrade(w, r, nil)
158 if err != nil {
159 l.Error("websocket upgrade failed", "err", err)
160 return
161 }
162 defer func() {
163 _ = clientConn.WriteControl(
164 websocket.CloseMessage,
165 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
166 time.Now().Add(time.Second),
167 )
168 clientConn.Close()
169 }()
170
171 ctx, cancel := context.WithCancel(r.Context())
172 defer cancel()
173 go func() {
174 for {
175 if _, _, err := clientConn.NextReader(); err != nil {
176 l.Error("failed to read", "err", err)
177 cancel()
178 return
179 }
180 }
181 }()
182
183 user := p.oauth.GetUser(r)
184 f, err := p.repoResolver.Resolve(r)
185 if err != nil {
186 l.Error("failed to get repo and knot", "err", err)
187 http.Error(w, "bad repo/knot", http.StatusBadRequest)
188 return
189 }
190
191 repoInfo := f.RepoInfo(user)
192
193 pipelineId := chi.URLParam(r, "pipeline")
194 workflow := chi.URLParam(r, "workflow")
195 if pipelineId == "" || workflow == "" {
196 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
197 return
198 }
199
200 ps, err := db.GetPipelineStatuses(
201 p.db,
202 db.FilterEq("repo_owner", repoInfo.OwnerDid),
203 db.FilterEq("repo_name", repoInfo.Name),
204 db.FilterEq("knot", repoInfo.Knot),
205 db.FilterEq("id", pipelineId),
206 )
207 if err != nil || len(ps) != 1 {
208 l.Error("pipeline query failed", "err", err, "count", len(ps))
209 http.Error(w, "pipeline not found", http.StatusNotFound)
210 return
211 }
212
213 singlePipeline := ps[0]
214 spindle := repoInfo.Spindle
215 knot := repoInfo.Knot
216 rkey := singlePipeline.Rkey
217
218 if spindle == "" || knot == "" || rkey == "" {
219 http.Error(w, "invalid repo info", http.StatusBadRequest)
220 return
221 }
222
223 scheme := "wss"
224 if p.config.Core.Dev {
225 scheme = "ws"
226 }
227
228 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
229 l = l.With("url", url)
230 l.Info("logs endpoint hit")
231
232 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
233 if err != nil {
234 l.Error("websocket dial failed", "err", err)
235 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
236 return
237 }
238 defer spindleConn.Close()
239
240 // create a channel for incoming messages
241 msgChan := make(chan []byte, 10)
242 errChan := make(chan error, 1)
243
244 // start a goroutine to read from spindle
245 go func() {
246 defer close(msgChan)
247 defer close(errChan)
248
249 for {
250 _, msg, err := spindleConn.ReadMessage()
251 if err != nil {
252 if websocket.IsCloseError(err,
253 websocket.CloseNormalClosure,
254 websocket.CloseGoingAway,
255 websocket.CloseAbnormalClosure) {
256 errChan <- nil // signal graceful end
257 } else {
258 errChan <- err
259 }
260 return
261 }
262 msgChan <- msg
263 }
264 }()
265
266 stepIdx := 0
267 for {
268 select {
269 case <-ctx.Done():
270 l.Info("client disconnected")
271 return
272 case err := <-errChan:
273 if err != nil {
274 l.Error("error reading from spindle", "err", err)
275 }
276
277 if err == nil {
278 l.Info("log tail complete")
279 }
280
281 return
282 case msg := <-msgChan:
283 var logLine spindlemodel.LogLine
284 if err = json.Unmarshal(msg, &logLine); err != nil {
285 l.Error("failed to parse logline", "err", err)
286 continue
287 }
288
289 var fragment []byte
290 switch logLine.Kind {
291 case spindlemodel.LogKindControl:
292 // control messages create a new step block
293 stepIdx++
294 fragment = fmt.Appendf(nil, `
295 <div id="lines" hx-swap-oob="beforeend">
296 <details id="step-%d" open>
297 <summary>%s</summary>
298 <div id="step-body-%d"></div>
299 </details>
300 </div>
301 `, stepIdx, logLine.Content, stepIdx)
302 case spindlemodel.LogKindData:
303 // data messages simply insert new log lines into current step
304 escaped := html.EscapeString(logLine.Content)
305 fragment = fmt.Appendf(nil, `
306 <div id="step-body-%d" hx-swap-oob="beforeend">
307 <p>%s</p>
308 </div>
309 `, stepIdx, escaped)
310 }
311
312 if err = clientConn.WriteMessage(websocket.TextMessage, fragment); err != nil {
313 l.Error("error writing to client", "err", err)
314 return
315 }
316 case <-time.After(30 * time.Second):
317 l.Debug("sent keepalive")
318 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
319 l.Error("failed to write control", "err", err)
320 }
321 }
322 }
323}