spindle: pass secrets to engine as env vars #377

merged
opened by oppi.li targeting master from push-vynsusnqpmus
Changed files
+61 -32
spindle
+42 -19
spindle/engine/engine.go
···
"sync"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
···
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/notifier"
"tangled.sh/tangled.sh/core/spindle/config"
"tangled.sh/tangled.sh/core/spindle/db"
"tangled.sh/tangled.sh/core/spindle/models"
)
const (
···
db *db.DB
n *notifier.Notifier
cfg *config.Config
cleanupMu sync.Mutex
cleanup map[string][]cleanupFunc
}
-
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
···
db: db,
n: n,
cfg: cfg,
}
e.cleanup = make(map[string][]cleanupFunc)
···
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
-
wg := sync.WaitGroup{}
for _, w := range pipeline.Workflows {
-
wg.Add(1)
-
go func() error {
-
defer wg.Done()
wid := models.WorkflowId{
PipelineId: pipelineId,
Name: w.Name,
···
defer reader.Close()
io.Copy(os.Stdout, reader)
-
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
-
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
-
if err != nil {
-
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
-
workflowTimeout = 5 * time.Minute
-
}
-
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
defer cancel()
-
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
if err != nil {
if errors.Is(err, ErrTimedOut) {
dbErr := e.db.StatusTimeout(wid, e.n)
···
}
return nil
-
}()
}
-
wg.Wait()
}
// SetupWorkflow sets up a new network for the workflow and volumes for
···
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
// Fixed version of the step execution logic
-
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
-
for stepIdx, step := range steps {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-
envs := ConstructEnvs(step.Environment)
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
hostConfig := hostConfig(wid)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-
Image: image,
Cmd: []string{"bash", "-c", step.Command},
WorkingDir: workspaceDir,
Tty: false,
···
"sync"
"time"
+
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
···
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
+
"golang.org/x/sync/errgroup"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/notifier"
"tangled.sh/tangled.sh/core/spindle/config"
"tangled.sh/tangled.sh/core/spindle/db"
"tangled.sh/tangled.sh/core/spindle/models"
+
"tangled.sh/tangled.sh/core/spindle/secrets"
)
const (
···
db *db.DB
n *notifier.Notifier
cfg *config.Config
+
vault secrets.Manager
cleanupMu sync.Mutex
cleanup map[string][]cleanupFunc
}
+
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) {
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
···
db: db,
n: n,
cfg: cfg,
+
vault: vault,
}
e.cleanup = make(map[string][]cleanupFunc)
···
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
+
// extract secrets
+
var allSecrets []secrets.UnlockedSecret
+
if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
+
if res, err := e.vault.GetSecretsUnlocked(secrets.DidSlashRepo(didSlashRepo)); err == nil {
+
allSecrets = res
+
}
+
}
+
+
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
+
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
+
if err != nil {
+
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
+
workflowTimeout = 5 * time.Minute
+
}
+
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
+
+
eg, ctx := errgroup.WithContext(ctx)
for _, w := range pipeline.Workflows {
+
eg.Go(func() error {
wid := models.WorkflowId{
PipelineId: pipelineId,
Name: w.Name,
···
defer reader.Close()
io.Copy(os.Stdout, reader)
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
defer cancel()
+
err = e.StartSteps(ctx, wid, w, allSecrets)
if err != nil {
if errors.Is(err, ErrTimedOut) {
dbErr := e.db.StatusTimeout(wid, e.n)
···
}
return nil
+
})
}
+
if err = eg.Wait(); err != nil {
+
e.l.Error("failed to run one or more workflows", "err", err)
+
} else {
+
e.l.Error("successfully ran full pipeline")
+
}
}
// SetupWorkflow sets up a new network for the workflow and volumes for
···
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
// Fixed version of the step execution logic
+
func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error {
+
workflowEnvs := ConstructEnvs(w.Environment)
+
for _, s := range secrets {
+
workflowEnvs.AddEnv(s.Key, s.Value)
+
}
+
for stepIdx, step := range w.Steps {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
+
envs := append(EnvVars(nil), workflowEnvs...)
+
for k, v := range step.Environment {
+
envs.AddEnv(k, v)
+
}
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
hostConfig := hostConfig(wid)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+
Image: w.Image,
Cmd: []string{"bash", "-c", step.Command},
WorkingDir: workspaceDir,
Tty: false,
+9 -12
spindle/models/pipeline.go
···
)
type Pipeline struct {
Workflows []Workflow
}
···
swf.Environment = workflowEnvToMap(twf.Environment)
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
-
swf.addNixProfileToPath()
-
swf.setGlobalEnvs()
setup := &setupSteps{}
setup.addStep(nixConfStep())
···
workflows = append(workflows, *swf)
}
-
return &Pipeline{Workflows: workflows}
}
func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
···
return path.Join(nixery, dependencies)
}
-
-
func (wf *Workflow) addNixProfileToPath() {
-
wf.Environment["PATH"] = "$PATH:/.nix-profile/bin"
-
}
-
-
func (wf *Workflow) setGlobalEnvs() {
-
wf.Environment["NIX_CONFIG"] = "experimental-features = nix-command flakes"
-
wf.Environment["HOME"] = "/tangled/workspace"
-
}
···
)
type Pipeline struct {
+
RepoOwner string
+
RepoName string
Workflows []Workflow
}
···
swf.Environment = workflowEnvToMap(twf.Environment)
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
setup := &setupSteps{}
setup.addStep(nixConfStep())
···
workflows = append(workflows, *swf)
}
+
repoOwner := pl.TriggerMetadata.Repo.Did
+
repoName := pl.TriggerMetadata.Repo.Repo
+
return &Pipeline{
+
RepoOwner: repoOwner,
+
RepoName: repoName,
+
Workflows: workflows,
+
}
}
func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
···
return path.Join(nixery, dependencies)
}
+3
spindle/models/setup_steps.go
···
continue
}
// collect packages from custom registries
for _, pkg := range packages {
customPackages = append(customPackages, fmt.Sprintf("'%s#%s'", registry, pkg))
···
continue
}
+
if len(packages) == 0 {
+
customPackages = append(customPackages, registry)
+
}
// collect packages from custom registries
for _, pkg := range packages {
customPackages = append(customPackages, fmt.Sprintf("'%s#%s'", registry, pkg))
+7 -1
spindle/server.go
···
n := notifier.New()
-
eng, err := engine.New(ctx, cfg, d, &n)
if err != nil {
return err
}
···
n := notifier.New()
+
// TODO: add hashicorp vault provider and choose here
+
vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
+
if err != nil {
+
return fmt.Errorf("failed to setup secrets provider: %w", err)
+
}
+
+
eng, err := engine.New(ctx, cfg, d, &n, vault)
if err != nil {
return err
}