1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/engines/nixery"
24 "tangled.sh/tangled.sh/core/spindle/models"
25 "tangled.sh/tangled.sh/core/spindle/queue"
26 "tangled.sh/tangled.sh/core/spindle/secrets"
27 "tangled.sh/tangled.sh/core/spindle/xrpc"
28)
29
30//go:embed motd
31var motd []byte
32
33const (
34 rbacDomain = "thisserver"
35)
36
37type Spindle struct {
38 jc *jetstream.JetstreamClient
39 db *db.DB
40 e *rbac.Enforcer
41 l *slog.Logger
42 n *notifier.Notifier
43 engs map[string]models.Engine
44 jq *queue.Queue
45 cfg *config.Config
46 ks *eventconsumer.Consumer
47 res *idresolver.Resolver
48 vault secrets.Manager
49}
50
51func Run(ctx context.Context) error {
52 logger := log.FromContext(ctx)
53
54 cfg, err := config.Load(ctx)
55 if err != nil {
56 return fmt.Errorf("failed to load config: %w", err)
57 }
58
59 d, err := db.Make(cfg.Server.DBPath)
60 if err != nil {
61 return fmt.Errorf("failed to setup db: %w", err)
62 }
63
64 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
65 if err != nil {
66 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
67 }
68 e.E.EnableAutoSave(true)
69
70 n := notifier.New()
71
72 var vault secrets.Manager
73 switch cfg.Server.Secrets.Provider {
74 case "openbao":
75 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
76 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
77 }
78 vault, err = secrets.NewOpenBaoManager(
79 cfg.Server.Secrets.OpenBao.ProxyAddr,
80 logger,
81 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
82 )
83 if err != nil {
84 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
85 }
86 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
87 case "sqlite", "":
88 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
89 if err != nil {
90 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
91 }
92 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
93 default:
94 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
95 }
96
97 nixeryEng, err := nixery.New(ctx, cfg)
98 if err != nil {
99 return err
100 }
101
102 jq := queue.NewQueue(100, 5)
103
104 collections := []string{
105 tangled.SpindleMemberNSID,
106 tangled.RepoNSID,
107 tangled.RepoCollaboratorNSID,
108 }
109 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
110 if err != nil {
111 return fmt.Errorf("failed to setup jetstream client: %w", err)
112 }
113 jc.AddDid(cfg.Server.Owner)
114
115 // Check if the spindle knows about any Dids;
116 dids, err := d.GetAllDids()
117 if err != nil {
118 return fmt.Errorf("failed to get all dids: %w", err)
119 }
120 for _, d := range dids {
121 jc.AddDid(d)
122 }
123
124 resolver := idresolver.DefaultResolver()
125
126 spindle := Spindle{
127 jc: jc,
128 e: e,
129 db: d,
130 l: logger,
131 n: &n,
132 engs: map[string]models.Engine{"nixery": nixeryEng},
133 jq: jq,
134 cfg: cfg,
135 res: resolver,
136 vault: vault,
137 }
138
139 err = e.AddSpindle(rbacDomain)
140 if err != nil {
141 return fmt.Errorf("failed to set rbac domain: %w", err)
142 }
143 err = spindle.configureOwner()
144 if err != nil {
145 return err
146 }
147 logger.Info("owner set", "did", cfg.Server.Owner)
148
149 // starts a job queue runner in the background
150 jq.Start()
151 defer jq.Stop()
152
153 // Stop vault token renewal if it implements Stopper
154 if stopper, ok := vault.(secrets.Stopper); ok {
155 defer stopper.Stop()
156 }
157
158 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
159 if err != nil {
160 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
161 }
162
163 err = jc.StartJetstream(ctx, spindle.ingest())
164 if err != nil {
165 return fmt.Errorf("failed to start jetstream consumer: %w", err)
166 }
167
168 // for each incoming sh.tangled.pipeline, we execute
169 // spindle.processPipeline, which in turn enqueues the pipeline
170 // job in the above registered queue.
171 ccfg := eventconsumer.NewConsumerConfig()
172 ccfg.Logger = logger
173 ccfg.Dev = cfg.Server.Dev
174 ccfg.ProcessFunc = spindle.processPipeline
175 ccfg.CursorStore = cursorStore
176 knownKnots, err := d.Knots()
177 if err != nil {
178 return err
179 }
180 for _, knot := range knownKnots {
181 logger.Info("adding source start", "knot", knot)
182 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
183 }
184 spindle.ks = eventconsumer.NewConsumer(*ccfg)
185
186 go func() {
187 logger.Info("starting knot event consumer")
188 spindle.ks.Start(ctx)
189 }()
190
191 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
192 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
193
194 return nil
195}
196
197func (s *Spindle) Router() http.Handler {
198 mux := chi.NewRouter()
199
200 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
201 w.Write(motd)
202 })
203 mux.HandleFunc("/events", s.Events)
204 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
205 w.Write([]byte(s.cfg.Server.Owner))
206 })
207 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
208
209 mux.Mount("/xrpc", s.XrpcRouter())
210 return mux
211}
212
213func (s *Spindle) XrpcRouter() http.Handler {
214 logger := s.l.With("route", "xrpc")
215
216 x := xrpc.Xrpc{
217 Logger: logger,
218 Db: s.db,
219 Enforcer: s.e,
220 Engines: s.engs,
221 Config: s.cfg,
222 Resolver: s.res,
223 Vault: s.vault,
224 }
225
226 return x.Router()
227}
228
229func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
230 if msg.Nsid == tangled.PipelineNSID {
231 tpl := tangled.Pipeline{}
232 err := json.Unmarshal(msg.EventJson, &tpl)
233 if err != nil {
234 fmt.Println("error unmarshalling", err)
235 return err
236 }
237
238 if tpl.TriggerMetadata == nil {
239 return fmt.Errorf("no trigger metadata found")
240 }
241
242 if tpl.TriggerMetadata.Repo == nil {
243 return fmt.Errorf("no repo data found")
244 }
245
246 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
247 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
248 }
249
250 // filter by repos
251 _, err = s.db.GetRepo(
252 tpl.TriggerMetadata.Repo.Knot,
253 tpl.TriggerMetadata.Repo.Did,
254 tpl.TriggerMetadata.Repo.Repo,
255 )
256 if err != nil {
257 return err
258 }
259
260 pipelineId := models.PipelineId{
261 Knot: src.Key(),
262 Rkey: msg.Rkey,
263 }
264
265 workflows := make(map[models.Engine][]models.Workflow)
266
267 for _, w := range tpl.Workflows {
268 if w != nil {
269 if _, ok := s.engs[w.Engine]; !ok {
270 err = s.db.StatusFailed(models.WorkflowId{
271 PipelineId: pipelineId,
272 Name: w.Name,
273 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
274 if err != nil {
275 return err
276 }
277
278 continue
279 }
280
281 eng := s.engs[w.Engine]
282
283 if _, ok := workflows[eng]; !ok {
284 workflows[eng] = []models.Workflow{}
285 }
286
287 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
288 if err != nil {
289 return err
290 }
291
292 workflows[eng] = append(workflows[eng], *ewf)
293
294 err = s.db.StatusPending(models.WorkflowId{
295 PipelineId: pipelineId,
296 Name: w.Name,
297 }, s.n)
298 if err != nil {
299 return err
300 }
301 }
302 }
303
304 ok := s.jq.Enqueue(queue.Job{
305 Run: func() error {
306 engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
307 RepoOwner: tpl.TriggerMetadata.Repo.Did,
308 RepoName: tpl.TriggerMetadata.Repo.Repo,
309 Workflows: workflows,
310 }, pipelineId)
311 return nil
312 },
313 OnFail: func(jobError error) {
314 s.l.Error("pipeline run failed", "error", jobError)
315 },
316 })
317 if ok {
318 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
319 } else {
320 s.l.Error("failed to enqueue pipeline: queue is full")
321 }
322 }
323
324 return nil
325}
326
327func (s *Spindle) configureOwner() error {
328 cfgOwner := s.cfg.Server.Owner
329
330 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
331 if err != nil {
332 return err
333 }
334
335 switch len(existing) {
336 case 0:
337 // no owner configured, continue
338 case 1:
339 // find existing owner
340 existingOwner := existing[0]
341
342 // no ownership change, this is okay
343 if existingOwner == s.cfg.Server.Owner {
344 break
345 }
346
347 // remove existing owner
348 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
349 if err != nil {
350 return nil
351 }
352 default:
353 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
354 }
355
356 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
357}