1package pipelines
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 "tangled.sh/tangled.sh/core/appview/idresolver"
15 "tangled.sh/tangled.sh/core/appview/oauth"
16 "tangled.sh/tangled.sh/core/appview/pages"
17 "tangled.sh/tangled.sh/core/appview/reporesolver"
18 "tangled.sh/tangled.sh/core/eventconsumer"
19 "tangled.sh/tangled.sh/core/log"
20 "tangled.sh/tangled.sh/core/rbac"
21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 "github.com/posthog/posthog-go"
26)
27
28type Pipelines struct {
29 repoResolver *reporesolver.RepoResolver
30 idResolver *idresolver.Resolver
31 config *config.Config
32 oauth *oauth.OAuth
33 pages *pages.Pages
34 spindlestream *eventconsumer.Consumer
35 db *db.DB
36 enforcer *rbac.Enforcer
37 posthog posthog.Client
38 logger *slog.Logger
39}
40
41func New(
42 oauth *oauth.OAuth,
43 repoResolver *reporesolver.RepoResolver,
44 pages *pages.Pages,
45 spindlestream *eventconsumer.Consumer,
46 idResolver *idresolver.Resolver,
47 db *db.DB,
48 config *config.Config,
49 posthog posthog.Client,
50 enforcer *rbac.Enforcer,
51) *Pipelines {
52 logger := log.New("pipelines")
53
54 return &Pipelines{oauth: oauth,
55 repoResolver: repoResolver,
56 pages: pages,
57 idResolver: idResolver,
58 config: config,
59 spindlestream: spindlestream,
60 db: db,
61 posthog: posthog,
62 enforcer: enforcer,
63 logger: logger,
64 }
65}
66
67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
68 user := p.oauth.GetUser(r)
69 l := p.logger.With("handler", "Index")
70
71 f, err := p.repoResolver.Resolve(r)
72 if err != nil {
73 l.Error("failed to get repo and knot", "err", err)
74 return
75 }
76
77 repoInfo := f.RepoInfo(user)
78
79 ps, err := db.GetPipelineStatuses(
80 p.db,
81 db.FilterEq("repo_owner", repoInfo.OwnerDid),
82 db.FilterEq("repo_name", repoInfo.Name),
83 db.FilterEq("knot", repoInfo.Knot),
84 )
85 if err != nil {
86 l.Error("failed to query db", "err", err)
87 return
88 }
89
90 p.pages.Pipelines(w, pages.PipelinesParams{
91 LoggedInUser: user,
92 RepoInfo: repoInfo,
93 Pipelines: ps,
94 })
95}
96
97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
98 user := p.oauth.GetUser(r)
99 l := p.logger.With("handler", "Workflow")
100
101 f, err := p.repoResolver.Resolve(r)
102 if err != nil {
103 l.Error("failed to get repo and knot", "err", err)
104 return
105 }
106
107 repoInfo := f.RepoInfo(user)
108
109 pipelineId := chi.URLParam(r, "pipeline")
110 if pipelineId == "" {
111 l.Error("empty pipeline ID")
112 return
113 }
114
115 workflow := chi.URLParam(r, "workflow")
116 if workflow == "" {
117 l.Error("empty workflow name")
118 return
119 }
120
121 ps, err := db.GetPipelineStatuses(
122 p.db,
123 db.FilterEq("repo_owner", repoInfo.OwnerDid),
124 db.FilterEq("repo_name", repoInfo.Name),
125 db.FilterEq("knot", repoInfo.Knot),
126 db.FilterEq("id", pipelineId),
127 )
128 if err != nil {
129 l.Error("failed to query db", "err", err)
130 return
131 }
132
133 if len(ps) != 1 {
134 l.Error("invalid number of pipelines", "len", len(ps))
135 return
136 }
137
138 singlePipeline := ps[0]
139
140 p.pages.Workflow(w, pages.WorkflowParams{
141 LoggedInUser: user,
142 RepoInfo: repoInfo,
143 Pipeline: singlePipeline,
144 Workflow: workflow,
145 })
146}
147
148var upgrader = websocket.Upgrader{
149 ReadBufferSize: 1024,
150 WriteBufferSize: 1024,
151}
152
153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
154 l := p.logger.With("handler", "logs")
155
156 clientConn, err := upgrader.Upgrade(w, r, nil)
157 if err != nil {
158 l.Error("websocket upgrade failed", "err", err)
159 return
160 }
161 defer func() {
162 _ = clientConn.WriteControl(
163 websocket.CloseMessage,
164 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
165 time.Now().Add(time.Second),
166 )
167 clientConn.Close()
168 }()
169
170 ctx, cancel := context.WithCancel(r.Context())
171 defer cancel()
172 go func() {
173 for {
174 if _, _, err := clientConn.NextReader(); err != nil {
175 l.Error("failed to read", "err", err)
176 cancel()
177 return
178 }
179 }
180 }()
181
182 user := p.oauth.GetUser(r)
183 f, err := p.repoResolver.Resolve(r)
184 if err != nil {
185 l.Error("failed to get repo and knot", "err", err)
186 http.Error(w, "bad repo/knot", http.StatusBadRequest)
187 return
188 }
189
190 repoInfo := f.RepoInfo(user)
191
192 pipelineId := chi.URLParam(r, "pipeline")
193 workflow := chi.URLParam(r, "workflow")
194 if pipelineId == "" || workflow == "" {
195 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
196 return
197 }
198
199 ps, err := db.GetPipelineStatuses(
200 p.db,
201 db.FilterEq("repo_owner", repoInfo.OwnerDid),
202 db.FilterEq("repo_name", repoInfo.Name),
203 db.FilterEq("knot", repoInfo.Knot),
204 db.FilterEq("id", pipelineId),
205 )
206 if err != nil || len(ps) != 1 {
207 l.Error("pipeline query failed", "err", err, "count", len(ps))
208 http.Error(w, "pipeline not found", http.StatusNotFound)
209 return
210 }
211
212 singlePipeline := ps[0]
213 spindle := repoInfo.Spindle
214 knot := repoInfo.Knot
215 rkey := singlePipeline.Rkey
216
217 if spindle == "" || knot == "" || rkey == "" {
218 http.Error(w, "invalid repo info", http.StatusBadRequest)
219 return
220 }
221
222 scheme := "wss"
223 if p.config.Core.Dev {
224 scheme = "ws"
225 }
226
227 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
228 l = l.With("url", url)
229 l.Info("logs endpoint hit")
230
231 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
232 if err != nil {
233 l.Error("websocket dial failed", "err", err)
234 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
235 return
236 }
237 defer spindleConn.Close()
238
239 // create a channel for incoming messages
240 msgChan := make(chan []byte, 10)
241 errChan := make(chan error, 1)
242
243 // start a goroutine to read from spindle
244 go func() {
245 defer close(msgChan)
246 defer close(errChan)
247
248 for {
249 _, msg, err := spindleConn.ReadMessage()
250 if err != nil {
251 if websocket.IsCloseError(err,
252 websocket.CloseNormalClosure,
253 websocket.CloseGoingAway,
254 websocket.CloseAbnormalClosure) {
255 errChan <- nil // signal graceful end
256 } else {
257 errChan <- err
258 }
259 return
260 }
261 msgChan <- msg
262 }
263 }()
264
265 for {
266 select {
267 case <-ctx.Done():
268 l.Info("client disconnected")
269 return
270 case err := <-errChan:
271 if err != nil {
272 l.Error("error reading from spindle", "err", err)
273 }
274
275 if err == nil {
276 l.Info("log tail complete")
277 }
278
279 return
280 case msg := <-msgChan:
281 var logLine spindlemodel.LogLine
282 if err = json.Unmarshal(msg, &logLine); err != nil {
283 l.Error("failed to parse logline", "err", err)
284 continue
285 }
286
287 html := fmt.Appendf(nil, `
288 <div id="lines" hx-swap-oob="beforeend">
289 <p>%s: %s</p>
290 </div>
291 `, logLine.Stream, logLine.Data)
292
293 if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil {
294 l.Error("error writing to client", "err", err)
295 return
296 }
297 case <-time.After(30 * time.Second):
298 l.Debug("sent keepalive")
299 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
300 l.Error("failed to write control", "err", err)
301 }
302 }
303 }
304}