forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/knotclient/cursor" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/spindle/config" 19 "tangled.sh/tangled.sh/core/spindle/db" 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 "tangled.sh/tangled.sh/core/spindle/models" 22 "tangled.sh/tangled.sh/core/spindle/queue" 23) 24 25const ( 26 rbacDomain = "thisserver" 27) 28 29type Spindle struct { 30 jc *jetstream.JetstreamClient 31 db *db.DB 32 e *rbac.Enforcer 33 l *slog.Logger 34 n *notifier.Notifier 35 eng *engine.Engine 36 jq *queue.Queue 37 cfg *config.Config 38} 39 40func Run(ctx context.Context) error { 41 logger := log.FromContext(ctx) 42 43 cfg, err := config.Load(ctx) 44 if err != nil { 45 return fmt.Errorf("failed to load config: %w", err) 46 } 47 48 d, err := db.Make(cfg.Server.DBPath) 49 if err != nil { 50 return fmt.Errorf("failed to setup db: %w", err) 51 } 52 53 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 54 if err != nil { 55 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 56 } 57 e.E.EnableAutoSave(true) 58 59 n := notifier.New() 60 61 eng, err := engine.New(ctx, d, &n) 62 if err != nil { 63 return err 64 } 65 66 jq := queue.NewQueue(100, 2) 67 68 collections := []string{tangled.SpindleMemberNSID} 69 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false) 70 if err != nil { 71 return fmt.Errorf("failed to setup jetstream client: %w", err) 72 } 73 74 spindle := Spindle{ 75 jc: jc, 76 e: e, 77 db: d, 78 l: logger, 79 n: &n, 80 eng: eng, 81 jq: jq, 82 cfg: cfg, 83 } 84 85 err = e.AddDomain(rbacDomain) 86 if err != nil { 87 return fmt.Errorf("failed to set rbac domain: %w", err) 88 } 89 err = spindle.configureOwner() 90 if err != nil { 91 return err 92 } 93 logger.Info("owner set", "did", cfg.Server.Owner) 94 95 // starts a job queue runner in the background 96 jq.Start() 97 defer jq.Stop() 98 99 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 100 if err != nil { 101 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 102 } 103 104 err = jc.StartJetstream(ctx, spindle.ingest()) 105 if err != nil { 106 return fmt.Errorf("failed to start jetstream consumer: %w", err) 107 } 108 109 // for each incoming sh.tangled.pipeline, we execute 110 // spindle.processPipeline, which in turn enqueues the pipeline 111 // job in the above registered queue. 112 113 ccfg := knotclient.NewConsumerConfig() 114 ccfg.Logger = logger 115 ccfg.Dev = cfg.Server.Dev 116 ccfg.ProcessFunc = spindle.processPipeline 117 ccfg.CursorStore = cursorStore 118 for _, knot := range spindle.cfg.Knots { 119 kes := knotclient.NewEventSource(knot) 120 ccfg.AddEventSource(kes) 121 } 122 ec := knotclient.NewEventConsumer(*ccfg) 123 124 go func() { 125 logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots) 126 ec.Start(ctx) 127 }() 128 129 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 130 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 131 132 return nil 133} 134 135func (s *Spindle) Router() http.Handler { 136 mux := chi.NewRouter() 137 138 mux.HandleFunc("/events", s.Events) 139 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 140 return mux 141} 142 143func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 144 if msg.Nsid == tangled.PipelineNSID { 145 pipeline := tangled.Pipeline{} 146 err := json.Unmarshal(msg.EventJson, &pipeline) 147 if err != nil { 148 fmt.Println("error unmarshalling", err) 149 return err 150 } 151 152 pipelineId := models.PipelineId{ 153 Knot: src.Knot, 154 Rkey: msg.Rkey, 155 } 156 157 for _, w := range pipeline.Workflows { 158 if w != nil { 159 err := s.db.StatusPending(models.WorkflowId{ 160 PipelineId: pipelineId, 161 Name: w.Name, 162 }, s.n) 163 if err != nil { 164 return err 165 } 166 } 167 } 168 169 ok := s.jq.Enqueue(queue.Job{ 170 Run: func() error { 171 s.eng.StartWorkflows(ctx, &pipeline, pipelineId) 172 return nil 173 }, 174 OnFail: func(jobError error) { 175 s.l.Error("pipeline run failed", "error", jobError) 176 }, 177 }) 178 if ok { 179 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 180 } else { 181 s.l.Error("failed to enqueue pipeline: queue is full") 182 } 183 } 184 185 return nil 186} 187 188func (s *Spindle) configureOwner() error { 189 cfgOwner := s.cfg.Server.Owner 190 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain) 191 if err != nil { 192 return fmt.Errorf("failed to fetch server:owner: %w", err) 193 } 194 195 if len(serverOwner) == 0 { 196 s.e.AddOwner(rbacDomain, cfgOwner) 197 } else { 198 if serverOwner[0] != cfgOwner { 199 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0]) 200 } 201 } 202 return nil 203}