forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 10 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 12 "github.com/go-chi/chi/v5" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/eventconsumer" 15 "tangled.org/core/eventconsumer/cursor" 16 "tangled.org/core/idresolver" 17 "tangled.org/core/jetstream" 18 "tangled.org/core/log" 19 "tangled.org/core/notifier" 20 "tangled.org/core/rbac" 21 "tangled.org/core/spindle/config" 22 "tangled.org/core/spindle/db" 23 "tangled.org/core/spindle/engine" 24 "tangled.org/core/spindle/engines/nixery" 25 "tangled.org/core/spindle/models" 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 "tangled.org/core/xrpc/serviceauth" 30) 31 32//go:embed motd 33var motd []byte 34 35const ( 36 rbacDomain = "thisserver" 37) 38 39type Spindle struct { 40 jc *jetstream.JetstreamClient 41 db *db.DB 42 e *rbac.Enforcer 43 l *slog.Logger 44 n *notifier.Notifier 45 engs map[string]models.Engine 46 jq *queue.Queue 47 cfg *config.Config 48 ks *eventconsumer.Consumer 49 res *idresolver.Resolver 50 vault secrets.Manager 51} 52 53// New creates a new Spindle server with the provided configuration and engines. 54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 55 logger := log.FromContext(ctx) 56 57 d, err := db.Make(cfg.Server.DBPath) 58 if err != nil { 59 return nil, fmt.Errorf("failed to setup db: %w", err) 60 } 61 62 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 65 } 66 e.E.EnableAutoSave(true) 67 68 n := notifier.New() 69 70 var vault secrets.Manager 71 switch cfg.Server.Secrets.Provider { 72 case "openbao": 73 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 74 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 75 } 76 vault, err = secrets.NewOpenBaoManager( 77 cfg.Server.Secrets.OpenBao.ProxyAddr, 78 logger, 79 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 80 ) 81 if err != nil { 82 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 83 } 84 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 85 case "sqlite", "": 86 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 87 if err != nil { 88 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 89 } 90 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 91 default: 92 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 93 } 94 95 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 96 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 97 98 collections := []string{ 99 tangled.SpindleMemberNSID, 100 tangled.RepoNSID, 101 tangled.RepoCollaboratorNSID, 102 } 103 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 104 if err != nil { 105 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 106 } 107 jc.AddDid(cfg.Server.Owner) 108 109 // Check if the spindle knows about any Dids; 110 dids, err := d.GetAllDids() 111 if err != nil { 112 return nil, fmt.Errorf("failed to get all dids: %w", err) 113 } 114 for _, d := range dids { 115 jc.AddDid(d) 116 } 117 118 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 119 120 spindle := &Spindle{ 121 jc: jc, 122 e: e, 123 db: d, 124 l: logger, 125 n: &n, 126 engs: engines, 127 jq: jq, 128 cfg: cfg, 129 res: resolver, 130 vault: vault, 131 } 132 133 err = e.AddSpindle(rbacDomain) 134 if err != nil { 135 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 136 } 137 err = spindle.configureOwner() 138 if err != nil { 139 return nil, err 140 } 141 logger.Info("owner set", "did", cfg.Server.Owner) 142 143 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 144 if err != nil { 145 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 146 } 147 148 err = jc.StartJetstream(ctx, spindle.ingest()) 149 if err != nil { 150 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 151 } 152 153 // for each incoming sh.tangled.pipeline, we execute 154 // spindle.processPipeline, which in turn enqueues the pipeline 155 // job in the above registered queue. 156 ccfg := eventconsumer.NewConsumerConfig() 157 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 158 ccfg.Dev = cfg.Server.Dev 159 ccfg.ProcessFunc = spindle.processPipeline 160 ccfg.CursorStore = cursorStore 161 knownKnots, err := d.Knots() 162 if err != nil { 163 return nil, err 164 } 165 for _, knot := range knownKnots { 166 logger.Info("adding source start", "knot", knot) 167 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 168 } 169 spindle.ks = eventconsumer.NewConsumer(*ccfg) 170 171 return spindle, nil 172} 173 174// DB returns the database instance. 175func (s *Spindle) DB() *db.DB { 176 return s.db 177} 178 179// Queue returns the job queue instance. 180func (s *Spindle) Queue() *queue.Queue { 181 return s.jq 182} 183 184// Engines returns the map of available engines. 185func (s *Spindle) Engines() map[string]models.Engine { 186 return s.engs 187} 188 189// Vault returns the secrets manager instance. 190func (s *Spindle) Vault() secrets.Manager { 191 return s.vault 192} 193 194// Notifier returns the notifier instance. 195func (s *Spindle) Notifier() *notifier.Notifier { 196 return s.n 197} 198 199// Enforcer returns the RBAC enforcer instance. 200func (s *Spindle) Enforcer() *rbac.Enforcer { 201 return s.e 202} 203 204// Start starts the Spindle server (blocking). 205func (s *Spindle) Start(ctx context.Context) error { 206 // starts a job queue runner in the background 207 s.jq.Start() 208 defer s.jq.Stop() 209 210 // Stop vault token renewal if it implements Stopper 211 if stopper, ok := s.vault.(secrets.Stopper); ok { 212 defer stopper.Stop() 213 } 214 215 go func() { 216 s.l.Info("starting knot event consumer") 217 s.ks.Start(ctx) 218 }() 219 220 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 221 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 222} 223 224func Run(ctx context.Context) error { 225 cfg, err := config.Load(ctx) 226 if err != nil { 227 return fmt.Errorf("failed to load config: %w", err) 228 } 229 230 nixeryEng, err := nixery.New(ctx, cfg) 231 if err != nil { 232 return err 233 } 234 235 s, err := New(ctx, cfg, map[string]models.Engine{ 236 "nixery": nixeryEng, 237 }) 238 if err != nil { 239 return err 240 } 241 242 return s.Start(ctx) 243} 244 245func (s *Spindle) Router() http.Handler { 246 mux := chi.NewRouter() 247 248 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 249 w.Write(motd) 250 }) 251 mux.HandleFunc("/events", s.Events) 252 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 253 254 mux.Mount("/xrpc", s.XrpcRouter()) 255 return mux 256} 257 258func (s *Spindle) XrpcRouter() http.Handler { 259 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 260 261 l := log.SubLogger(s.l, "xrpc") 262 263 x := xrpc.Xrpc{ 264 Logger: l, 265 Db: s.db, 266 Enforcer: s.e, 267 Engines: s.engs, 268 Config: s.cfg, 269 Resolver: s.res, 270 Vault: s.vault, 271 ServiceAuth: serviceAuth, 272 } 273 274 return x.Router() 275} 276 277func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 278 if msg.Nsid == tangled.PipelineNSID { 279 tpl := tangled.Pipeline{} 280 err := json.Unmarshal(msg.EventJson, &tpl) 281 if err != nil { 282 fmt.Println("error unmarshalling", err) 283 return err 284 } 285 286 if tpl.TriggerMetadata == nil { 287 return fmt.Errorf("no trigger metadata found") 288 } 289 290 if tpl.TriggerMetadata.Repo == nil { 291 return fmt.Errorf("no repo data found") 292 } 293 294 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 295 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 296 } 297 298 // filter by repos 299 _, err = s.db.GetRepo( 300 tpl.TriggerMetadata.Repo.Knot, 301 tpl.TriggerMetadata.Repo.Did, 302 tpl.TriggerMetadata.Repo.Repo, 303 ) 304 if err != nil { 305 return err 306 } 307 308 pipelineId := models.PipelineId{ 309 Knot: src.Key(), 310 Rkey: msg.Rkey, 311 } 312 313 workflows := make(map[models.Engine][]models.Workflow) 314 315 // Build pipeline environment variables once for all workflows 316 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 317 318 for _, w := range tpl.Workflows { 319 if w != nil { 320 if _, ok := s.engs[w.Engine]; !ok { 321 err = s.db.StatusFailed(models.WorkflowId{ 322 PipelineId: pipelineId, 323 Name: w.Name, 324 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 325 if err != nil { 326 return err 327 } 328 329 continue 330 } 331 332 eng := s.engs[w.Engine] 333 334 if _, ok := workflows[eng]; !ok { 335 workflows[eng] = []models.Workflow{} 336 } 337 338 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 339 if err != nil { 340 return err 341 } 342 343 // inject TANGLED_* env vars after InitWorkflow 344 // This prevents user-defined env vars from overriding them 345 if ewf.Environment == nil { 346 ewf.Environment = make(map[string]string) 347 } 348 maps.Copy(ewf.Environment, pipelineEnv) 349 350 workflows[eng] = append(workflows[eng], *ewf) 351 352 err = s.db.StatusPending(models.WorkflowId{ 353 PipelineId: pipelineId, 354 Name: w.Name, 355 }, s.n) 356 if err != nil { 357 return err 358 } 359 } 360 } 361 362 ok := s.jq.Enqueue(queue.Job{ 363 Run: func() error { 364 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 365 RepoOwner: tpl.TriggerMetadata.Repo.Did, 366 RepoName: tpl.TriggerMetadata.Repo.Repo, 367 Workflows: workflows, 368 }, pipelineId) 369 return nil 370 }, 371 OnFail: func(jobError error) { 372 s.l.Error("pipeline run failed", "error", jobError) 373 }, 374 }) 375 if ok { 376 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 377 } else { 378 s.l.Error("failed to enqueue pipeline: queue is full") 379 } 380 } 381 382 return nil 383} 384 385func (s *Spindle) configureOwner() error { 386 cfgOwner := s.cfg.Server.Owner 387 388 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 389 if err != nil { 390 return err 391 } 392 393 switch len(existing) { 394 case 0: 395 // no owner configured, continue 396 case 1: 397 // find existing owner 398 existingOwner := existing[0] 399 400 // no ownership change, this is okay 401 if existingOwner == s.cfg.Server.Owner { 402 break 403 } 404 405 // remove existing owner 406 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 407 if err != nil { 408 return nil 409 } 410 default: 411 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 412 } 413 414 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 415}