package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"

	"github.com/go-chi/chi/v5"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/eventconsumer"
	"tangled.sh/tangled.sh/core/eventconsumer/cursor"
	"tangled.sh/tangled.sh/core/idresolver"
	"tangled.sh/tangled.sh/core/jetstream"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/rbac"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/engines/nixery"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/queue"
	"tangled.sh/tangled.sh/core/spindle/secrets"
	"tangled.sh/tangled.sh/core/spindle/xrpc"
	"tangled.sh/tangled.sh/core/xrpc/serviceauth"
)

//go:embed motd
var motd []byte

const (
	rbacDomain = "thisserver"
)

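// Spindle bundles the long-lived dependencies of a spindle server: the
// jetstream client, database, RBAC enforcer, notifier, CI engines, job
// queue, knot event consumer, identity resolver, and secrets manager.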
type Spindle struct {
	jc    *jetstream.JetstreamClient
	db    *db.DB
	e     *rbac.Enforcer
	l     *slog.Logger
	n     *notifier.Notifier
	engs  map[string]models.Engine
	jq    *queue.Queue
	cfg   *config.Config
	ks    *eventconsumer.Consumer
	res   *idresolver.Resolver
	vault secrets.Manager
}

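// Run loads the configuration, wires up the database, RBAC enforcer, secrets
// provider, engines, job queue, and event consumers, then serves HTTP on the
// configured listen address.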
func Run(ctx context.Context) error {
	logger := log.FromContext(ctx)

	cfg, err := config.Load(ctx)
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	d, err := db.Make(cfg.Server.DBPath)
	if err != nil {
		return fmt.Errorf("failed to setup db: %w", err)
	}

	e, err := rbac.NewEnforcer(cfg.Server.DBPath)
	if err != nil {
		return fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}
	e.E.EnableAutoSave(true)

	n := notifier.New()

	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
		if err != nil {
			return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
	default:
		return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	nixeryEng, err := nixery.New(ctx, cfg)
	if err != nil {
		return err
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	collections := []string{
		tangled.SpindleMemberNSID,
		tangled.RepoNSID,
		tangled.RepoCollaboratorNSID,
	}
	jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
	if err != nil {
		return fmt.Errorf("failed to setup jetstream client: %w", err)
	}
	jc.AddDid(cfg.Server.Owner)

	// Register every DID the spindle already knows about with the
	// jetstream client, in addition to the configured owner.
	dids, err := d.GetAllDids()
	if err != nil {
		return fmt.Errorf("failed to get all dids: %w", err)
	}
	for _, did := range dids {
		jc.AddDid(did)
	}

	resolver := idresolver.DefaultResolver()

	spindle := Spindle{
		jc:    jc,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  map[string]models.Engine{"nixery": nixeryEng},
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
	}

	err = e.AddSpindle(rbacDomain)
	if err != nil {
		return fmt.Errorf("failed to set rbac domain: %w", err)
	}
	err = spindle.configureOwner()
	if err != nil {
		return err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	// starts a job queue runner in the background
	jq.Start()
	defer jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
	if err != nil {
		return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	err = jc.StartJetstream(ctx, spindle.ingest())
	if err != nil {
		return fmt.Errorf("failed to start jetstream consumer: %w", err)
	}

	// for each incoming sh.tangled.pipeline, we execute
	// spindle.processPipeline, which in turn enqueues the pipeline
	// job in the above registered queue.
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = logger
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processPipeline
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return err
	}
	for _, knot := range knownKnots {
		logger.Info("adding knot event source", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	go func() {
		logger.Info("starting knot event consumer")
		spindle.ks.Start(ctx)
	}()

	logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
	if err := http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()); err != nil {
		return fmt.Errorf("server error: %w", err)
	}

	return nil
}

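// Router builds the spindle's HTTP handler: the MOTD at the root, the event
// stream, the owner DID, per-workflow logs, and the XRPC sub-router.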
func (s *Spindle) Router() http.Handler {
	mux := chi.NewRouter()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write(motd)
	})
	mux.HandleFunc("/events", s.Events)
	mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(s.cfg.Server.Owner))
	})
	mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)

	mux.Mount("/xrpc", s.XrpcRouter())
	return mux
}

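// XrpcRouter mounts the XRPC handlers, protected by service auth for this
// spindle's DID.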
func (s *Spindle) XrpcRouter() http.Handler {
	logger := s.l.With("route", "xrpc")

	serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())

	x := xrpc.Xrpc{
		Logger:      logger,
		Db:          s.db,
		Enforcer:    s.e,
		Engines:     s.engs,
		Config:      s.cfg,
		Resolver:    s.res,
		Vault:       s.vault,
		ServiceAuth: serviceAuth,
	}

	return x.Router()
}

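// processPipeline handles a sh.tangled.pipeline record from a knot event
// source: it validates the trigger metadata, initializes a workflow per
// engine, records pending/failed statuses, and enqueues the pipeline run.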
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
	if msg.Nsid == tangled.PipelineNSID {
		tpl := tangled.Pipeline{}
		err := json.Unmarshal(msg.EventJson, &tpl)
		if err != nil {
			s.l.Error("failed to unmarshal pipeline event", "error", err)
			return err
		}

		if tpl.TriggerMetadata == nil {
			return fmt.Errorf("no trigger metadata found")
		}

		if tpl.TriggerMetadata.Repo == nil {
			return fmt.Errorf("no repo data found")
		}

		if src.Key() != tpl.TriggerMetadata.Repo.Knot {
			return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
		}

		// only process pipelines for repos known to this spindle
		_, err = s.db.GetRepo(
			tpl.TriggerMetadata.Repo.Knot,
			tpl.TriggerMetadata.Repo.Did,
			tpl.TriggerMetadata.Repo.Repo,
		)
		if err != nil {
			return err
		}

		pipelineId := models.PipelineId{
			Knot: src.Key(),
			Rkey: msg.Rkey,
		}

		workflows := make(map[models.Engine][]models.Workflow)

		for _, w := range tpl.Workflows {
			if w == nil {
				continue
			}

			eng, ok := s.engs[w.Engine]
			if !ok {
				err = s.db.StatusFailed(models.WorkflowId{
					PipelineId: pipelineId,
					Name:       w.Name,
				}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
				if err != nil {
					return err
				}

				continue
			}

			if _, ok := workflows[eng]; !ok {
				workflows[eng] = []models.Workflow{}
			}

			ewf, err := eng.InitWorkflow(*w, tpl)
			if err != nil {
				return err
			}

			workflows[eng] = append(workflows[eng], *ewf)

			err = s.db.StatusPending(models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}, s.n)
			if err != nil {
				return err
			}
		}

		ok := s.jq.Enqueue(queue.Job{
			Run: func() error {
				engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
					RepoOwner: tpl.TriggerMetadata.Repo.Did,
					RepoName:  tpl.TriggerMetadata.Repo.Repo,
					Workflows: workflows,
				}, pipelineId)
				return nil
			},
			OnFail: func(jobError error) {
				s.l.Error("pipeline run failed", "error", jobError)
			},
		})
		if ok {
			s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
		} else {
			s.l.Error("failed to enqueue pipeline: queue is full")
		}
	}

	return nil
}

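// configureOwner reconciles the owner stored in the RBAC database with the
// owner from configuration, replacing a stale owner when the config changes.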
func (s *Spindle) configureOwner() error {
	cfgOwner := s.cfg.Server.Owner

	existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
	if err != nil {
		return err
	}

	switch len(existing) {
	case 0:
		// no owner configured yet, continue
	case 1:
		// find existing owner
		existingOwner := existing[0]

		// no ownership change, nothing to do
		if existingOwner == cfgOwner {
			break
		}

		// ownership changed: remove the existing owner
		err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
	}

	return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
}