forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 9 securejoin "github.com/cyphar/filepath-securejoin" 10 "golang.org/x/sync/errgroup" 11 "tangled.sh/tangled.sh/core/notifier" 12 "tangled.sh/tangled.sh/core/spindle/config" 13 "tangled.sh/tangled.sh/core/spindle/db" 14 "tangled.sh/tangled.sh/core/spindle/models" 15 "tangled.sh/tangled.sh/core/spindle/secrets" 16) 17 18var ( 19 ErrTimedOut = errors.New("timed out") 20 ErrWorkflowFailed = errors.New("workflow failed") 21) 22 23func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 24 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 25 26 // extract secrets 27 var allSecrets []secrets.UnlockedSecret 28 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 29 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 30 allSecrets = res 31 } 32 } 33 34 eg, ctx := errgroup.WithContext(ctx) 35 for eng, wfs := range pipeline.Workflows { 36 workflowTimeout := eng.WorkflowTimeout() 37 l.Info("using workflow timeout", "timeout", workflowTimeout) 38 39 for _, w := range wfs { 40 eg.Go(func() error { 41 wid := models.WorkflowId{ 42 PipelineId: pipelineId, 43 Name: w.Name, 44 } 45 46 err := db.StatusRunning(wid, n) 47 if err != nil { 48 return err 49 } 50 51 err = eng.SetupWorkflow(ctx, wid, &w) 52 if err != nil { 53 // TODO(winter): Should this always set StatusFailed? 54 // In the original, we only do in a subset of cases. 55 l.Error("setting up worklow", "wid", wid, "err", err) 56 57 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 58 if dbErr != nil { 59 return dbErr 60 } 61 return err 62 } 63 defer eng.DestroyWorkflow(ctx, wid) 64 65 wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid) 66 if err != nil { 67 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 68 wfLogger = nil 69 } else { 70 defer wfLogger.Close() 71 } 72 73 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 74 defer cancel() 75 76 for stepIdx, step := range w.Steps { 77 if wfLogger != nil { 78 ctl := wfLogger.ControlWriter(stepIdx, step) 79 ctl.Write([]byte(step.Name())) 80 } 81 82 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 83 if err != nil { 84 if errors.Is(err, ErrTimedOut) { 85 dbErr := db.StatusTimeout(wid, n) 86 if dbErr != nil { 87 return dbErr 88 } 89 } else { 90 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 91 if dbErr != nil { 92 return dbErr 93 } 94 } 95 96 return fmt.Errorf("starting steps image: %w", err) 97 } 98 } 99 100 err = db.StatusSuccess(wid, n) 101 if err != nil { 102 return err 103 } 104 105 return nil 106 }) 107 } 108 } 109 110 if err := eg.Wait(); err != nil { 111 l.Error("failed to run one or more workflows", "err", err) 112 } else { 113 l.Error("successfully ran full pipeline") 114 } 115}