From 596cca869c619e7f74eff54eaa978780a3755021 Mon Sep 17 00:00:00 2001 From: Evan Jarrett Date: Fri, 7 Nov 2025 23:26:45 -0600 Subject: [PATCH] spindle/server: refactor spindle code to allow calling functions separate from Run() Change-Id: zvmnzxxqzzvtwrxxyunlxonoyuspnnsn Signed-off-by: Evan Jarrett --- spindle/server.go | 125 +++++++++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 40 deletions(-) diff --git a/spindle/server.go b/spindle/server.go index f329d17b..0805f7d4 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -49,22 +49,18 @@ type Spindle struct { vault secrets.Manager } -func Run(ctx context.Context) error { +// New creates a new Spindle server with the provided configuration and engines. +func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { logger := log.FromContext(ctx) - cfg, err := config.Load(ctx) - if err != nil { - return fmt.Errorf("failed to load config: %w", err) - } - d, err := db.Make(cfg.Server.DBPath) if err != nil { - return fmt.Errorf("failed to setup db: %w", err) + return nil, fmt.Errorf("failed to setup db: %w", err) } e, err := rbac.NewEnforcer(cfg.Server.DBPath) if err != nil { - return fmt.Errorf("failed to setup rbac enforcer: %w", err) + return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) } e.E.EnableAutoSave(true) @@ -74,7 +70,7 @@ func Run(ctx context.Context) error { switch cfg.Server.Secrets.Provider { case "openbao": if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { - return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") + return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") } vault, err = secrets.NewOpenBaoManager( cfg.Server.Secrets.OpenBao.ProxyAddr, @@ -82,22 +78,17 @@ func Run(ctx context.Context) error { secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), ) if err != nil { - return fmt.Errorf("failed to setup openbao secrets provider: %w", err) + return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) } logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) case "sqlite", "": vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) if err != nil { - return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) + return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) } logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) default: - return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) - } - - nixeryEng, err := nixery.New(ctx, cfg) - if err != nil { - return err + return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) } jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) @@ -110,14 +101,14 @@ func Run(ctx context.Context) error { } jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) if err != nil { - return fmt.Errorf("failed to setup jetstream client: %w", err) + return nil, fmt.Errorf("failed to setup jetstream client: %w", err) } jc.AddDid(cfg.Server.Owner) // Check if the spindle knows about any Dids; dids, err := d.GetAllDids() if err != nil { - return fmt.Errorf("failed to get all dids: %w", err) + return nil, fmt.Errorf("failed to get all dids: %w", err) } for _, d := range dids { jc.AddDid(d) @@ -125,13 +116,13 @@ func Run(ctx context.Context) error { resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) - spindle := Spindle{ + spindle := &Spindle{ jc: jc, e: e, db: d, l: logger, n: &n, - engs: map[string]models.Engine{"nixery": nixeryEng}, + engs: engines, jq: jq, cfg: cfg, res: resolver, @@ -140,31 +131,22 @@ func Run(ctx context.Context) error { err = e.AddSpindle(rbacDomain) if err != nil { - return fmt.Errorf("failed to set rbac domain: %w", err) + return nil, fmt.Errorf("failed to set rbac domain: %w", err) } err = spindle.configureOwner() if err != nil { - return err + return nil, err } logger.Info("owner set", "did", cfg.Server.Owner) - // starts a job queue runner in the background - jq.Start() - defer jq.Stop() - - // Stop vault token renewal if it implements Stopper - if stopper, ok := vault.(secrets.Stopper); ok { - defer stopper.Stop() - } - cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) if err != nil { - return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) + return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) } err = jc.StartJetstream(ctx, spindle.ingest()) if err != nil { - return fmt.Errorf("failed to start jetstream consumer: %w", err) + return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) } // for each incoming sh.tangled.pipeline, we execute @@ -177,7 +159,7 @@ func Run(ctx context.Context) error { ccfg.CursorStore = cursorStore knownKnots, err := d.Knots() if err != nil { - return err + return nil, err } for _, knot := range knownKnots { logger.Info("adding source start", "knot", knot) @@ -185,15 +167,78 @@ func Run(ctx context.Context) error { } spindle.ks = eventconsumer.NewConsumer(*ccfg) + return spindle, nil +} + +// DB returns the database instance. +func (s *Spindle) DB() *db.DB { + return s.db +} + +// Queue returns the job queue instance. +func (s *Spindle) Queue() *queue.Queue { + return s.jq +} + +// Engines returns the map of available engines. +func (s *Spindle) Engines() map[string]models.Engine { + return s.engs +} + +// Vault returns the secrets manager instance. +func (s *Spindle) Vault() secrets.Manager { + return s.vault +} + +// Notifier returns the notifier instance. +func (s *Spindle) Notifier() *notifier.Notifier { + return s.n +} + +// Enforcer returns the RBAC enforcer instance. +func (s *Spindle) Enforcer() *rbac.Enforcer { + return s.e +} + +// Start starts the Spindle server (blocking). +func (s *Spindle) Start(ctx context.Context) error { + // starts a job queue runner in the background + s.jq.Start() + defer s.jq.Stop() + + // Stop vault token renewal if it implements Stopper + if stopper, ok := s.vault.(secrets.Stopper); ok { + defer stopper.Stop() + } + go func() { - logger.Info("starting knot event consumer") - spindle.ks.Start(ctx) + s.l.Info("starting knot event consumer") + s.ks.Start(ctx) }() - logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) - logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) + s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) + return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) +} - return nil +func Run(ctx context.Context) error { + cfg, err := config.Load(ctx) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + nixeryEng, err := nixery.New(ctx, cfg) + if err != nil { + return err + } + + s, err := New(ctx, cfg, map[string]models.Engine{ + "nixery": nixeryEng, + }) + if err != nil { + return err + } + + return s.Start(ctx) } func (s *Spindle) Router() http.Handler { -- 2.43.0