spindle/server: refactor spindle code to allow calling functions separate from Run() #779

merged
opened by evan.jarrett.net targeting master from evan.jarrett.net/core: spindle
Changed files
+85 -40
spindle
+85 -40
spindle/server.go
···
vault secrets.Manager
}
-
func Run(ctx context.Context) error {
+
// New creates a new Spindle server with the provided configuration and engines.
+
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
logger := log.FromContext(ctx)
-
cfg, err := config.Load(ctx)
-
if err != nil {
-
return fmt.Errorf("failed to load config: %w", err)
-
}
-
d, err := db.Make(cfg.Server.DBPath)
if err != nil {
-
return fmt.Errorf("failed to setup db: %w", err)
+
return nil, fmt.Errorf("failed to setup db: %w", err)
}
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
if err != nil {
-
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
+
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
}
e.E.EnableAutoSave(true)
···
switch cfg.Server.Secrets.Provider {
case "openbao":
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
-
return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
+
return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
}
vault, err = secrets.NewOpenBaoManager(
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
)
if err != nil {
-
return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
+
return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
}
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
case "sqlite", "":
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
if err != nil {
-
return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
+
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
}
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
default:
-
return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
-
}
-
-
nixeryEng, err := nixery.New(ctx, cfg)
-
if err != nil {
-
return err
+
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
}
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
}
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
if err != nil {
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
+
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
}
jc.AddDid(cfg.Server.Owner)
// Check if the spindle knows about any Dids;
dids, err := d.GetAllDids()
if err != nil {
-
return fmt.Errorf("failed to get all dids: %w", err)
+
return nil, fmt.Errorf("failed to get all dids: %w", err)
}
for _, d := range dids {
jc.AddDid(d)
···
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
-
spindle := Spindle{
+
spindle := &Spindle{
jc: jc,
e: e,
db: d,
l: logger,
n: &n,
-
engs: map[string]models.Engine{"nixery": nixeryEng},
+
engs: engines,
jq: jq,
cfg: cfg,
res: resolver,
···
err = e.AddSpindle(rbacDomain)
if err != nil {
-
return fmt.Errorf("failed to set rbac domain: %w", err)
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
}
err = spindle.configureOwner()
if err != nil {
-
return err
+
return nil, err
}
logger.Info("owner set", "did", cfg.Server.Owner)
-
// starts a job queue runner in the background
-
jq.Start()
-
defer jq.Stop()
-
-
// Stop vault token renewal if it implements Stopper
-
if stopper, ok := vault.(secrets.Stopper); ok {
-
defer stopper.Stop()
-
}
-
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
if err != nil {
-
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
+
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
}
err = jc.StartJetstream(ctx, spindle.ingest())
if err != nil {
-
return fmt.Errorf("failed to start jetstream consumer: %w", err)
+
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
}
// for each incoming sh.tangled.pipeline, we execute
···
ccfg.CursorStore = cursorStore
knownKnots, err := d.Knots()
if err != nil {
-
return err
+
return nil, err
}
for _, knot := range knownKnots {
logger.Info("adding source start", "knot", knot)
···
}
spindle.ks = eventconsumer.NewConsumer(*ccfg)
+
return spindle, nil
+
}
+
+
// DB returns the database instance.
+
func (s *Spindle) DB() *db.DB {
+
return s.db
+
}
+
+
// Queue returns the job queue instance.
+
func (s *Spindle) Queue() *queue.Queue {
+
return s.jq
+
}
+
+
// Engines returns the map of available engines.
+
func (s *Spindle) Engines() map[string]models.Engine {
+
return s.engs
+
}
+
+
// Vault returns the secrets manager instance.
+
func (s *Spindle) Vault() secrets.Manager {
+
return s.vault
+
}
+
+
// Notifier returns the notifier instance.
+
func (s *Spindle) Notifier() *notifier.Notifier {
+
return s.n
+
}
+
+
// Enforcer returns the RBAC enforcer instance.
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
+
return s.e
+
}
+
+
// Start starts the Spindle server (blocking).
+
func (s *Spindle) Start(ctx context.Context) error {
+
// starts a job queue runner in the background
+
s.jq.Start()
+
defer s.jq.Stop()
+
+
// Stop vault token renewal if it implements Stopper
+
if stopper, ok := s.vault.(secrets.Stopper); ok {
+
defer stopper.Stop()
+
}
+
go func() {
-
logger.Info("starting knot event consumer")
-
spindle.ks.Start(ctx)
+
s.l.Info("starting knot event consumer")
+
s.ks.Start(ctx)
}()
-
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
-
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
+
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
+
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
+
}
-
return nil
+
func Run(ctx context.Context) error {
+
cfg, err := config.Load(ctx)
+
if err != nil {
+
return fmt.Errorf("failed to load config: %w", err)
+
}
+
+
nixeryEng, err := nixery.New(ctx, cfg)
+
if err != nil {
+
return err
+
}
+
+
s, err := New(ctx, cfg, map[string]models.Engine{
+
"nixery": nixeryEng,
+
})
+
if err != nil {
+
return err
+
}
+
+
return s.Start(ctx)
}
func (s *Spindle) Router() http.Handler {