1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
32const (
33 rbacDomain = "thisserver"
34)
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 eng, err := engine.New(ctx, cfg, d, &n, vault)
97 if err != nil {
98 return err
99 }
100
101 jq := queue.NewQueue(100, 2)
102
103 collections := []string{
104 tangled.SpindleMemberNSID,
105 tangled.RepoNSID,
106 tangled.RepoCollaboratorNSID,
107 }
108 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
109 if err != nil {
110 return fmt.Errorf("failed to setup jetstream client: %w", err)
111 }
112 jc.AddDid(cfg.Server.Owner)
113
114 resolver := idresolver.DefaultResolver()
115
116 spindle := Spindle{
117 jc: jc,
118 e: e,
119 db: d,
120 l: logger,
121 n: &n,
122 eng: eng,
123 jq: jq,
124 cfg: cfg,
125 res: resolver,
126 vault: vault,
127 }
128
129 err = e.AddSpindle(rbacDomain)
130 if err != nil {
131 return fmt.Errorf("failed to set rbac domain: %w", err)
132 }
133 err = spindle.configureOwner()
134 if err != nil {
135 return err
136 }
137 logger.Info("owner set", "did", cfg.Server.Owner)
138
139 // starts a job queue runner in the background
140 jq.Start()
141 defer jq.Stop()
142
143 // Stop vault token renewal if it implements Stopper
144 if stopper, ok := vault.(secrets.Stopper); ok {
145 defer stopper.Stop()
146 }
147
148 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
149 if err != nil {
150 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
151 }
152
153 err = jc.StartJetstream(ctx, spindle.ingest())
154 if err != nil {
155 return fmt.Errorf("failed to start jetstream consumer: %w", err)
156 }
157
158 // for each incoming sh.tangled.pipeline, we execute
159 // spindle.processPipeline, which in turn enqueues the pipeline
160 // job in the above registered queue.
161 ccfg := eventconsumer.NewConsumerConfig()
162 ccfg.Logger = logger
163 ccfg.Dev = cfg.Server.Dev
164 ccfg.ProcessFunc = spindle.processPipeline
165 ccfg.CursorStore = cursorStore
166 knownKnots, err := d.Knots()
167 if err != nil {
168 return err
169 }
170 for _, knot := range knownKnots {
171 logger.Info("adding source start", "knot", knot)
172 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
173 }
174 spindle.ks = eventconsumer.NewConsumer(*ccfg)
175
176 go func() {
177 logger.Info("starting knot event consumer")
178 spindle.ks.Start(ctx)
179 }()
180
181 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
182 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
183
184 return nil
185}
186
187func (s *Spindle) Router() http.Handler {
188 mux := chi.NewRouter()
189
190 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
191 w.Write(motd)
192 })
193 mux.HandleFunc("/events", s.Events)
194 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
195 w.Write([]byte(s.cfg.Server.Owner))
196 })
197 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
198
199 mux.Mount("/xrpc", s.XrpcRouter())
200 return mux
201}
202
203func (s *Spindle) XrpcRouter() http.Handler {
204 logger := s.l.With("route", "xrpc")
205
206 x := xrpc.Xrpc{
207 Logger: logger,
208 Db: s.db,
209 Enforcer: s.e,
210 Engine: s.eng,
211 Config: s.cfg,
212 Resolver: s.res,
213 Vault: s.vault,
214 }
215
216 return x.Router()
217}
218
219func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
220 if msg.Nsid == tangled.PipelineNSID {
221 tpl := tangled.Pipeline{}
222 err := json.Unmarshal(msg.EventJson, &tpl)
223 if err != nil {
224 fmt.Println("error unmarshalling", err)
225 return err
226 }
227
228 if tpl.TriggerMetadata == nil {
229 return fmt.Errorf("no trigger metadata found")
230 }
231
232 if tpl.TriggerMetadata.Repo == nil {
233 return fmt.Errorf("no repo data found")
234 }
235
236 // filter by repos
237 _, err = s.db.GetRepo(
238 tpl.TriggerMetadata.Repo.Knot,
239 tpl.TriggerMetadata.Repo.Did,
240 tpl.TriggerMetadata.Repo.Repo,
241 )
242 if err != nil {
243 return err
244 }
245
246 pipelineId := models.PipelineId{
247 Knot: src.Key(),
248 Rkey: msg.Rkey,
249 }
250
251 for _, w := range tpl.Workflows {
252 if w != nil {
253 err := s.db.StatusPending(models.WorkflowId{
254 PipelineId: pipelineId,
255 Name: w.Name,
256 }, s.n)
257 if err != nil {
258 return err
259 }
260 }
261 }
262
263 spl := models.ToPipeline(tpl, *s.cfg)
264
265 ok := s.jq.Enqueue(queue.Job{
266 Run: func() error {
267 s.eng.StartWorkflows(ctx, spl, pipelineId)
268 return nil
269 },
270 OnFail: func(jobError error) {
271 s.l.Error("pipeline run failed", "error", jobError)
272 },
273 })
274 if ok {
275 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
276 } else {
277 s.l.Error("failed to enqueue pipeline: queue is full")
278 }
279 }
280
281 return nil
282}
283
284func (s *Spindle) configureOwner() error {
285 cfgOwner := s.cfg.Server.Owner
286
287 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
288 if err != nil {
289 return err
290 }
291
292 switch len(existing) {
293 case 0:
294 // no owner configured, continue
295 case 1:
296 // find existing owner
297 existingOwner := existing[0]
298
299 // no ownership change, this is okay
300 if existingOwner == s.cfg.Server.Owner {
301 break
302 }
303
304 // remove existing owner
305 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
306 if err != nil {
307 return nil
308 }
309 default:
310 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
311 }
312
313 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
314}