···
52
-
func Run(ctx context.Context) error {
52
+
// New creates a new Spindle server with the provided configuration and engines.
53
+
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
logger := log.FromContext(ctx)
55
-
cfg, err := config.Load(ctx)
57
-
return fmt.Errorf("failed to load config: %w", err)
d, err := db.Make(cfg.Server.DBPath)
62
-
return fmt.Errorf("failed to setup db: %w", err)
58
+
return nil, fmt.Errorf("failed to setup db: %w", err)
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
67
-
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
63
+
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
···
switch cfg.Server.Secrets.Provider {
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77
-
return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
73
+
return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
vault, err = secrets.NewOpenBaoManager(
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
85
-
return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
81
+
return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
91
-
return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
87
+
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
95
-
return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
98
-
nixeryEng, err := nixery.New(ctx, cfg)
91
+
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
113
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
104
+
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
jc.AddDid(cfg.Server.Owner)
// Check if the spindle knows about any Dids;
dids, err := d.GetAllDids()
120
-
return fmt.Errorf("failed to get all dids: %w", err)
111
+
return nil, fmt.Errorf("failed to get all dids: %w", err)
···
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
128
-
spindle := Spindle{
119
+
spindle := &Spindle{
134
-
engs: map[string]models.Engine{"nixery": nixeryEng},
···
err = e.AddSpindle(rbacDomain)
143
-
return fmt.Errorf("failed to set rbac domain: %w", err)
134
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
err = spindle.configureOwner()
logger.Info("owner set", "did", cfg.Server.Owner)
151
-
// starts a job queue runner in the background
155
-
// Stop vault token renewal if it implements Stopper
156
-
if stopper, ok := vault.(secrets.Stopper); ok {
157
-
defer stopper.Stop()
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
162
-
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
144
+
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
err = jc.StartJetstream(ctx, spindle.ingest())
167
-
return fmt.Errorf("failed to start jetstream consumer: %w", err)
149
+
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
// for each incoming sh.tangled.pipeline, we execute
···
ccfg.CursorStore = cursorStore
knownKnots, err := d.Knots()
for _, knot := range knownKnots {
logger.Info("adding source start", "knot", knot)
···
spindle.ks = eventconsumer.NewConsumer(*ccfg)
170
+
return spindle, nil
173
+
// DB returns the database instance.
174
+
func (s *Spindle) DB() *db.DB {
178
+
// Queue returns the job queue instance.
179
+
func (s *Spindle) Queue() *queue.Queue {
183
+
// Engines returns the map of available engines.
184
+
func (s *Spindle) Engines() map[string]models.Engine {
188
+
// Vault returns the secrets manager instance.
189
+
func (s *Spindle) Vault() secrets.Manager {
193
+
// Notifier returns the notifier instance.
194
+
func (s *Spindle) Notifier() *notifier.Notifier {
198
+
// Enforcer returns the RBAC enforcer instance.
199
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
203
+
// Start starts the Spindle server (blocking).
204
+
func (s *Spindle) Start(ctx context.Context) error {
205
+
// starts a job queue runner in the background
209
+
// Stop vault token renewal if it implements Stopper
210
+
if stopper, ok := s.vault.(secrets.Stopper); ok {
211
+
defer stopper.Stop()
189
-
logger.Info("starting knot event consumer")
190
-
spindle.ks.Start(ctx)
215
+
s.l.Info("starting knot event consumer")
193
-
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
194
-
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
219
+
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220
+
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
223
+
func Run(ctx context.Context) error {
224
+
cfg, err := config.Load(ctx)
226
+
return fmt.Errorf("failed to load config: %w", err)
229
+
nixeryEng, err := nixery.New(ctx, cfg)
234
+
s, err := New(ctx, cfg, map[string]models.Engine{
235
+
"nixery": nixeryEng,
241
+
return s.Start(ctx)
func (s *Spindle) Router() http.Handler {