···
-
func Run(ctx context.Context) error {
logger := log.FromContext(ctx)
-
cfg, err := config.Load(ctx)
-
return fmt.Errorf("failed to load config: %w", err)
d, err := db.Make(cfg.Server.DBPath)
-
return fmt.Errorf("failed to setup db: %w", err)
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
-
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
···
switch cfg.Server.Secrets.Provider {
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
-
return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
vault, err = secrets.NewOpenBaoManager(
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
-
return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
-
return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
-
return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
-
nixeryEng, err := nixery.New(ctx, cfg)
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
jc.AddDid(cfg.Server.Owner)
// Check if the spindle knows about any Dids;
dids, err := d.GetAllDids()
-
return fmt.Errorf("failed to get all dids: %w", err)
···
resolver := idresolver.DefaultResolver()
-
engs: map[string]models.Engine{"nixery": nixeryEng},
···
err = e.AddSpindle(rbacDomain)
-
return fmt.Errorf("failed to set rbac domain: %w", err)
err = spindle.configureOwner()
logger.Info("owner set", "did", cfg.Server.Owner)
-
// starts a job queue runner in the background
-
// Stop vault token renewal if it implements Stopper
-
if stopper, ok := vault.(secrets.Stopper); ok {
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
-
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
err = jc.StartJetstream(ctx, spindle.ingest())
-
return fmt.Errorf("failed to start jetstream consumer: %w", err)
// for each incoming sh.tangled.pipeline, we execute
···
ccfg.CursorStore = cursorStore
knownKnots, err := d.Knots()
for _, knot := range knownKnots {
logger.Info("adding source start", "knot", knot)
···
spindle.ks = eventconsumer.NewConsumer(*ccfg)
-
logger.Info("starting knot event consumer")
-
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
-
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
func (s *Spindle) Router() http.Handler {
···
+
// New creates a new Spindle server with the provided configuration and engines.
+
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
logger := log.FromContext(ctx)
d, err := db.Make(cfg.Server.DBPath)
+
return nil, fmt.Errorf("failed to setup db: %w", err)
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
+
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
···
switch cfg.Server.Secrets.Provider {
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
+
return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
vault, err = secrets.NewOpenBaoManager(
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
+
return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
+
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
+
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
+
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
jc.AddDid(cfg.Server.Owner)
// Check if the spindle knows about any Dids;
dids, err := d.GetAllDids()
+
return nil, fmt.Errorf("failed to get all dids: %w", err)
···
resolver := idresolver.DefaultResolver()
···
err = e.AddSpindle(rbacDomain)
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
err = spindle.configureOwner()
logger.Info("owner set", "did", cfg.Server.Owner)
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
+
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
err = jc.StartJetstream(ctx, spindle.ingest())
+
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
// for each incoming sh.tangled.pipeline, we execute
···
ccfg.CursorStore = cursorStore
knownKnots, err := d.Knots()
for _, knot := range knownKnots {
logger.Info("adding source start", "knot", knot)
···
spindle.ks = eventconsumer.NewConsumer(*ccfg)
+
// DB returns the database instance.
+
func (s *Spindle) DB() *db.DB {
+
// Queue returns the job queue instance.
+
func (s *Spindle) Queue() *queue.Queue {
+
// Engines returns the map of available engines.
+
func (s *Spindle) Engines() map[string]models.Engine {
+
// Vault returns the secrets manager instance.
+
func (s *Spindle) Vault() secrets.Manager {
+
// Notifier returns the notifier instance.
+
func (s *Spindle) Notifier() *notifier.Notifier {
+
// Enforcer returns the RBAC enforcer instance.
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
+
// Start starts the Spindle server (blocking).
+
func (s *Spindle) Start(ctx context.Context) error {
+
// starts a job queue runner in the background
+
// Stop vault token renewal if it implements Stopper
+
if stopper, ok := s.vault.(secrets.Stopper); ok {
+
s.l.Info("starting knot event consumer")
+
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
+
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
+
func Run(ctx context.Context) error {
+
cfg, err := config.Load(ctx)
+
return fmt.Errorf("failed to load config: %w", err)
+
nixeryEng, err := nixery.New(ctx, cfg)
+
s, err := New(ctx, cfg, map[string]models.Engine{
func (s *Spindle) Router() http.Handler {