1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
32const (
33 rbacDomain = "thisserver"
34)
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 eng, err := engine.New(ctx, cfg, d, &n, vault)
97 if err != nil {
98 return err
99 }
100
101 jq := queue.NewQueue(100, 2)
102
103 collections := []string{
104 tangled.SpindleMemberNSID,
105 tangled.RepoNSID,
106 }
107 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
108 if err != nil {
109 return fmt.Errorf("failed to setup jetstream client: %w", err)
110 }
111 jc.AddDid(cfg.Server.Owner)
112
113 resolver := idresolver.DefaultResolver()
114
115 spindle := Spindle{
116 jc: jc,
117 e: e,
118 db: d,
119 l: logger,
120 n: &n,
121 eng: eng,
122 jq: jq,
123 cfg: cfg,
124 res: resolver,
125 vault: vault,
126 }
127
128 err = e.AddSpindle(rbacDomain)
129 if err != nil {
130 return fmt.Errorf("failed to set rbac domain: %w", err)
131 }
132 err = spindle.configureOwner()
133 if err != nil {
134 return err
135 }
136 logger.Info("owner set", "did", cfg.Server.Owner)
137
138 // starts a job queue runner in the background
139 jq.Start()
140 defer jq.Stop()
141
142 // Stop vault token renewal if it implements Stopper
143 if stopper, ok := vault.(secrets.Stopper); ok {
144 defer stopper.Stop()
145 }
146
147 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
148 if err != nil {
149 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
150 }
151
152 err = jc.StartJetstream(ctx, spindle.ingest())
153 if err != nil {
154 return fmt.Errorf("failed to start jetstream consumer: %w", err)
155 }
156
157 // for each incoming sh.tangled.pipeline, we execute
158 // spindle.processPipeline, which in turn enqueues the pipeline
159 // job in the above registered queue.
160 ccfg := eventconsumer.NewConsumerConfig()
161 ccfg.Logger = logger
162 ccfg.Dev = cfg.Server.Dev
163 ccfg.ProcessFunc = spindle.processPipeline
164 ccfg.CursorStore = cursorStore
165 knownKnots, err := d.Knots()
166 if err != nil {
167 return err
168 }
169 for _, knot := range knownKnots {
170 logger.Info("adding source start", "knot", knot)
171 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
172 }
173 spindle.ks = eventconsumer.NewConsumer(*ccfg)
174
175 go func() {
176 logger.Info("starting knot event consumer")
177 spindle.ks.Start(ctx)
178 }()
179
180 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
181 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
182
183 return nil
184}
185
186func (s *Spindle) Router() http.Handler {
187 mux := chi.NewRouter()
188
189 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
190 w.Write(motd)
191 })
192 mux.HandleFunc("/events", s.Events)
193 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
194 w.Write([]byte(s.cfg.Server.Owner))
195 })
196 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
197
198 mux.Mount("/xrpc", s.XrpcRouter())
199 return mux
200}
201
202func (s *Spindle) XrpcRouter() http.Handler {
203 logger := s.l.With("route", "xrpc")
204
205 x := xrpc.Xrpc{
206 Logger: logger,
207 Db: s.db,
208 Enforcer: s.e,
209 Engine: s.eng,
210 Config: s.cfg,
211 Resolver: s.res,
212 Vault: s.vault,
213 }
214
215 return x.Router()
216}
217
218func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
219 if msg.Nsid == tangled.PipelineNSID {
220 tpl := tangled.Pipeline{}
221 err := json.Unmarshal(msg.EventJson, &tpl)
222 if err != nil {
223 fmt.Println("error unmarshalling", err)
224 return err
225 }
226
227 if tpl.TriggerMetadata == nil {
228 return fmt.Errorf("no trigger metadata found")
229 }
230
231 if tpl.TriggerMetadata.Repo == nil {
232 return fmt.Errorf("no repo data found")
233 }
234
235 // filter by repos
236 _, err = s.db.GetRepo(
237 tpl.TriggerMetadata.Repo.Knot,
238 tpl.TriggerMetadata.Repo.Did,
239 tpl.TriggerMetadata.Repo.Repo,
240 )
241 if err != nil {
242 return err
243 }
244
245 pipelineId := models.PipelineId{
246 Knot: src.Key(),
247 Rkey: msg.Rkey,
248 }
249
250 for _, w := range tpl.Workflows {
251 if w != nil {
252 err := s.db.StatusPending(models.WorkflowId{
253 PipelineId: pipelineId,
254 Name: w.Name,
255 }, s.n)
256 if err != nil {
257 return err
258 }
259 }
260 }
261
262 spl := models.ToPipeline(tpl, *s.cfg)
263
264 ok := s.jq.Enqueue(queue.Job{
265 Run: func() error {
266 s.eng.StartWorkflows(ctx, spl, pipelineId)
267 return nil
268 },
269 OnFail: func(jobError error) {
270 s.l.Error("pipeline run failed", "error", jobError)
271 },
272 })
273 if ok {
274 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
275 } else {
276 s.l.Error("failed to enqueue pipeline: queue is full")
277 }
278 }
279
280 return nil
281}
282
283func (s *Spindle) configureOwner() error {
284 cfgOwner := s.cfg.Server.Owner
285
286 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
287 if err != nil {
288 return err
289 }
290
291 switch len(existing) {
292 case 0:
293 // no owner configured, continue
294 case 1:
295 // find existing owner
296 existingOwner := existing[0]
297
298 // no ownership change, this is okay
299 if existingOwner == s.cfg.Server.Owner {
300 break
301 }
302
303 // remove existing owner
304 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
305 if err != nil {
306 return nil
307 }
308 default:
309 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
310 }
311
312 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
313}