forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 10 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/eventconsumer" 14 "tangled.org/core/eventconsumer/cursor" 15 "tangled.org/core/idresolver" 16 "tangled.org/core/jetstream" 17 "tangled.org/core/log" 18 "tangled.org/core/notifier" 19 "tangled.org/core/rbac" 20 "tangled.org/core/spindle/config" 21 "tangled.org/core/spindle/db" 22 "tangled.org/core/spindle/engine" 23 "tangled.org/core/spindle/engines/nixery" 24 "tangled.org/core/spindle/models" 25 "tangled.org/core/spindle/queue" 26 "tangled.org/core/spindle/secrets" 27 "tangled.org/core/spindle/xrpc" 28 "tangled.org/core/xrpc/serviceauth" 29) 30 31//go:embed motd 32var motd []byte 33 34const ( 35 rbacDomain = "thisserver" 36) 37 38type Spindle struct { 39 jc *jetstream.JetstreamClient 40 db *db.DB 41 e *rbac.Enforcer 42 l *slog.Logger 43 n *notifier.Notifier 44 engs map[string]models.Engine 45 jq *queue.Queue 46 cfg *config.Config 47 ks *eventconsumer.Consumer 48 res *idresolver.Resolver 49 vault secrets.Manager 50} 51 52// New creates a new Spindle server with the provided configuration and engines. 53func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 54 logger := log.FromContext(ctx) 55 56 d, err := db.Make(cfg.Server.DBPath) 57 if err != nil { 58 return nil, fmt.Errorf("failed to setup db: %w", err) 59 } 60 61 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 64 } 65 e.E.EnableAutoSave(true) 66 67 n := notifier.New() 68 69 var vault secrets.Manager 70 switch cfg.Server.Secrets.Provider { 71 case "openbao": 72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 73 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 74 } 75 vault, err = secrets.NewOpenBaoManager( 76 cfg.Server.Secrets.OpenBao.ProxyAddr, 77 logger, 78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 79 ) 80 if err != nil { 81 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 82 } 83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 84 case "sqlite", "": 85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 86 if err != nil { 87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 88 } 89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 90 default: 91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 92 } 93 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 97 collections := []string{ 98 tangled.SpindleMemberNSID, 99 tangled.RepoNSID, 100 tangled.RepoCollaboratorNSID, 101 } 102 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 103 if err != nil { 104 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 105 } 106 jc.AddDid(cfg.Server.Owner) 107 108 // Check if the spindle knows about any Dids; 109 dids, err := d.GetAllDids() 110 if err != nil { 111 return nil, fmt.Errorf("failed to get all dids: %w", err) 112 } 113 for _, d := range dids { 114 jc.AddDid(d) 115 } 116 117 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 118 119 spindle := &Spindle{ 120 jc: jc, 121 e: e, 122 db: d, 123 l: logger, 124 n: &n, 125 engs: engines, 126 jq: jq, 127 cfg: cfg, 128 res: resolver, 129 vault: vault, 130 } 131 132 err = e.AddSpindle(rbacDomain) 133 if err != nil { 134 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 135 } 136 err = spindle.configureOwner() 137 if err != nil { 138 return nil, err 139 } 140 logger.Info("owner set", "did", cfg.Server.Owner) 141 142 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 143 if err != nil { 144 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 145 } 146 147 err = jc.StartJetstream(ctx, spindle.ingest()) 148 if err != nil { 149 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 150 } 151 152 // for each incoming sh.tangled.pipeline, we execute 153 // spindle.processPipeline, which in turn enqueues the pipeline 154 // job in the above registered queue. 155 ccfg := eventconsumer.NewConsumerConfig() 156 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 157 ccfg.Dev = cfg.Server.Dev 158 ccfg.ProcessFunc = spindle.processPipeline 159 ccfg.CursorStore = cursorStore 160 knownKnots, err := d.Knots() 161 if err != nil { 162 return nil, err 163 } 164 for _, knot := range knownKnots { 165 logger.Info("adding source start", "knot", knot) 166 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 167 } 168 spindle.ks = eventconsumer.NewConsumer(*ccfg) 169 170 return spindle, nil 171} 172 173// DB returns the database instance. 174func (s *Spindle) DB() *db.DB { 175 return s.db 176} 177 178// Queue returns the job queue instance. 179func (s *Spindle) Queue() *queue.Queue { 180 return s.jq 181} 182 183// Engines returns the map of available engines. 184func (s *Spindle) Engines() map[string]models.Engine { 185 return s.engs 186} 187 188// Vault returns the secrets manager instance. 189func (s *Spindle) Vault() secrets.Manager { 190 return s.vault 191} 192 193// Notifier returns the notifier instance. 194func (s *Spindle) Notifier() *notifier.Notifier { 195 return s.n 196} 197 198// Enforcer returns the RBAC enforcer instance. 199func (s *Spindle) Enforcer() *rbac.Enforcer { 200 return s.e 201} 202 203// Start starts the Spindle server (blocking). 204func (s *Spindle) Start(ctx context.Context) error { 205 // starts a job queue runner in the background 206 s.jq.Start() 207 defer s.jq.Stop() 208 209 // Stop vault token renewal if it implements Stopper 210 if stopper, ok := s.vault.(secrets.Stopper); ok { 211 defer stopper.Stop() 212 } 213 214 go func() { 215 s.l.Info("starting knot event consumer") 216 s.ks.Start(ctx) 217 }() 218 219 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 220 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 221} 222 223func Run(ctx context.Context) error { 224 cfg, err := config.Load(ctx) 225 if err != nil { 226 return fmt.Errorf("failed to load config: %w", err) 227 } 228 229 nixeryEng, err := nixery.New(ctx, cfg) 230 if err != nil { 231 return err 232 } 233 234 s, err := New(ctx, cfg, map[string]models.Engine{ 235 "nixery": nixeryEng, 236 }) 237 if err != nil { 238 return err 239 } 240 241 return s.Start(ctx) 242} 243 244func (s *Spindle) Router() http.Handler { 245 mux := chi.NewRouter() 246 247 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 248 w.Write(motd) 249 }) 250 mux.HandleFunc("/events", s.Events) 251 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 252 253 mux.Mount("/xrpc", s.XrpcRouter()) 254 return mux 255} 256 257func (s *Spindle) XrpcRouter() http.Handler { 258 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 259 260 l := log.SubLogger(s.l, "xrpc") 261 262 x := xrpc.Xrpc{ 263 Logger: l, 264 Db: s.db, 265 Enforcer: s.e, 266 Engines: s.engs, 267 Config: s.cfg, 268 Resolver: s.res, 269 Vault: s.vault, 270 ServiceAuth: serviceAuth, 271 } 272 273 return x.Router() 274} 275 276func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 277 if msg.Nsid == tangled.PipelineNSID { 278 tpl := tangled.Pipeline{} 279 err := json.Unmarshal(msg.EventJson, &tpl) 280 if err != nil { 281 fmt.Println("error unmarshalling", err) 282 return err 283 } 284 285 if tpl.TriggerMetadata == nil { 286 return fmt.Errorf("no trigger metadata found") 287 } 288 289 if tpl.TriggerMetadata.Repo == nil { 290 return fmt.Errorf("no repo data found") 291 } 292 293 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 294 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 295 } 296 297 // filter by repos 298 _, err = s.db.GetRepo( 299 tpl.TriggerMetadata.Repo.Knot, 300 tpl.TriggerMetadata.Repo.Did, 301 tpl.TriggerMetadata.Repo.Repo, 302 ) 303 if err != nil { 304 return err 305 } 306 307 pipelineId := models.PipelineId{ 308 Knot: src.Key(), 309 Rkey: msg.Rkey, 310 } 311 312 workflows := make(map[models.Engine][]models.Workflow) 313 314 for _, w := range tpl.Workflows { 315 if w != nil { 316 if _, ok := s.engs[w.Engine]; !ok { 317 err = s.db.StatusFailed(models.WorkflowId{ 318 PipelineId: pipelineId, 319 Name: w.Name, 320 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 321 if err != nil { 322 return err 323 } 324 325 continue 326 } 327 328 eng := s.engs[w.Engine] 329 330 if _, ok := workflows[eng]; !ok { 331 workflows[eng] = []models.Workflow{} 332 } 333 334 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 335 if err != nil { 336 return err 337 } 338 339 workflows[eng] = append(workflows[eng], *ewf) 340 341 err = s.db.StatusPending(models.WorkflowId{ 342 PipelineId: pipelineId, 343 Name: w.Name, 344 }, s.n) 345 if err != nil { 346 return err 347 } 348 } 349 } 350 351 ok := s.jq.Enqueue(queue.Job{ 352 Run: func() error { 353 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 354 RepoOwner: tpl.TriggerMetadata.Repo.Did, 355 RepoName: tpl.TriggerMetadata.Repo.Repo, 356 Workflows: workflows, 357 }, pipelineId) 358 return nil 359 }, 360 OnFail: func(jobError error) { 361 s.l.Error("pipeline run failed", "error", jobError) 362 }, 363 }) 364 if ok { 365 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 366 } else { 367 s.l.Error("failed to enqueue pipeline: queue is full") 368 } 369 } 370 371 return nil 372} 373 374func (s *Spindle) configureOwner() error { 375 cfgOwner := s.cfg.Server.Owner 376 377 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 378 if err != nil { 379 return err 380 } 381 382 switch len(existing) { 383 case 0: 384 // no owner configured, continue 385 case 1: 386 // find existing owner 387 existingOwner := existing[0] 388 389 // no ownership change, this is okay 390 if existingOwner == s.cfg.Server.Owner { 391 break 392 } 393 394 // remove existing owner 395 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 396 if err != nil { 397 return nil 398 } 399 default: 400 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 401 } 402 403 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 404}