1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 "tangled.sh/tangled.sh/core/appview/idresolver"
15 "tangled.sh/tangled.sh/core/appview/oauth"
16 "tangled.sh/tangled.sh/core/appview/pages"
17 "tangled.sh/tangled.sh/core/appview/reporesolver"
18 "tangled.sh/tangled.sh/core/eventconsumer"
19 "tangled.sh/tangled.sh/core/log"
20 "tangled.sh/tangled.sh/core/rbac"
21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 "github.com/posthog/posthog-go"
26)
27
28type Pipelines struct {
29 repoResolver *reporesolver.RepoResolver
30 idResolver *idresolver.Resolver
31 config *config.Config
32 oauth *oauth.OAuth
33 pages *pages.Pages
34 spindlestream *eventconsumer.Consumer
35 db *db.DB
36 enforcer *rbac.Enforcer
37 posthog posthog.Client
38 logger *slog.Logger
39}
40
41func New(
42 oauth *oauth.OAuth,
43 repoResolver *reporesolver.RepoResolver,
44 pages *pages.Pages,
45 spindlestream *eventconsumer.Consumer,
46 idResolver *idresolver.Resolver,
47 db *db.DB,
48 config *config.Config,
49 posthog posthog.Client,
50 enforcer *rbac.Enforcer,
51) *Pipelines {
52 logger := log.New("pipelines")
53
54 return &Pipelines{oauth: oauth,
55 repoResolver: repoResolver,
56 pages: pages,
57 idResolver: idResolver,
58 config: config,
59 spindlestream: spindlestream,
60 db: db,
61 posthog: posthog,
62 enforcer: enforcer,
63 logger: logger,
64 }
65}
66
67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
68 user := p.oauth.GetUser(r)
69 l := p.logger.With("handler", "Index")
70
71 f, err := p.repoResolver.Resolve(r)
72 if err != nil {
73 l.Error("failed to get repo and knot", "err", err)
74 return
75 }
76
77 repoInfo := f.RepoInfo(user)
78
79 ps, err := db.GetPipelineStatuses(
80 p.db,
81 db.FilterEq("repo_owner", repoInfo.OwnerDid),
82 db.FilterEq("repo_name", repoInfo.Name),
83 db.FilterEq("knot", repoInfo.Knot),
84 )
85 if err != nil {
86 l.Error("failed to query db", "err", err)
87 return
88 }
89
90 p.pages.Pipelines(w, pages.PipelinesParams{
91 LoggedInUser: user,
92 RepoInfo: repoInfo,
93 Pipelines: ps,
94 })
95}
96
97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
98 user := p.oauth.GetUser(r)
99 l := p.logger.With("handler", "Workflow")
100
101 f, err := p.repoResolver.Resolve(r)
102 if err != nil {
103 l.Error("failed to get repo and knot", "err", err)
104 return
105 }
106
107 repoInfo := f.RepoInfo(user)
108
109 pipelineId := chi.URLParam(r, "pipeline")
110 if pipelineId == "" {
111 l.Error("empty pipeline ID")
112 return
113 }
114
115 workflow := chi.URLParam(r, "workflow")
116 if workflow == "" {
117 l.Error("empty workflow name")
118 return
119 }
120
121 ps, err := db.GetPipelineStatuses(
122 p.db,
123 db.FilterEq("repo_owner", repoInfo.OwnerDid),
124 db.FilterEq("repo_name", repoInfo.Name),
125 db.FilterEq("knot", repoInfo.Knot),
126 db.FilterEq("id", pipelineId),
127 )
128 if err != nil {
129 l.Error("failed to query db", "err", err)
130 return
131 }
132
133 if len(ps) != 1 {
134 l.Error("invalid number of pipelines", "len", len(ps))
135 return
136 }
137
138 singlePipeline := ps[0]
139
140 p.pages.Workflow(w, pages.WorkflowParams{
141 LoggedInUser: user,
142 RepoInfo: repoInfo,
143 Pipeline: singlePipeline,
144 Workflow: workflow,
145 })
146}
147
148var upgrader = websocket.Upgrader{
149 ReadBufferSize: 1024,
150 WriteBufferSize: 1024,
151}
152
153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
154 l := p.logger.With("handler", "logs")
155
156 clientConn, err := upgrader.Upgrade(w, r, nil)
157 if err != nil {
158 l.Error("websocket upgrade failed", "err", err)
159 return
160 }
161 defer func() {
162 _ = clientConn.WriteControl(
163 websocket.CloseMessage,
164 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
165 time.Now().Add(time.Second),
166 )
167 clientConn.Close()
168 }()
169
170 ctx, cancel := context.WithCancel(r.Context())
171 defer cancel()
172
173 user := p.oauth.GetUser(r)
174 f, err := p.repoResolver.Resolve(r)
175 if err != nil {
176 l.Error("failed to get repo and knot", "err", err)
177 http.Error(w, "bad repo/knot", http.StatusBadRequest)
178 return
179 }
180
181 repoInfo := f.RepoInfo(user)
182
183 pipelineId := chi.URLParam(r, "pipeline")
184 workflow := chi.URLParam(r, "workflow")
185 if pipelineId == "" || workflow == "" {
186 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
187 return
188 }
189
190 ps, err := db.GetPipelineStatuses(
191 p.db,
192 db.FilterEq("repo_owner", repoInfo.OwnerDid),
193 db.FilterEq("repo_name", repoInfo.Name),
194 db.FilterEq("knot", repoInfo.Knot),
195 db.FilterEq("id", pipelineId),
196 )
197 if err != nil || len(ps) != 1 {
198 l.Error("pipeline query failed", "err", err, "count", len(ps))
199 http.Error(w, "pipeline not found", http.StatusNotFound)
200 return
201 }
202
203 singlePipeline := ps[0]
204 spindle := repoInfo.Spindle
205 knot := repoInfo.Knot
206 rkey := singlePipeline.Rkey
207
208 if spindle == "" || knot == "" || rkey == "" {
209 http.Error(w, "invalid repo info", http.StatusBadRequest)
210 return
211 }
212
213 scheme := "wss"
214 if p.config.Core.Dev {
215 scheme = "ws"
216 }
217
218 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
219 l = l.With("url", url)
220 l.Info("logs endpoint hit")
221
222 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
223 if err != nil {
224 l.Error("websocket dial failed", "err", err)
225 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
226 return
227 }
228 defer spindleConn.Close()
229
230 // create a channel for incoming messages
231 evChan := make(chan logEvent, 100)
232 // start a goroutine to read from spindle
233 go readLogs(spindleConn, evChan)
234
235 stepIdx := 0
236 var fragment bytes.Buffer
237 for {
238 select {
239 case <-ctx.Done():
240 l.Info("client disconnected")
241 return
242
243 case ev, ok := <-evChan:
244 if !ok {
245 continue
246 }
247
248 if ev.err != nil && ev.isCloseError() {
249 l.Debug("graceful shutdown, tail complete", "err", err)
250 return
251 }
252 if ev.err != nil {
253 l.Error("error reading from spindle", "err", err)
254 return
255 }
256
257 var logLine spindlemodel.LogLine
258 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
259 l.Error("failed to parse logline", "err", err)
260 continue
261 }
262
263 fragment.Reset()
264
265 switch logLine.Kind {
266 case spindlemodel.LogKindControl:
267 // control messages create a new step block
268 stepIdx++
269 collapsed := false
270 if logLine.StepKind == spindlemodel.StepKindSystem {
271 collapsed = true
272 }
273 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
274 Id: stepIdx,
275 Name: logLine.Content,
276 Command: logLine.StepCommand,
277 Collapsed: collapsed,
278 })
279 case spindlemodel.LogKindData:
280 // data messages simply insert new log lines into current step
281 err = p.pages.LogLine(&fragment, pages.LogLineParams{
282 Id: stepIdx,
283 Content: logLine.Content,
284 })
285 }
286 if err != nil {
287 l.Error("failed to render log line", "err", err)
288 return
289 }
290
291 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
292 l.Error("error writing to client", "err", err)
293 return
294 }
295
296 case <-time.After(30 * time.Second):
297 l.Debug("sent keepalive")
298 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
299 l.Error("failed to write control", "err", err)
300 return
301 }
302 }
303 }
304}
305
306// either a message or an error
307type logEvent struct {
308 msg []byte
309 err error
310}
311
312func (ev *logEvent) isCloseError() bool {
313 return websocket.IsCloseError(
314 ev.err,
315 websocket.CloseNormalClosure,
316 websocket.CloseGoingAway,
317 websocket.CloseAbnormalClosure,
318 )
319}
320
321// read logs from spindle and pass through to chan
322func readLogs(conn *websocket.Conn, ch chan logEvent) {
323 defer close(ch)
324
325 for {
326 if conn == nil {
327 return
328 }
329
330 _, msg, err := conn.ReadMessage()
331 if err != nil {
332 ch <- logEvent{err: err}
333 return
334 }
335 ch <- logEvent{msg: msg}
336 }
337}