spindle/engine: clean up envs, stdout on dev and use new models #258

merged
opened by anirudh.fi targeting master from push-momltwttmuyq
Changed files
+90 -66
spindle
+7 -1
spindle/config/config.go
···
    Owner string `env:"OWNER, required"`
}

+type Pipelines struct {
+    // TODO: change default to nixery.tangled.sh
+    Nixery string `env:"NIXERY, default=nixery.dev"`
+}
+
type Config struct {
-    Server Server `env:",prefix=SPINDLE_SERVER_"`
+    Server    Server    `env:",prefix=SPINDLE_SERVER_"`
+    Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"`
}

func Load(ctx context.Context) (*Config, error) {
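
A quick sketch of how the new Pipelines section would be driven from the environment. The `env:` tag syntax (required, default=, prefix=) matches an envconfig-style loader, but the library behind Load is not shown in this diff, so treat this as illustrative; OWNER is the only required field visible in this hunk, and the values below are hypothetical.

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "tangled.sh/tangled.sh/core/spindle/config"
)

func main() {
    // Hypothetical value; OWNER is required by the Server section above.
    os.Setenv("SPINDLE_SERVER_OWNER", "did:plc:example")
    // Overrides the nixery.dev default introduced in this PR.
    os.Setenv("SPINDLE_PIPELINES_NIXERY", "nixery.tangled.sh")

    cfg, err := config.Load(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(cfg.Pipelines.Nixery) // nixery.tangled.sh
}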
+21
spindle/engine/ansi_stripper.go
···
+package engine
+
+import (
+    "io"
+
+    "github.com/go-enry/go-enry/v2/regex"
+)
+
+// regex to match ANSI escape codes (e.g., color codes, cursor moves)
+const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"
+
+var re = regex.MustCompile(ansi)
+
+type ansiStrippingWriter struct {
+    underlying io.Writer
+}
+
+func (w *ansiStrippingWriter) Write(p []byte) (int, error) {
+    clean := re.ReplaceAll(p, []byte{})
+    return w.underlying.Write(clean)
+}
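
For context, a standalone version of the stripping writer, using stdlib regexp instead of go-enry's wrapper purely to keep the sketch dependency-free. One deliberate difference, flagged in the comment: this variant reports len(p) back to the caller. The merged writer returns the underlying writer's count, which is harmless under io.TeeReader (it ignores the write count) but would trip io.Copy's short-write check whenever escape codes are actually stripped.

package main

import (
    "fmt"
    "io"
    "os"
    "regexp"
)

// Same pattern as the new file above.
var ansiRe = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")

type stripWriter struct{ underlying io.Writer }

func (w *stripWriter) Write(p []byte) (int, error) {
    if _, err := w.underlying.Write(ansiRe.ReplaceAll(p, nil)); err != nil {
        return 0, err
    }
    // Report the full input length so callers like io.Copy don't
    // mistake stripped bytes for a short write.
    return len(p), nil
}

func main() {
    w := &stripWriter{underlying: os.Stdout}
    fmt.Fprintln(w, "\x1b[32mPASS\x1b[0m step 1") // prints: PASS step 1
}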
+42 -28
spindle/engine/engine.go
···
"io"
"log/slog"
"os"
-
"path"
"strings"
"sync"
···
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
-
"tangled.sh/tangled.sh/core/api/tangled"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/notifier"
+
"tangled.sh/tangled.sh/core/spindle/config"
"tangled.sh/tangled.sh/core/spindle/db"
"tangled.sh/tangled.sh/core/spindle/models"
)
···
    l           *slog.Logger
    db          *db.DB
    n           *notifier.Notifier
+    cfg         *config.Config
    chanMu      sync.RWMutex
    stdoutChans map[string]chan string
···
    cleanup map[string][]cleanupFunc
}

-func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
+func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
    dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
    if err != nil {
        return nil, err
···
        l:   l,
        db:  db,
        n:   n,
+        cfg: cfg,
    }
    e.stdoutChans = make(map[string]chan string, 100)
···
    return e, nil
}

-func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) {
+func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
    e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
    wg := sync.WaitGroup{}
···
            }
            defer e.DestroyWorkflow(ctx, wid)

-            // TODO: actual checks for image/registry etc.
-            var deps string
-            for _, d := range w.Dependencies {
-                if d.Registry == "nixpkgs" {
-                    deps = path.Join(d.Packages...)
-                }
-            }
-
-            // load defaults from somewhere else
-            deps = path.Join(deps, "bash", "git", "coreutils", "nix")
-
-            cimg := path.Join("nixery.dev", deps)
-            reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
+            reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
            if err != nil {
                e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
···
            defer reader.Close()
            io.Copy(os.Stdout, reader)

-            err = e.StartSteps(ctx, w.Steps, wid, cimg)
+            err = e.StartSteps(ctx, w.Steps, wid, w.Image)
            if err != nil {
                e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
-                err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
-                if err != nil {
-                    return err
+                dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
+                if dbErr != nil {
+                    return dbErr
                }
                return fmt.Errorf("starting steps image: %w", err)
···
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
-func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error {
+func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
    // set up logging channels
    e.chanMu.Lock()
    if _, exists := e.stdoutChans[wid.String()]; !exists {
···
    }

    if state.ExitCode != 0 {
-        e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode)
-        return fmt.Errorf("%s", state.Error)
+        e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
+        err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
+        if err != nil {
+            return err
+        }
+        return fmt.Errorf("error: %s, exit code: %d, oom: %v", state.Error, state.ExitCode, state.OOMKilled)
    }
}
···
        Follow:     true,
        ShowStdout: true,
        ShowStderr: true,
-        Details:    false,
+        Details:    true,
        Timestamps: false,
    })
    if err != nil {
        return err
    }

+    var devOutput io.Writer = io.Discard
+    if e.cfg.Server.Dev {
+        devOutput = &ansiStrippingWriter{underlying: os.Stdout}
+    }
+
+    tee := io.TeeReader(logs, devOutput)
+
    // using StdCopy we demux logs and stream stdout and stderr to different
    // channels.
    //
···
    rpipeOut, wpipeOut := io.Pipe()
    rpipeErr, wpipeErr := io.Pipe()

+    wg := sync.WaitGroup{}
+
+    wg.Add(1)
    go func() {
+        defer wg.Done()
        defer wpipeOut.Close()
        defer wpipeErr.Close()
-        _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
+        _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
        if err != nil && err != io.EOF {
            e.l.Error("failed to copy logs", "error", err)
        }
···
    // read from stdout and send to stdout pipe
    // NOTE: the stdoutCh channel is closed further up in StartSteps
    // once all steps are done.
+    wg.Add(1)
    go func() {
+        defer wg.Done()
        e.chanMu.RLock()
        stdoutCh := e.stdoutChans[wid.String()]
        e.chanMu.RUnlock()
···
    // read from stderr and send to stderr pipe
    // NOTE: the stderrCh channel is closed further up in StartSteps
    // once all steps are done.
+    wg.Add(1)
    go func() {
+        defer wg.Done()
        e.chanMu.RLock()
        stderrCh := e.stderrChans[wid.String()]
        e.chanMu.RUnlock()
···
        }
    }()

+    wg.Wait()
+
    return nil
}
···
            Source: nixVolume(wid),
            Target: "/nix",
        },
+        {
+            Type:   mount.TypeTmpfs,
+            Target: "/tmp",
+        },
    },
-    ReadonlyRootfs: true,
+    ReadonlyRootfs: false,
    CapDrop:        []string{"ALL"},
-    SecurityOpt:    []string{"no-new-privileges"},
+    SecurityOpt:    []string{"seccomp=unconfined"},
}
return hostConfig
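
The tee wiring above is easiest to see with a synthetic stream: Docker's log endpoint multiplexes stdout and stderr into one framed stream, StdCopy demuxes it, and the TeeReader copies the raw bytes to the dev writer as they pass through. A self-contained sketch follows (the fake stream is built with stdcopy.NewStdWriter). Note that the dev writer receives the raw multiplexed stream, 8-byte frame headers included, which the ANSI stripper does not remove.

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/docker/docker/pkg/stdcopy"
)

func main() {
    // Fake a multiplexed container log stream, the same framing
    // ContainerLogs produces when the container has no TTY.
    var mux bytes.Buffer
    stdcopy.NewStdWriter(&mux, stdcopy.Stdout).Write([]byte("step output\n"))
    stdcopy.NewStdWriter(&mux, stdcopy.Stderr).Write([]byte("step warning\n"))

    // Dev tee as in the PR: raw frames go to devOutput while StdCopy
    // demuxes the same bytes into separate writers.
    devOutput := io.Discard // dev mode would use &ansiStrippingWriter{underlying: os.Stdout}
    tee := io.TeeReader(&mux, devOutput)

    var stdout, stderr bytes.Buffer
    if _, err := stdcopy.StdCopy(&stdout, &stderr, tee); err != nil {
        panic(err)
    }
    fmt.Printf("stdout: %q\nstderr: %q\n", stdout.String(), stderr.String())
    // stdout: "step output\n"
    // stderr: "step warning\n"
}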
+4 -8
spindle/engine/envs.go
···
import (
    "fmt"
-
-    "tangled.sh/tangled.sh/core/api/tangled"
)

type EnvVars []string

// ConstructEnvs converts a tangled.Pipeline_Step_Environment_Elem.{Key,Value}
// representation into a docker-friendly []string{"KEY=value", ...} slice.
-func ConstructEnvs(envs []*tangled.Pipeline_Step_Environment_Elem) EnvVars {
+func ConstructEnvs(envs map[string]string) EnvVars {
    var dockerEnvs EnvVars
-    for _, env := range envs {
-        if env != nil {
-            ev := fmt.Sprintf("%s=%s", env.Key, env.Value)
-            dockerEnvs = append(dockerEnvs, ev)
-        }
+    for k, v := range envs {
+        ev := fmt.Sprintf("%s=%s", k, v)
+        dockerEnvs = append(dockerEnvs, ev)
    }
    return dockerEnvs
}
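
The map-based signature slots straight into docker's container config, which takes KEY=value strings. An illustrative sketch (the container wiring below is not from this diff); note that Go randomizes map iteration order, so the order of the resulting slice varies from run to run:

package main

import (
    "fmt"

    "github.com/docker/docker/api/types/container"
)

type EnvVars []string

// Same shape as the reworked ConstructEnvs above.
func ConstructEnvs(envs map[string]string) EnvVars {
    var dockerEnvs EnvVars
    for k, v := range envs {
        dockerEnvs = append(dockerEnvs, fmt.Sprintf("%s=%s", k, v))
    }
    return dockerEnvs
}

func main() {
    // Hypothetical step environment.
    env := map[string]string{"CI": "true", "REF": "refs/heads/master"}
    cfg := container.Config{Env: ConstructEnvs(env)}
    fmt.Println(cfg.Env) // e.g. [CI=true REF=refs/heads/master], order varies
}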
+4 -19
spindle/engine/envs_test.go
···
import (
    "reflect"
    "testing"
-
-    "tangled.sh/tangled.sh/core/api/tangled"
)

func TestConstructEnvs(t *testing.T) {
    tests := []struct {
        name string
-        in   []*tangled.Pipeline_Step_Environment_Elem
+        in   map[string]string
        want EnvVars
    }{
        {
            name: "empty input",
-            in:   []*tangled.Pipeline_Step_Environment_Elem{},
+            in:   make(map[string]string),
            want: EnvVars{},
        },
        {
            name: "single env var",
-            in: []*tangled.Pipeline_Step_Environment_Elem{
-                {Key: "FOO", Value: "bar"},
-            },
+            in:   map[string]string{"FOO": "bar"},
            want: EnvVars{"FOO=bar"},
        },
        {
            name: "multiple env vars",
-            in: []*tangled.Pipeline_Step_Environment_Elem{
-                {Key: "FOO", Value: "bar"},
-                {Key: "BAZ", Value: "qux"},
-            },
+            in:   map[string]string{"FOO": "bar", "BAZ": "qux"},
            want: EnvVars{"FOO=bar", "BAZ=qux"},
        },
-        {
-            name: "nil entries are skipped",
-            in: []*tangled.Pipeline_Step_Environment_Elem{
-                nil,
-                {Key: "FOO", Value: "bar"},
-            },
-            want: EnvVars{"FOO=bar"},
-        },
    }
    for _, tt := range tests {
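
One caveat with the new table (an observation, not part of the diff): ConstructEnvs now iterates a map, and Go randomizes map iteration order, so a fixed-order want like EnvVars{"FOO=bar", "BAZ=qux"} will fail intermittently if the elided assertion uses reflect.DeepEqual directly. A sketch of an order-independent comparison for the loop body, using strings.Join so the nil slice ConstructEnvs returns for an empty map also matches the EnvVars{} want (needs "sort" and "strings" imports):

got := ConstructEnvs(tt.in)
sort.Strings(got)
want := append(EnvVars{}, tt.want...)
sort.Strings(want)
if strings.Join(got, ",") != strings.Join(want, ",") {
    t.Errorf("ConstructEnvs(%v) = %v, want %v", tt.in, got, want)
}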
+12 -10
spindle/server.go
···
    n := notifier.New()

-    eng, err := engine.New(ctx, d, &n)
+    eng, err := engine.New(ctx, cfg, d, &n)
    if err != nil {
        return err
    }
···
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
    if msg.Nsid == tangled.PipelineNSID {
-        pipeline := tangled.Pipeline{}
-        err := json.Unmarshal(msg.EventJson, &pipeline)
+        tpl := tangled.Pipeline{}
+        err := json.Unmarshal(msg.EventJson, &tpl)
        if err != nil {
            fmt.Println("error unmarshalling", err)
            return err
        }

-        if pipeline.TriggerMetadata == nil {
+        if tpl.TriggerMetadata == nil {
            return fmt.Errorf("no trigger metadata found")
        }

-        if pipeline.TriggerMetadata.Repo == nil {
+        if tpl.TriggerMetadata.Repo == nil {
            return fmt.Errorf("no repo data found")
        }

        // filter by repos
        _, err = s.db.GetRepo(
-            pipeline.TriggerMetadata.Repo.Knot,
-            pipeline.TriggerMetadata.Repo.Did,
-            pipeline.TriggerMetadata.Repo.Repo,
+            tpl.TriggerMetadata.Repo.Knot,
+            tpl.TriggerMetadata.Repo.Did,
+            tpl.TriggerMetadata.Repo.Repo,
        )
        if err != nil {
            return err
···
            Rkey: msg.Rkey,
        }

-        for _, w := range pipeline.Workflows {
+        for _, w := range tpl.Workflows {
            if w != nil {
                err := s.db.StatusPending(models.WorkflowId{
                    PipelineId: pipelineId,
···
            }
        }

+        spl := models.ToPipeline(tpl, s.cfg.Server.Dev)
+
        ok := s.jq.Enqueue(queue.Job{
            Run: func() error {
-                s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
+                s.eng.StartWorkflows(ctx, spl, pipelineId)
                return nil
            },
            OnFail: func(jobError error) {
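
models.ToPipeline itself is outside this diff; only its call site is visible. From the rest of the PR one can infer roughly what it must do: produce a models.Pipeline whose workflows carry a resolved Image (the nixery path construction removed from engine.go) and whose step environments are maps rather than the lexicon's element slice (matching the reworked ConstructEnvs). The dev flag passed alongside tpl presumably feeds registry selection (compare the NIXERY default and its TODO in config.go), though nothing in the diff confirms that. A hypothetical sketch of the environment half; every name here is an assumption:

package models

import "tangled.sh/tangled.sh/core/api/tangled"

// toStepEnv (hypothetical) flattens the lexicon's env element slice into
// the map form the reworked ConstructEnvs expects. Skipping nil entries
// here would preserve the behaviour of the old slice-based loop, whose
// "nil entries are skipped" test case this PR deletes.
func toStepEnv(elems []*tangled.Pipeline_Step_Environment_Elem) map[string]string {
    env := make(map[string]string, len(elems))
    for _, e := range elems {
        if e != nil {
            env[e.Key] = e.Value
        }
    }
    return env
}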