forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/knotclient/cursor" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/spindle/config" 19 "tangled.sh/tangled.sh/core/spindle/db" 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 "tangled.sh/tangled.sh/core/spindle/models" 22 "tangled.sh/tangled.sh/core/spindle/queue" 23) 24 25type Spindle struct { 26 jc *jetstream.JetstreamClient 27 db *db.DB 28 e *rbac.Enforcer 29 l *slog.Logger 30 n *notifier.Notifier 31 eng *engine.Engine 32 jq *queue.Queue 33} 34 35func Run(ctx context.Context) error { 36 cfg, err := config.Load(ctx) 37 if err != nil { 38 return fmt.Errorf("failed to load config: %w", err) 39 } 40 41 d, err := db.Make(cfg.Server.DBPath) 42 if err != nil { 43 return fmt.Errorf("failed to setup db: %w", err) 44 } 45 46 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 47 if err != nil { 48 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 49 } 50 51 logger := log.FromContext(ctx) 52 53 collections := []string{tangled.SpindleMemberNSID} 54 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 55 if err != nil { 56 return fmt.Errorf("failed to setup jetstream client: %w", err) 57 } 58 59 n := notifier.New() 60 eng, err := engine.New(ctx, d, &n) 61 if err != nil { 62 return err 63 } 64 65 jq := queue.NewQueue(100, 2) 66 67 // starts a job queue runner in the background 68 jq.Start() 69 defer jq.Stop() 70 71 spindle := Spindle{ 72 jc: jc, 73 e: e, 74 db: d, 75 l: logger, 76 n: &n, 77 eng: eng, 78 jq: jq, 79 } 80 81 // for each incoming sh.tangled.pipeline, we execute 82 // spindle.processPipeline, which in turn enqueues the pipeline 83 // job in the above registered queue. 84 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 85 if err != nil { 86 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 87 } 88 go func() { 89 logger.Info("starting event consumer") 90 knotEventSource := knotclient.NewEventSource("localhost:6000") 91 92 ccfg := knotclient.NewConsumerConfig() 93 ccfg.Logger = logger 94 ccfg.Dev = cfg.Server.Dev 95 ccfg.ProcessFunc = spindle.processPipeline 96 ccfg.CursorStore = cursorStore 97 ccfg.AddEventSource(knotEventSource) 98 99 ec := knotclient.NewEventConsumer(*ccfg) 100 101 ec.Start(ctx) 102 }() 103 104 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 105 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 106 107 return nil 108} 109 110func (s *Spindle) Router() http.Handler { 111 mux := chi.NewRouter() 112 113 mux.HandleFunc("/events", s.Events) 114 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 115 return mux 116} 117 118func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 119 if msg.Nsid == tangled.PipelineNSID { 120 pipeline := tangled.Pipeline{} 121 err := json.Unmarshal(msg.EventJson, &pipeline) 122 if err != nil { 123 fmt.Println("error unmarshalling", err) 124 return err 125 } 126 127 pipelineId := models.PipelineId{ 128 Knot: src.Knot, 129 Rkey: msg.Rkey, 130 } 131 132 for _, w := range pipeline.Workflows { 133 if w != nil { 134 err := s.db.StatusPending(models.WorkflowId{ 135 PipelineId: pipelineId, 136 Name: w.Name, 137 }, s.n) 138 if err != nil { 139 return err 140 } 141 } 142 } 143 144 ok := s.jq.Enqueue(queue.Job{ 145 Run: func() error { 146 s.eng.StartWorkflows(ctx, &pipeline, pipelineId) 147 return nil 148 }, 149 OnFail: func(jobError error) { 150 s.l.Error("pipeline run failed", "error", jobError) 151 }, 152 }) 153 if ok { 154 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 155 } else { 156 s.l.Error("failed to enqueue pipeline: queue is full") 157 } 158 } 159 160 return nil 161}