forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/oauth"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/reporesolver"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/rbac"
20 spindlemodel "tangled.org/core/spindle/models"
21
22 "github.com/go-chi/chi/v5"
23 "github.com/gorilla/websocket"
24)
25
26type Pipelines struct {
27 repoResolver *reporesolver.RepoResolver
28 idResolver *idresolver.Resolver
29 config *config.Config
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 spindlestream *eventconsumer.Consumer
33 db *db.DB
34 enforcer *rbac.Enforcer
35 logger *slog.Logger
36}
37
38func New(
39 oauth *oauth.OAuth,
40 repoResolver *reporesolver.RepoResolver,
41 pages *pages.Pages,
42 spindlestream *eventconsumer.Consumer,
43 idResolver *idresolver.Resolver,
44 db *db.DB,
45 config *config.Config,
46 enforcer *rbac.Enforcer,
47 logger *slog.Logger,
48) *Pipelines {
49 return &Pipelines{
50 oauth: oauth,
51 repoResolver: repoResolver,
52 pages: pages,
53 idResolver: idResolver,
54 config: config,
55 spindlestream: spindlestream,
56 db: db,
57 enforcer: enforcer,
58 logger: logger,
59 }
60}
61
62func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
63 user := p.oauth.GetUser(r)
64 l := p.logger.With("handler", "Index")
65
66 f, err := p.repoResolver.Resolve(r)
67 if err != nil {
68 l.Error("failed to get repo and knot", "err", err)
69 return
70 }
71
72 repoInfo := f.RepoInfo(user)
73
74 ps, err := db.GetPipelineStatuses(
75 p.db,
76 db.FilterEq("repo_owner", repoInfo.OwnerDid),
77 db.FilterEq("repo_name", repoInfo.Name),
78 db.FilterEq("knot", repoInfo.Knot),
79 )
80 if err != nil {
81 l.Error("failed to query db", "err", err)
82 return
83 }
84
85 p.pages.Pipelines(w, pages.PipelinesParams{
86 LoggedInUser: user,
87 RepoInfo: repoInfo,
88 Pipelines: ps,
89 })
90}
91
92func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
93 user := p.oauth.GetUser(r)
94 l := p.logger.With("handler", "Workflow")
95
96 f, err := p.repoResolver.Resolve(r)
97 if err != nil {
98 l.Error("failed to get repo and knot", "err", err)
99 return
100 }
101
102 repoInfo := f.RepoInfo(user)
103
104 pipelineId := chi.URLParam(r, "pipeline")
105 if pipelineId == "" {
106 l.Error("empty pipeline ID")
107 return
108 }
109
110 workflow := chi.URLParam(r, "workflow")
111 if workflow == "" {
112 l.Error("empty workflow name")
113 return
114 }
115
116 ps, err := db.GetPipelineStatuses(
117 p.db,
118 db.FilterEq("repo_owner", repoInfo.OwnerDid),
119 db.FilterEq("repo_name", repoInfo.Name),
120 db.FilterEq("knot", repoInfo.Knot),
121 db.FilterEq("id", pipelineId),
122 )
123 if err != nil {
124 l.Error("failed to query db", "err", err)
125 return
126 }
127
128 if len(ps) != 1 {
129 l.Error("invalid number of pipelines", "len", len(ps))
130 return
131 }
132
133 singlePipeline := ps[0]
134
135 p.pages.Workflow(w, pages.WorkflowParams{
136 LoggedInUser: user,
137 RepoInfo: repoInfo,
138 Pipeline: singlePipeline,
139 Workflow: workflow,
140 })
141}
142
143var upgrader = websocket.Upgrader{
144 ReadBufferSize: 1024,
145 WriteBufferSize: 1024,
146}
147
148func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
149 l := p.logger.With("handler", "logs")
150
151 clientConn, err := upgrader.Upgrade(w, r, nil)
152 if err != nil {
153 l.Error("websocket upgrade failed", "err", err)
154 return
155 }
156 defer func() {
157 _ = clientConn.WriteControl(
158 websocket.CloseMessage,
159 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
160 time.Now().Add(time.Second),
161 )
162 clientConn.Close()
163 }()
164
165 ctx, cancel := context.WithCancel(r.Context())
166 defer cancel()
167
168 user := p.oauth.GetUser(r)
169 f, err := p.repoResolver.Resolve(r)
170 if err != nil {
171 l.Error("failed to get repo and knot", "err", err)
172 http.Error(w, "bad repo/knot", http.StatusBadRequest)
173 return
174 }
175
176 repoInfo := f.RepoInfo(user)
177
178 pipelineId := chi.URLParam(r, "pipeline")
179 workflow := chi.URLParam(r, "workflow")
180 if pipelineId == "" || workflow == "" {
181 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
182 return
183 }
184
185 ps, err := db.GetPipelineStatuses(
186 p.db,
187 db.FilterEq("repo_owner", repoInfo.OwnerDid),
188 db.FilterEq("repo_name", repoInfo.Name),
189 db.FilterEq("knot", repoInfo.Knot),
190 db.FilterEq("id", pipelineId),
191 )
192 if err != nil || len(ps) != 1 {
193 l.Error("pipeline query failed", "err", err, "count", len(ps))
194 http.Error(w, "pipeline not found", http.StatusNotFound)
195 return
196 }
197
198 singlePipeline := ps[0]
199 spindle := repoInfo.Spindle
200 knot := repoInfo.Knot
201 rkey := singlePipeline.Rkey
202
203 if spindle == "" || knot == "" || rkey == "" {
204 http.Error(w, "invalid repo info", http.StatusBadRequest)
205 return
206 }
207
208 scheme := "wss"
209 if p.config.Core.Dev {
210 scheme = "ws"
211 }
212
213 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
214 l = l.With("url", url)
215 l.Info("logs endpoint hit")
216
217 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
218 if err != nil {
219 l.Error("websocket dial failed", "err", err)
220 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
221 return
222 }
223 defer spindleConn.Close()
224
225 // create a channel for incoming messages
226 evChan := make(chan logEvent, 100)
227 // start a goroutine to read from spindle
228 go readLogs(spindleConn, evChan)
229
230 stepIdx := 0
231 var fragment bytes.Buffer
232 for {
233 select {
234 case <-ctx.Done():
235 l.Info("client disconnected")
236 return
237
238 case ev, ok := <-evChan:
239 if !ok {
240 continue
241 }
242
243 if ev.err != nil && ev.isCloseError() {
244 l.Debug("graceful shutdown, tail complete", "err", err)
245 return
246 }
247 if ev.err != nil {
248 l.Error("error reading from spindle", "err", err)
249 return
250 }
251
252 var logLine spindlemodel.LogLine
253 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
254 l.Error("failed to parse logline", "err", err)
255 continue
256 }
257
258 fragment.Reset()
259
260 switch logLine.Kind {
261 case spindlemodel.LogKindControl:
262 // control messages create a new step block
263 stepIdx++
264 collapsed := false
265 if logLine.StepKind == spindlemodel.StepKindSystem {
266 collapsed = true
267 }
268 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
269 Id: stepIdx,
270 Name: logLine.Content,
271 Command: logLine.StepCommand,
272 Collapsed: collapsed,
273 })
274 case spindlemodel.LogKindData:
275 // data messages simply insert new log lines into current step
276 err = p.pages.LogLine(&fragment, pages.LogLineParams{
277 Id: stepIdx,
278 Content: logLine.Content,
279 })
280 }
281 if err != nil {
282 l.Error("failed to render log line", "err", err)
283 return
284 }
285
286 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
287 l.Error("error writing to client", "err", err)
288 return
289 }
290
291 case <-time.After(30 * time.Second):
292 l.Debug("sent keepalive")
293 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
294 l.Error("failed to write control", "err", err)
295 return
296 }
297 }
298 }
299}
300
301// either a message or an error
302type logEvent struct {
303 msg []byte
304 err error
305}
306
307func (ev *logEvent) isCloseError() bool {
308 return websocket.IsCloseError(
309 ev.err,
310 websocket.CloseNormalClosure,
311 websocket.CloseGoingAway,
312 websocket.CloseAbnormalClosure,
313 )
314}
315
316// read logs from spindle and pass through to chan
317func readLogs(conn *websocket.Conn, ch chan logEvent) {
318 defer close(ch)
319
320 for {
321 if conn == nil {
322 return
323 }
324
325 _, msg, err := conn.ReadMessage()
326 if err != nil {
327 ch <- logEvent{err: err}
328 return
329 }
330 ch <- logEvent{msg: msg}
331 }
332}