forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview: stream logs from workflow endpoint

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 0df0d697 6e90af17

verified
Changed files
+254 -44
appview
db
pages
repoinfo
templates
pipelines
reporesolver
+23 -9
appview/db/pipeline.go
···
}
type WorkflowStatus struct {
-
data []PipelineStatus
+
Data []PipelineStatus
}
func (w WorkflowStatus) Latest() PipelineStatus {
-
return w.data[len(w.data)-1]
+
return w.Data[len(w.Data)-1]
}
// time taken by this workflow to reach an "end state"
func (w WorkflowStatus) TimeTaken() time.Duration {
var start, end *time.Time
-
for _, s := range w.data {
+
for _, s := range w.Data {
if s.Status.IsStart() {
start = &s.Created
}
···
}
slices.Sort(ws)
return ws
+
}
+
+
// if we know that a spindle has picked up this pipeline, then it is Responding
+
func (p Pipeline) IsResponding() bool {
+
return len(p.Statuses) != 0
}
type Trigger struct {
···
status.Status,
status.Error,
status.ExitCode,
+
status.Created.Format(time.RFC3339),
}
placeholders := make([]string, len(args))
···
workflow,
status,
error,
-
exit_code
+
exit_code,
+
created
) values (%s)
`, strings.Join(placeholders, ","))
···
return nil, err
}
-
// Parse created time manually
p.Created, err = time.Parse(time.RFC3339, created)
if err != nil {
return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
}
-
// Link trigger to pipeline
t.Id = p.TriggerId
p.Trigger = &t
p.Statuses = make(map[string]WorkflowStatus)
···
}
// append
-
statuses.data = append(statuses.data, ps)
+
statuses.Data = append(statuses.Data, ps)
// reassign
pipeline.Statuses[ps.Workflow] = statuses
···
var all []Pipeline
for _, p := range pipelines {
for _, s := range p.Statuses {
-
slices.SortFunc(s.data, func(a, b PipelineStatus) int {
+
slices.SortFunc(s.Data, func(a, b PipelineStatus) int {
if a.Created.After(b.Created) {
return 1
}
-
return -1
+
if a.Created.Before(b.Created) {
+
return -1
+
}
+
if a.ID > b.ID {
+
return 1
+
}
+
if a.ID < b.ID {
+
return -1
+
}
+
return 0
})
}
all = append(all, p)
+1
appview/pages/pages.go
···
RepoInfo repoinfo.RepoInfo
Pipeline db.Pipeline
Workflow string
+
LogUrl string
Active string
}
+1
appview/pages/repoinfo/repoinfo.go
···
OwnerHandle string
Description string
Knot string
+
Spindle string
RepoAt syntax.ATURI
IsStarred bool
Stats db.RepoStats
+1
appview/pages/templates/layouts/base.html
···
/>
<meta name="htmx-config" content='{"includeIndicatorStyles": false}'>
<script src="/static/htmx.min.js"></script>
+
<script src="https://cdn.jsdelivr.net/npm/htmx-ext-ws@2.0.2"></script>
<link rel="stylesheet" href="/static/tw.css?{{ cssContentHash }}" type="text/css" />
<title>{{ block "title" . }}{{ end }} · tangled</title>
{{ block "extrameta" . }}{{ end }}
+15 -2
appview/pages/templates/repo/pipelines/fragments/pipelineSymbol.html
···
{{ $statuses := .Statuses }}
{{ $total := len $statuses }}
{{ $success := index $c "success" }}
+
{{ $fail := index $c "failed" }}
+
{{ $empty := eq $total 0 }}
{{ $allPass := eq $success $total }}
+
{{ $allFail := eq $fail $total }}
-
{{ if $allPass }}
+
{{ if $empty }}
+
<div class="flex gap-1 items-center">
+
{{ i "hourglass" "size-4 text-gray-600 dark:text-gray-400 " }}
+
<span>0/{{ $total }}</span>
+
</div>
+
{{ else if $allPass }}
<div class="flex gap-1 items-center">
-
{{ i "check" "size-4 text-green-600 dark:text-green-400 " }}
+
{{ i "check" "size-4 text-green-600" }}
<span>{{ $total }}/{{ $total }}</span>
+
</div>
+
{{ else if $allFail }}
+
<div class="flex gap-1 items-center">
+
{{ i "x" "size-4 text-red-600" }}
+
<span>0/{{ $total }}</span>
</div>
{{ else }}
{{ $radius := f64 8 }}
+5
appview/pages/templates/repo/pipelines/fragments/tooltip.html
···
<time>{{ $time }}</time>
</div>
</div>
+
{{ else }}
+
<div class="flex items-center gap-2 p-2 italic text-gray-600 dark:text-gray-400 ">
+
{{ i "hourglass" "size-4" }}
+
Waiting for spindle ...
+
</div>
{{ end }}
</div>
</div>
+23 -9
appview/pages/templates/repo/pipelines/pipelines.html
···
{{ $root := index . 0 }}
{{ $p := index . 1 }}
{{ with $p }}
-
<div class="grid grid-cols-4 md:grid-cols-8 gap-2 items-center w-full">
-
<div class="col-span-1 md:col-span-5 flex items-center gap-4">
+
<div class="grid grid-cols-6 md:grid-cols-12 gap-2 items-center w-full">
+
<div class="col-span-2 md:col-span-8 flex items-center gap-4">
{{ $target := .Trigger.TargetRef }}
{{ $workflows := .Workflows }}
+
{{ $link := "" }}
+
{{ if .IsResponding }}
+
{{ $link = printf "/%s/pipelines/%s/workflow/%d" $root.RepoInfo.FullName .Id (index $workflows 0) }}
+
{{ end }}
{{ if .Trigger.IsPush }}
-
<a href="/{{ $root.RepoInfo.FullName }}/pipelines/{{ .Id }}/workflow/{{ index $workflows 0 }}" class="block">
-
<span class="font-bold">{{ $target }}</span>
-
<span>push</span>
-
</a>
+
<span class="font-bold">{{ $target }}</span>
+
<span>push</span>
<span class="hidden md:inline-flex gap-2 items-center font-mono text-sm">
{{ $old := deref .Trigger.PushOldSha }}
{{ $new := deref .Trigger.PushNewSha }}
···
{{ end }}
</div>
-
<div class="col-span-1 pl-4">
+
<div class="text-sm md:text-base col-span-1">
{{ template "repo/pipelines/fragments/pipelineSymbolLong" . }}
</div>
-
<div class="col-span-1 text-right">
+
<div class="text-sm md:text-base col-span-1 text-right">
<time title="{{ .Created | longTimeFmt }}">
{{ .Created | shortTimeFmt }} ago
</time>
</div>
{{ $t := .TimeTaken }}
-
<div class="col-span-1 text-right">
+
<div class="text-sm md:text-base col-span-1 text-right">
{{ if $t }}
<time title="{{ $t }}">{{ $t | durationFmt }}</time>
{{ else }}
<time>--</time>
{{ end }}
</div>
+
+
<div class="col-span-1 flex justify-end">
+
{{ if $link }}
+
<a class="md:hidden" href="/{{ $root.RepoInfo.FullName }}/pipelines/{{ .Id }}/workflow/{{ index $workflows 0 }}">
+
{{ i "arrow-up-right" "size-4" }}
+
</a>
+
<a class="hidden md:inline underline" href="/{{ $root.RepoInfo.FullName }}/pipelines/{{ .Id }}/workflow/{{ index $workflows 0 }}">
+
view
+
</a>
+
{{ end }}
+
</div>
+
</div>
{{ end }}
{{ end }}
+35 -18
appview/pages/templates/repo/pipelines/workflow.html
···
{{ define "sidebar" }}
{{ $active := .Workflow }}
{{ with .Pipeline }}
-
<div class="rounded border border-gray-200 dark:border-gray-700">
+
{{ $id := .Id }}
+
<div class="grid grid-cols-1 rounded border border-gray-200 dark:border-gray-700 divide-y divide-gray-200 dark:divide-gray-700">
{{ range $name, $all := .Statuses }}
-
<div class="flex items-center justify-between p-2 border-b border-gray-200 dark:border-gray-700 {{if eq $name $active}}bg-gray-100/50 dark:bg-gray-700/50{{end}}">
-
{{ $lastStatus := $all.Latest }}
-
{{ $kind := $lastStatus.Status.String }}
+
<a href="/{{ $.RepoInfo.FullName }}/pipelines/{{ $id }}/workflow/{{ $name }}" class="no-underline hover:no-underline hover:bg-gray-100/25 hover:dark:bg-gray-700/25">
+
<div
+
class="flex gap-2 items-center justify-between p-2 {{ if eq $name $active }}bg-gray-100/50 dark:bg-gray-700/50{{ end }}">
+
{{ $lastStatus := $all.Latest }}
+
{{ $kind := $lastStatus.Status.String }}
+
+
{{ $t := .TimeTaken }}
+
{{ $time := "" }}
-
{{ $t := .TimeTaken }}
-
{{ $time := "" }}
-
{{ if $t }}
+
{{ if $t }}
{{ $time = durationFmt $t }}
-
{{ else }}
-
{{ $time = printf "%s ago" (shortTimeFmt $.Created) }}
-
{{ end }}
+
{{ else }}
+
{{ $time = printf "%s ago" (shortTimeFmt $lastStatus.Created) }}
+
{{ end }}
-
<div id="left" class="flex items-center gap-2 flex-shrink-0">
-
{{ template "repo/pipelines/fragments/workflowSymbol" $all }}
-
{{ $name }}
+
<div id="left" class="flex items-center gap-2 flex-shrink-0">
+
{{ template "repo/pipelines/fragments/workflowSymbol" $all }}
+
{{ $name }}
+
</div>
+
<div id="right" class="flex items-center gap-2 flex-shrink-0">
+
<span class="font-bold">{{ $kind }}</span>
+
<time>{{ $time }}</time>
+
</div>
</div>
-
<div id="right" class="flex items-center gap-2 flex-shrink-0">
-
<span class="font-bold">{{ $kind }}</span>
-
<time>{{ $time }}</time>
-
</div>
-
</div>
+
</a>
{{ end }}
</div>
{{ end }}
{{ end }}
+
+
{{ define "logs" }}
+
<div id="log-stream"
+
class="p-2 bg-gray-100 dark:bg-gray-900 font-mono text-sm min-h-96 max-h-screen overflow-auto flex flex-col-reverse [overflow-anchor:auto_!important]"
+
hx-ext="ws"
+
ws-connect="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/logs">
+
<div id="lines">
+
<!-- Each log line should be rendered with class="item" like below -->
+
<!-- <div class="item">[INFO] Log line here</div> -->
+
</div>
+
</div>
+
{{ end }}
+147 -5
appview/pipelines/pipelines.go
···
package pipelines
import (
+
"context"
+
"encoding/json"
+
"fmt"
"log/slog"
"net/http"
+
"strings"
+
"time"
"tangled.sh/tangled.sh/core/appview/config"
"tangled.sh/tangled.sh/core/appview/db"
···
"tangled.sh/tangled.sh/core/eventconsumer"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/rbac"
+
spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
"github.com/go-chi/chi/v5"
+
"github.com/gorilla/websocket"
"github.com/posthog/posthog-go"
)
···
db *db.DB
enforcer *rbac.Enforcer
posthog posthog.Client
-
Logger *slog.Logger
+
logger *slog.Logger
}
func New(
···
db: db,
posthog: posthog,
enforcer: enforcer,
-
Logger: logger,
+
logger: logger,
}
}
func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
user := p.oauth.GetUser(r)
-
l := p.Logger.With("handler", "Index")
+
l := p.logger.With("handler", "Index")
f, err := p.repoResolver.Resolve(r)
if err != nil {
···
func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
user := p.oauth.GetUser(r)
-
l := p.Logger.With("handler", "Workflow")
+
l := p.logger.With("handler", "Workflow")
f, err := p.repoResolver.Resolve(r)
if err != nil {
···
}
workflow := chi.URLParam(r, "workflow")
-
if pipelineId == "" {
+
if workflow == "" {
l.Error("empty workflow name")
return
}
···
Workflow: workflow,
})
}
+
+
var upgrader = websocket.Upgrader{
+
ReadBufferSize: 1024,
+
WriteBufferSize: 1024,
+
}
+
+
func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
+
l := p.logger.With("handler", "logs")
+
+
clientConn, err := upgrader.Upgrade(w, r, nil)
+
if err != nil {
+
l.Error("websocket upgrade failed", "err", err)
+
return
+
}
+
defer clientConn.Close()
+
+
ctx, cancel := context.WithCancel(r.Context())
+
defer cancel()
+
go func() {
+
for {
+
if _, _, err := clientConn.NextReader(); err != nil {
+
l.Error("failed to read", "err", err)
+
cancel()
+
return
+
}
+
}
+
}()
+
+
user := p.oauth.GetUser(r)
+
f, err := p.repoResolver.Resolve(r)
+
if err != nil {
+
l.Error("failed to get repo and knot", "err", err)
+
http.Error(w, "bad repo/knot", http.StatusBadRequest)
+
return
+
}
+
+
repoInfo := f.RepoInfo(user)
+
+
pipelineId := chi.URLParam(r, "pipeline")
+
workflow := chi.URLParam(r, "workflow")
+
if pipelineId == "" || workflow == "" {
+
http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
+
return
+
}
+
+
ps, err := db.GetPipelineStatuses(
+
p.db,
+
db.FilterEq("repo_owner", repoInfo.OwnerDid),
+
db.FilterEq("repo_name", repoInfo.Name),
+
db.FilterEq("knot", repoInfo.Knot),
+
db.FilterEq("id", pipelineId),
+
)
+
if err != nil || len(ps) != 1 {
+
l.Error("pipeline query failed", "err", err, "count", len(ps))
+
http.Error(w, "pipeline not found", http.StatusNotFound)
+
return
+
}
+
+
singlePipeline := ps[0]
+
spindle := repoInfo.Spindle
+
knot := repoInfo.Knot
+
rkey := singlePipeline.Rkey
+
+
if spindle == "" || knot == "" || rkey == "" {
+
http.Error(w, "invalid repo info", http.StatusBadRequest)
+
return
+
}
+
+
scheme := "wss"
+
if p.config.Core.Dev {
+
scheme = "ws"
+
}
+
+
url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
+
l = l.With("url", url)
+
l.Info("logs endpoint hit")
+
+
spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
+
if err != nil {
+
l.Error("websocket dial failed", "err", err)
+
http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
+
return
+
}
+
defer spindleConn.Close()
+
+
// create a channel for incoming messages
+
msgChan := make(chan []byte, 10)
+
errChan := make(chan error, 1)
+
+
// start a goroutine to read from spindle
+
go func() {
+
defer close(msgChan)
+
for {
+
_, msg, err := spindleConn.ReadMessage()
+
if err != nil {
+
errChan <- err
+
return
+
}
+
msgChan <- msg
+
}
+
}()
+
+
for {
+
select {
+
case <-ctx.Done():
+
l.Info("client disconnected")
+
return
+
case err := <-errChan:
+
l.Error("error reading from spindle", "err", err)
+
return
+
case msg := <-msgChan:
+
var logLine spindlemodel.LogLine
+
if err = json.Unmarshal(msg, &logLine); err != nil {
+
l.Error("failed to parse logline", "err", err)
+
continue
+
}
+
+
html := fmt.Appendf(nil, `
+
<div id="lines" hx-swap-oob="beforeend">
+
<p>%s: %s</p>
+
</div>
+
`, logLine.Stream, logLine.Data)
+
+
if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil {
+
l.Error("error writing to client", "err", err)
+
return
+
}
+
case <-time.After(30 * time.Second):
+
l.Debug("sent keepalive")
+
if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
+
l.Error("failed to write control", "err", err)
+
}
+
}
+
}
+
}
+1
appview/pipelines/router.go
···
r := chi.NewRouter()
r.Get("/", p.Index)
r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
+
r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
return r
}
+1
appview/reporesolver/resolver.go
···
Ref: f.Ref,
IsStarred: isStarred,
Knot: knot,
+
Spindle: f.Spindle,
Roles: f.RolesInRepo(user),
Stats: db.RepoStats{
StarCount: starCount,
+1 -1
flake.nix
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
-
goModHash = "sha256-G+59ZwQwBbnO9ZjAB5zMEmWZbeG4k7ko/lPz+ceqYKs=";
+
goModHash = "sha256-2RUwj16RNaZ/gCOcd7b3LRCHiROCRj9HuzbBdLdgWGo=";
appviewDeps = {
inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource;
};