// Forked from tangled.org/core
// Monorepo for Tangled — https://tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/log"
20 "tangled.org/core/rbac"
21 spindlemodel "tangled.org/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25)
26
27type Pipelines struct {
28 repoResolver *reporesolver.RepoResolver
29 idResolver *idresolver.Resolver
30 config *config.Config
31 oauth *oauth.OAuth
32 pages *pages.Pages
33 spindlestream *eventconsumer.Consumer
34 db *db.DB
35 enforcer *rbac.Enforcer
36 logger *slog.Logger
37}
38
39func New(
40 oauth *oauth.OAuth,
41 repoResolver *reporesolver.RepoResolver,
42 pages *pages.Pages,
43 spindlestream *eventconsumer.Consumer,
44 idResolver *idresolver.Resolver,
45 db *db.DB,
46 config *config.Config,
47 enforcer *rbac.Enforcer,
48) *Pipelines {
49 logger := log.New("pipelines")
50
51 return &Pipelines{
52 oauth: oauth,
53 repoResolver: repoResolver,
54 pages: pages,
55 idResolver: idResolver,
56 config: config,
57 spindlestream: spindlestream,
58 db: db,
59 enforcer: enforcer,
60 logger: logger,
61 }
62}
63
64func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
65 user := p.oauth.GetUser(r)
66 l := p.logger.With("handler", "Index")
67
68 f, err := p.repoResolver.Resolve(r)
69 if err != nil {
70 l.Error("failed to get repo and knot", "err", err)
71 return
72 }
73
74 repoInfo := f.RepoInfo(user)
75
76 ps, err := db.GetPipelineStatuses(
77 p.db,
78 db.FilterEq("repo_owner", repoInfo.OwnerDid),
79 db.FilterEq("repo_name", repoInfo.Name),
80 db.FilterEq("knot", repoInfo.Knot),
81 )
82 if err != nil {
83 l.Error("failed to query db", "err", err)
84 return
85 }
86
87 p.pages.Pipelines(w, pages.PipelinesParams{
88 LoggedInUser: user,
89 RepoInfo: repoInfo,
90 Pipelines: ps,
91 })
92}
93
94func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
95 user := p.oauth.GetUser(r)
96 l := p.logger.With("handler", "Workflow")
97
98 f, err := p.repoResolver.Resolve(r)
99 if err != nil {
100 l.Error("failed to get repo and knot", "err", err)
101 return
102 }
103
104 repoInfo := f.RepoInfo(user)
105
106 pipelineId := chi.URLParam(r, "pipeline")
107 if pipelineId == "" {
108 l.Error("empty pipeline ID")
109 return
110 }
111
112 workflow := chi.URLParam(r, "workflow")
113 if workflow == "" {
114 l.Error("empty workflow name")
115 return
116 }
117
118 ps, err := db.GetPipelineStatuses(
119 p.db,
120 db.FilterEq("repo_owner", repoInfo.OwnerDid),
121 db.FilterEq("repo_name", repoInfo.Name),
122 db.FilterEq("knot", repoInfo.Knot),
123 db.FilterEq("id", pipelineId),
124 )
125 if err != nil {
126 l.Error("failed to query db", "err", err)
127 return
128 }
129
130 if len(ps) != 1 {
131 l.Error("invalid number of pipelines", "len", len(ps))
132 return
133 }
134
135 singlePipeline := ps[0]
136
137 p.pages.Workflow(w, pages.WorkflowParams{
138 LoggedInUser: user,
139 RepoInfo: repoInfo,
140 Pipeline: singlePipeline,
141 Workflow: workflow,
142 })
143}
144
145var upgrader = websocket.Upgrader{
146 ReadBufferSize: 1024,
147 WriteBufferSize: 1024,
148}
149
150func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
151 l := p.logger.With("handler", "logs")
152
153 clientConn, err := upgrader.Upgrade(w, r, nil)
154 if err != nil {
155 l.Error("websocket upgrade failed", "err", err)
156 return
157 }
158 defer func() {
159 _ = clientConn.WriteControl(
160 websocket.CloseMessage,
161 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
162 time.Now().Add(time.Second),
163 )
164 clientConn.Close()
165 }()
166
167 ctx, cancel := context.WithCancel(r.Context())
168 defer cancel()
169
170 user := p.oauth.GetUser(r)
171 f, err := p.repoResolver.Resolve(r)
172 if err != nil {
173 l.Error("failed to get repo and knot", "err", err)
174 http.Error(w, "bad repo/knot", http.StatusBadRequest)
175 return
176 }
177
178 repoInfo := f.RepoInfo(user)
179
180 pipelineId := chi.URLParam(r, "pipeline")
181 workflow := chi.URLParam(r, "workflow")
182 if pipelineId == "" || workflow == "" {
183 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
184 return
185 }
186
187 ps, err := db.GetPipelineStatuses(
188 p.db,
189 db.FilterEq("repo_owner", repoInfo.OwnerDid),
190 db.FilterEq("repo_name", repoInfo.Name),
191 db.FilterEq("knot", repoInfo.Knot),
192 db.FilterEq("id", pipelineId),
193 )
194 if err != nil || len(ps) != 1 {
195 l.Error("pipeline query failed", "err", err, "count", len(ps))
196 http.Error(w, "pipeline not found", http.StatusNotFound)
197 return
198 }
199
200 singlePipeline := ps[0]
201 spindle := repoInfo.Spindle
202 knot := repoInfo.Knot
203 rkey := singlePipeline.Rkey
204
205 if spindle == "" || knot == "" || rkey == "" {
206 http.Error(w, "invalid repo info", http.StatusBadRequest)
207 return
208 }
209
210 scheme := "wss"
211 if p.config.Core.Dev {
212 scheme = "ws"
213 }
214
215 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
216 l = l.With("url", url)
217 l.Info("logs endpoint hit")
218
219 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
220 if err != nil {
221 l.Error("websocket dial failed", "err", err)
222 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
223 return
224 }
225 defer spindleConn.Close()
226
227 // create a channel for incoming messages
228 evChan := make(chan logEvent, 100)
229 // start a goroutine to read from spindle
230 go readLogs(spindleConn, evChan)
231
232 stepIdx := 0
233 var fragment bytes.Buffer
234 for {
235 select {
236 case <-ctx.Done():
237 l.Info("client disconnected")
238 return
239
240 case ev, ok := <-evChan:
241 if !ok {
242 continue
243 }
244
245 if ev.err != nil && ev.isCloseError() {
246 l.Debug("graceful shutdown, tail complete", "err", err)
247 return
248 }
249 if ev.err != nil {
250 l.Error("error reading from spindle", "err", err)
251 return
252 }
253
254 var logLine spindlemodel.LogLine
255 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
256 l.Error("failed to parse logline", "err", err)
257 continue
258 }
259
260 fragment.Reset()
261
262 switch logLine.Kind {
263 case spindlemodel.LogKindControl:
264 // control messages create a new step block
265 stepIdx++
266 collapsed := false
267 if logLine.StepKind == spindlemodel.StepKindSystem {
268 collapsed = true
269 }
270 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
271 Id: stepIdx,
272 Name: logLine.Content,
273 Command: logLine.StepCommand,
274 Collapsed: collapsed,
275 })
276 case spindlemodel.LogKindData:
277 // data messages simply insert new log lines into current step
278 err = p.pages.LogLine(&fragment, pages.LogLineParams{
279 Id: stepIdx,
280 Content: logLine.Content,
281 })
282 }
283 if err != nil {
284 l.Error("failed to render log line", "err", err)
285 return
286 }
287
288 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
289 l.Error("error writing to client", "err", err)
290 return
291 }
292
293 case <-time.After(30 * time.Second):
294 l.Debug("sent keepalive")
295 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
296 l.Error("failed to write control", "err", err)
297 return
298 }
299 }
300 }
301}
302
// logEvent is one item read from the spindle connection: a message payload
// in msg, or the read error in err.
type logEvent struct {
	msg []byte // raw message bytes
	err error  // non-nil when the read failed
}
308
309func (ev *logEvent) isCloseError() bool {
310 return websocket.IsCloseError(
311 ev.err,
312 websocket.CloseNormalClosure,
313 websocket.CloseGoingAway,
314 websocket.CloseAbnormalClosure,
315 )
316}
317
318// read logs from spindle and pass through to chan
319func readLogs(conn *websocket.Conn, ch chan logEvent) {
320 defer close(ch)
321
322 for {
323 if conn == nil {
324 return
325 }
326
327 _, msg, err := conn.ReadMessage()
328 if err != nil {
329 ch <- logEvent{err: err}
330 return
331 }
332 ch <- logEvent{msg: msg}
333 }
334}