1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8
9 securejoin "github.com/cyphar/filepath-securejoin"
10 "golang.org/x/sync/errgroup"
11 "tangled.sh/tangled.sh/core/notifier"
12 "tangled.sh/tangled.sh/core/spindle/config"
13 "tangled.sh/tangled.sh/core/spindle/db"
14 "tangled.sh/tangled.sh/core/spindle/models"
15 "tangled.sh/tangled.sh/core/spindle/secrets"
16)
17
// Sentinel errors shared with workflow engines.
var (
	// ErrWorkflowFailed is a sentinel for a failed workflow. It is not
	// referenced in this part of the file; presumably engines return or wrap
	// it when a step exits unsuccessfully (confirm against the engine
	// implementations).
	ErrWorkflowFailed = errors.New("workflow failed")

	// ErrTimedOut is the sentinel StartWorkflows looks for (via errors.Is) in
	// a step error to record the workflow as timed out rather than failed.
	ErrTimedOut = errors.New("timed out")
)
22
23func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
24 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
25
26 // extract secrets
27 var allSecrets []secrets.UnlockedSecret
28 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
29 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
30 allSecrets = res
31 }
32 }
33
34 eg, ctx := errgroup.WithContext(ctx)
35 for eng, wfs := range pipeline.Workflows {
36 workflowTimeout := eng.WorkflowTimeout()
37 l.Info("using workflow timeout", "timeout", workflowTimeout)
38
39 for _, w := range wfs {
40 eg.Go(func() error {
41 wid := models.WorkflowId{
42 PipelineId: pipelineId,
43 Name: w.Name,
44 }
45
46 err := db.StatusRunning(wid, n)
47 if err != nil {
48 return err
49 }
50
51 err = eng.SetupWorkflow(ctx, wid, &w)
52 if err != nil {
53 // TODO(winter): Should this always set StatusFailed?
54 // In the original, we only do in a subset of cases.
55 l.Error("setting up worklow", "wid", wid, "err", err)
56
57 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
58 if dbErr != nil {
59 return dbErr
60 }
61 return err
62 }
63 defer eng.DestroyWorkflow(ctx, wid)
64
65 wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid)
66 if err != nil {
67 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
68 wfLogger = nil
69 } else {
70 defer wfLogger.Close()
71 }
72
73 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
74 defer cancel()
75
76 for stepIdx, step := range w.Steps {
77 if wfLogger != nil {
78 ctl := wfLogger.ControlWriter(stepIdx, step)
79 ctl.Write([]byte(step.Name()))
80 }
81
82 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
83 if err != nil {
84 if errors.Is(err, ErrTimedOut) {
85 dbErr := db.StatusTimeout(wid, n)
86 if dbErr != nil {
87 return dbErr
88 }
89 } else {
90 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
91 if dbErr != nil {
92 return dbErr
93 }
94 }
95
96 return fmt.Errorf("starting steps image: %w", err)
97 }
98 }
99
100 err = db.StatusSuccess(wid, n)
101 if err != nil {
102 return err
103 }
104
105 return nil
106 })
107 }
108 }
109
110 if err := eg.Wait(); err != nil {
111 l.Error("failed to run one or more workflows", "err", err)
112 } else {
113 l.Error("successfully ran full pipeline")
114 }
115}