1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
30
// motd holds the bytes of the embedded "motd" file. Router writes it as the
// response body for requests to "/".
//
//go:embed motd
var motd []byte
33
const (
	// rbacDomain is the domain passed to the RBAC enforcer when registering
	// this spindle and when querying, adding, or removing its owner.
	rbacDomain = "thisserver"
)
37
// Spindle bundles the long-lived dependencies of a spindle server. All fields
// are populated by New.
type Spindle struct {
	jc    *jetstream.JetstreamClient // jetstream client; started in New with s.ingest() as handler
	db    *db.DB                     // database opened from cfg.Server.DBPath
	e     *rbac.Enforcer             // RBAC enforcer; holds the server owner role
	l     *slog.Logger               // base logger taken from the context given to New
	n     *notifier.Notifier         // notifier handed to status updates and workflow runs
	engs  map[string]models.Engine   // available engines keyed by engine name (e.g. "nixery")
	jq    *queue.Queue               // job queue that pipeline runs are enqueued on
	cfg   *config.Config             // server configuration
	ks    *eventconsumer.Consumer    // knot event consumer; configured in New, run by Start
	res   *idresolver.Resolver       // identity resolver built from cfg.Server.PlcUrl
	vault secrets.Manager            // secrets provider (openbao or sqlite)
}
51
52// New creates a new Spindle server with the provided configuration and engines.
53func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
54 logger := log.FromContext(ctx)
55
56 d, err := db.Make(cfg.Server.DBPath)
57 if err != nil {
58 return nil, fmt.Errorf("failed to setup db: %w", err)
59 }
60
61 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
64 }
65 e.E.EnableAutoSave(true)
66
67 n := notifier.New()
68
69 var vault secrets.Manager
70 switch cfg.Server.Secrets.Provider {
71 case "openbao":
72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
73 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
74 }
75 vault, err = secrets.NewOpenBaoManager(
76 cfg.Server.Secrets.OpenBao.ProxyAddr,
77 logger,
78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
79 )
80 if err != nil {
81 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
82 }
83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
84 case "sqlite", "":
85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
86 if err != nil {
87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
88 }
89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
90 default:
91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
92 }
93
94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
96
97 collections := []string{
98 tangled.SpindleMemberNSID,
99 tangled.RepoNSID,
100 tangled.RepoCollaboratorNSID,
101 }
102 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
103 if err != nil {
104 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
105 }
106 jc.AddDid(cfg.Server.Owner)
107
108 // Check if the spindle knows about any Dids;
109 dids, err := d.GetAllDids()
110 if err != nil {
111 return nil, fmt.Errorf("failed to get all dids: %w", err)
112 }
113 for _, d := range dids {
114 jc.AddDid(d)
115 }
116
117 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
118
119 spindle := &Spindle{
120 jc: jc,
121 e: e,
122 db: d,
123 l: logger,
124 n: &n,
125 engs: engines,
126 jq: jq,
127 cfg: cfg,
128 res: resolver,
129 vault: vault,
130 }
131
132 err = e.AddSpindle(rbacDomain)
133 if err != nil {
134 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
135 }
136 err = spindle.configureOwner()
137 if err != nil {
138 return nil, err
139 }
140 logger.Info("owner set", "did", cfg.Server.Owner)
141
142 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
143 if err != nil {
144 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
145 }
146
147 err = jc.StartJetstream(ctx, spindle.ingest())
148 if err != nil {
149 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
150 }
151
152 // for each incoming sh.tangled.pipeline, we execute
153 // spindle.processPipeline, which in turn enqueues the pipeline
154 // job in the above registered queue.
155 ccfg := eventconsumer.NewConsumerConfig()
156 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
157 ccfg.Dev = cfg.Server.Dev
158 ccfg.ProcessFunc = spindle.processPipeline
159 ccfg.CursorStore = cursorStore
160 knownKnots, err := d.Knots()
161 if err != nil {
162 return nil, err
163 }
164 for _, knot := range knownKnots {
165 logger.Info("adding source start", "knot", knot)
166 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
167 }
168 spindle.ks = eventconsumer.NewConsumer(*ccfg)
169
170 return spindle, nil
171}
172
// DB returns the database handle held by the Spindle (the one opened in New).
func (s *Spindle) DB() *db.DB {
	return s.db
}
177
// Queue returns the job queue that pipeline runs are enqueued on.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
182
// Engines returns the map of available engines, keyed by engine name.
// The map is returned as-is, not copied.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
187
// Vault returns the secrets manager selected in New from the configured
// secrets provider.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
192
// Notifier returns the notifier instance held by the Spindle.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
197
// Enforcer returns the RBAC enforcer instance held by the Spindle.
func (s *Spindle) Enforcer() *rbac.Enforcer {
	return s.e
}
202
203// Start starts the Spindle server (blocking).
204func (s *Spindle) Start(ctx context.Context) error {
205 // starts a job queue runner in the background
206 s.jq.Start()
207 defer s.jq.Stop()
208
209 // Stop vault token renewal if it implements Stopper
210 if stopper, ok := s.vault.(secrets.Stopper); ok {
211 defer stopper.Stop()
212 }
213
214 go func() {
215 s.l.Info("starting knot event consumer")
216 s.ks.Start(ctx)
217 }()
218
219 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
221}
222
223func Run(ctx context.Context) error {
224 cfg, err := config.Load(ctx)
225 if err != nil {
226 return fmt.Errorf("failed to load config: %w", err)
227 }
228
229 nixeryEng, err := nixery.New(ctx, cfg)
230 if err != nil {
231 return err
232 }
233
234 s, err := New(ctx, cfg, map[string]models.Engine{
235 "nixery": nixeryEng,
236 })
237 if err != nil {
238 return err
239 }
240
241 return s.Start(ctx)
242}
243
244func (s *Spindle) Router() http.Handler {
245 mux := chi.NewRouter()
246
247 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
248 w.Write(motd)
249 })
250 mux.HandleFunc("/events", s.Events)
251 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
252
253 mux.Mount("/xrpc", s.XrpcRouter())
254 return mux
255}
256
257func (s *Spindle) XrpcRouter() http.Handler {
258 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
259
260 l := log.SubLogger(s.l, "xrpc")
261
262 x := xrpc.Xrpc{
263 Logger: l,
264 Db: s.db,
265 Enforcer: s.e,
266 Engines: s.engs,
267 Config: s.cfg,
268 Resolver: s.res,
269 Vault: s.vault,
270 ServiceAuth: serviceAuth,
271 }
272
273 return x.Router()
274}
275
276func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
277 if msg.Nsid == tangled.PipelineNSID {
278 tpl := tangled.Pipeline{}
279 err := json.Unmarshal(msg.EventJson, &tpl)
280 if err != nil {
281 fmt.Println("error unmarshalling", err)
282 return err
283 }
284
285 if tpl.TriggerMetadata == nil {
286 return fmt.Errorf("no trigger metadata found")
287 }
288
289 if tpl.TriggerMetadata.Repo == nil {
290 return fmt.Errorf("no repo data found")
291 }
292
293 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
294 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
295 }
296
297 // filter by repos
298 _, err = s.db.GetRepo(
299 tpl.TriggerMetadata.Repo.Knot,
300 tpl.TriggerMetadata.Repo.Did,
301 tpl.TriggerMetadata.Repo.Repo,
302 )
303 if err != nil {
304 return err
305 }
306
307 pipelineId := models.PipelineId{
308 Knot: src.Key(),
309 Rkey: msg.Rkey,
310 }
311
312 workflows := make(map[models.Engine][]models.Workflow)
313
314 for _, w := range tpl.Workflows {
315 if w != nil {
316 if _, ok := s.engs[w.Engine]; !ok {
317 err = s.db.StatusFailed(models.WorkflowId{
318 PipelineId: pipelineId,
319 Name: w.Name,
320 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
321 if err != nil {
322 return err
323 }
324
325 continue
326 }
327
328 eng := s.engs[w.Engine]
329
330 if _, ok := workflows[eng]; !ok {
331 workflows[eng] = []models.Workflow{}
332 }
333
334 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
335 if err != nil {
336 return err
337 }
338
339 workflows[eng] = append(workflows[eng], *ewf)
340
341 err = s.db.StatusPending(models.WorkflowId{
342 PipelineId: pipelineId,
343 Name: w.Name,
344 }, s.n)
345 if err != nil {
346 return err
347 }
348 }
349 }
350
351 ok := s.jq.Enqueue(queue.Job{
352 Run: func() error {
353 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
354 RepoOwner: tpl.TriggerMetadata.Repo.Did,
355 RepoName: tpl.TriggerMetadata.Repo.Repo,
356 Workflows: workflows,
357 }, pipelineId)
358 return nil
359 },
360 OnFail: func(jobError error) {
361 s.l.Error("pipeline run failed", "error", jobError)
362 },
363 })
364 if ok {
365 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
366 } else {
367 s.l.Error("failed to enqueue pipeline: queue is full")
368 }
369 }
370
371 return nil
372}
373
374func (s *Spindle) configureOwner() error {
375 cfgOwner := s.cfg.Server.Owner
376
377 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
378 if err != nil {
379 return err
380 }
381
382 switch len(existing) {
383 case 0:
384 // no owner configured, continue
385 case 1:
386 // find existing owner
387 existingOwner := existing[0]
388
389 // no ownership change, this is okay
390 if existingOwner == s.cfg.Server.Owner {
391 break
392 }
393
394 // remove existing owner
395 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
396 if err != nil {
397 return nil
398 }
399 default:
400 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
401 }
402
403 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
404}