1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
// motd holds the contents of the embedded "motd" file. Router writes it as
// the response body for requests to "/".
//
//go:embed motd
var motd []byte
31
// rbacDomain is the RBAC domain name this spindle registers with the
// enforcer and under which its owner role is looked up and assigned.
const rbacDomain = "thisserver"
35
// Spindle bundles the long-lived components of a running spindle server.
// Run constructs it and the HTTP handlers and event processors in this
// package read from it.
type Spindle struct {
	// jc is the jetstream client; Run starts it with s.ingest().
	jc *jetstream.JetstreamClient
	// db is the spindle database handle.
	db *db.DB
	// e is the RBAC enforcer used for owner configuration and xrpc.
	e *rbac.Enforcer
	// l is the logger taken from the context passed to Run.
	l *slog.Logger
	// n is the notifier shared with the engine and passed to status updates.
	n *notifier.Notifier
	// eng starts the workflows of a pipeline.
	eng *engine.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the loaded server configuration.
	cfg *config.Config
	// ks is the knot event consumer; each event goes to processPipeline.
	ks *eventconsumer.Consumer
	// res is the identity resolver handed to the xrpc router.
	res *idresolver.Resolver
	// vault is the configured secrets manager (openbao or sqlite).
	vault secrets.Manager
}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 eng, err := engine.New(ctx, cfg, d, &n, vault)
97 if err != nil {
98 return err
99 }
100
101 jq := queue.NewQueue(100, 5)
102
103 collections := []string{
104 tangled.SpindleMemberNSID,
105 tangled.RepoNSID,
106 tangled.RepoCollaboratorNSID,
107 }
108 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
109 if err != nil {
110 return fmt.Errorf("failed to setup jetstream client: %w", err)
111 }
112 jc.AddDid(cfg.Server.Owner)
113
114 // Check if the spindle knows about any Dids;
115 dids, err := d.GetAllDids()
116 if err != nil {
117 return fmt.Errorf("failed to get all dids: %w", err)
118 }
119 for _, d := range dids {
120 jc.AddDid(d)
121 }
122
123 resolver := idresolver.DefaultResolver()
124
125 spindle := Spindle{
126 jc: jc,
127 e: e,
128 db: d,
129 l: logger,
130 n: &n,
131 eng: eng,
132 jq: jq,
133 cfg: cfg,
134 res: resolver,
135 vault: vault,
136 }
137
138 err = e.AddSpindle(rbacDomain)
139 if err != nil {
140 return fmt.Errorf("failed to set rbac domain: %w", err)
141 }
142 err = spindle.configureOwner()
143 if err != nil {
144 return err
145 }
146 logger.Info("owner set", "did", cfg.Server.Owner)
147
148 // starts a job queue runner in the background
149 jq.Start()
150 defer jq.Stop()
151
152 // Stop vault token renewal if it implements Stopper
153 if stopper, ok := vault.(secrets.Stopper); ok {
154 defer stopper.Stop()
155 }
156
157 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
158 if err != nil {
159 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
160 }
161
162 err = jc.StartJetstream(ctx, spindle.ingest())
163 if err != nil {
164 return fmt.Errorf("failed to start jetstream consumer: %w", err)
165 }
166
167 // for each incoming sh.tangled.pipeline, we execute
168 // spindle.processPipeline, which in turn enqueues the pipeline
169 // job in the above registered queue.
170 ccfg := eventconsumer.NewConsumerConfig()
171 ccfg.Logger = logger
172 ccfg.Dev = cfg.Server.Dev
173 ccfg.ProcessFunc = spindle.processPipeline
174 ccfg.CursorStore = cursorStore
175 knownKnots, err := d.Knots()
176 if err != nil {
177 return err
178 }
179 for _, knot := range knownKnots {
180 logger.Info("adding source start", "knot", knot)
181 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
182 }
183 spindle.ks = eventconsumer.NewConsumer(*ccfg)
184
185 go func() {
186 logger.Info("starting knot event consumer")
187 spindle.ks.Start(ctx)
188 }()
189
190 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
191 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
192
193 return nil
194}
195
196func (s *Spindle) Router() http.Handler {
197 mux := chi.NewRouter()
198
199 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
200 w.Write(motd)
201 })
202 mux.HandleFunc("/events", s.Events)
203 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
204 w.Write([]byte(s.cfg.Server.Owner))
205 })
206 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
207
208 mux.Mount("/xrpc", s.XrpcRouter())
209 return mux
210}
211
212func (s *Spindle) XrpcRouter() http.Handler {
213 logger := s.l.With("route", "xrpc")
214
215 x := xrpc.Xrpc{
216 Logger: logger,
217 Db: s.db,
218 Enforcer: s.e,
219 Engine: s.eng,
220 Config: s.cfg,
221 Resolver: s.res,
222 Vault: s.vault,
223 }
224
225 return x.Router()
226}
227
228func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
229 if msg.Nsid == tangled.PipelineNSID {
230 tpl := tangled.Pipeline{}
231 err := json.Unmarshal(msg.EventJson, &tpl)
232 if err != nil {
233 fmt.Println("error unmarshalling", err)
234 return err
235 }
236
237 if tpl.TriggerMetadata == nil {
238 return fmt.Errorf("no trigger metadata found")
239 }
240
241 if tpl.TriggerMetadata.Repo == nil {
242 return fmt.Errorf("no repo data found")
243 }
244
245 // filter by repos
246 _, err = s.db.GetRepo(
247 tpl.TriggerMetadata.Repo.Knot,
248 tpl.TriggerMetadata.Repo.Did,
249 tpl.TriggerMetadata.Repo.Repo,
250 )
251 if err != nil {
252 return err
253 }
254
255 pipelineId := models.PipelineId{
256 Knot: src.Key(),
257 Rkey: msg.Rkey,
258 }
259
260 for _, w := range tpl.Workflows {
261 if w != nil {
262 err := s.db.StatusPending(models.WorkflowId{
263 PipelineId: pipelineId,
264 Name: w.Name,
265 }, s.n)
266 if err != nil {
267 return err
268 }
269 }
270 }
271
272 spl := models.ToPipeline(tpl, *s.cfg)
273
274 ok := s.jq.Enqueue(queue.Job{
275 Run: func() error {
276 s.eng.StartWorkflows(ctx, spl, pipelineId)
277 return nil
278 },
279 OnFail: func(jobError error) {
280 s.l.Error("pipeline run failed", "error", jobError)
281 },
282 })
283 if ok {
284 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
285 } else {
286 s.l.Error("failed to enqueue pipeline: queue is full")
287 }
288 }
289
290 return nil
291}
292
293func (s *Spindle) configureOwner() error {
294 cfgOwner := s.cfg.Server.Owner
295
296 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
297 if err != nil {
298 return err
299 }
300
301 switch len(existing) {
302 case 0:
303 // no owner configured, continue
304 case 1:
305 // find existing owner
306 existingOwner := existing[0]
307
308 // no ownership change, this is okay
309 if existingOwner == s.cfg.Server.Owner {
310 break
311 }
312
313 // remove existing owner
314 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
315 if err != nil {
316 return nil
317 }
318 default:
319 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
320 }
321
322 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
323}