1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
32const (
33 rbacDomain = "thisserver"
34)
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.Addr == "" {
75 return fmt.Errorf("openbao address is required when using openbao secrets provider")
76 }
77 if cfg.Server.Secrets.OpenBao.RoleID == "" {
78 return fmt.Errorf("openbao role_id is required when using openbao secrets provider")
79 }
80 if cfg.Server.Secrets.OpenBao.SecretID == "" {
81 return fmt.Errorf("openbao secret_id is required when using openbao secrets provider")
82 }
83 vault, err = secrets.NewOpenBaoManager(
84 cfg.Server.Secrets.OpenBao.Addr,
85 cfg.Server.Secrets.OpenBao.RoleID,
86 cfg.Server.Secrets.OpenBao.SecretID,
87 logger,
88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
89 )
90 if err != nil {
91 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
92 }
93 logger.Info("using openbao secrets provider", "address", cfg.Server.Secrets.OpenBao.Addr, "mount", cfg.Server.Secrets.OpenBao.Mount)
94 case "sqlite", "":
95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
96 if err != nil {
97 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
98 }
99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
100 default:
101 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
102 }
103
104 eng, err := engine.New(ctx, cfg, d, &n, vault)
105 if err != nil {
106 return err
107 }
108
109 jq := queue.NewQueue(100, 2)
110
111 collections := []string{
112 tangled.SpindleMemberNSID,
113 tangled.RepoNSID,
114 tangled.RepoCollaboratorNSID,
115 }
116 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
117 if err != nil {
118 return fmt.Errorf("failed to setup jetstream client: %w", err)
119 }
120 jc.AddDid(cfg.Server.Owner)
121
122 resolver := idresolver.DefaultResolver()
123
124 spindle := Spindle{
125 jc: jc,
126 e: e,
127 db: d,
128 l: logger,
129 n: &n,
130 eng: eng,
131 jq: jq,
132 cfg: cfg,
133 res: resolver,
134 vault: vault,
135 }
136
137 err = e.AddSpindle(rbacDomain)
138 if err != nil {
139 return fmt.Errorf("failed to set rbac domain: %w", err)
140 }
141 err = spindle.configureOwner()
142 if err != nil {
143 return err
144 }
145 logger.Info("owner set", "did", cfg.Server.Owner)
146
147 // starts a job queue runner in the background
148 jq.Start()
149 defer jq.Stop()
150
151 // Stop vault token renewal if it implements Stopper
152 if stopper, ok := vault.(secrets.Stopper); ok {
153 defer stopper.Stop()
154 }
155
156 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
157 if err != nil {
158 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
159 }
160
161 err = jc.StartJetstream(ctx, spindle.ingest())
162 if err != nil {
163 return fmt.Errorf("failed to start jetstream consumer: %w", err)
164 }
165
166 // for each incoming sh.tangled.pipeline, we execute
167 // spindle.processPipeline, which in turn enqueues the pipeline
168 // job in the above registered queue.
169 ccfg := eventconsumer.NewConsumerConfig()
170 ccfg.Logger = logger
171 ccfg.Dev = cfg.Server.Dev
172 ccfg.ProcessFunc = spindle.processPipeline
173 ccfg.CursorStore = cursorStore
174 knownKnots, err := d.Knots()
175 if err != nil {
176 return err
177 }
178 for _, knot := range knownKnots {
179 logger.Info("adding source start", "knot", knot)
180 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
181 }
182 spindle.ks = eventconsumer.NewConsumer(*ccfg)
183
184 go func() {
185 logger.Info("starting knot event consumer")
186 spindle.ks.Start(ctx)
187 }()
188
189 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
190 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
191
192 return nil
193}
194
195func (s *Spindle) Router() http.Handler {
196 mux := chi.NewRouter()
197
198 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
199 w.Write(motd)
200 })
201 mux.HandleFunc("/events", s.Events)
202 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
203 w.Write([]byte(s.cfg.Server.Owner))
204 })
205 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
206
207 mux.Mount("/xrpc", s.XrpcRouter())
208 return mux
209}
210
211func (s *Spindle) XrpcRouter() http.Handler {
212 logger := s.l.With("route", "xrpc")
213
214 x := xrpc.Xrpc{
215 Logger: logger,
216 Db: s.db,
217 Enforcer: s.e,
218 Engine: s.eng,
219 Config: s.cfg,
220 Resolver: s.res,
221 Vault: s.vault,
222 }
223
224 return x.Router()
225}
226
227func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
228 if msg.Nsid == tangled.PipelineNSID {
229 tpl := tangled.Pipeline{}
230 err := json.Unmarshal(msg.EventJson, &tpl)
231 if err != nil {
232 fmt.Println("error unmarshalling", err)
233 return err
234 }
235
236 if tpl.TriggerMetadata == nil {
237 return fmt.Errorf("no trigger metadata found")
238 }
239
240 if tpl.TriggerMetadata.Repo == nil {
241 return fmt.Errorf("no repo data found")
242 }
243
244 // filter by repos
245 _, err = s.db.GetRepo(
246 tpl.TriggerMetadata.Repo.Knot,
247 tpl.TriggerMetadata.Repo.Did,
248 tpl.TriggerMetadata.Repo.Repo,
249 )
250 if err != nil {
251 return err
252 }
253
254 pipelineId := models.PipelineId{
255 Knot: src.Key(),
256 Rkey: msg.Rkey,
257 }
258
259 for _, w := range tpl.Workflows {
260 if w != nil {
261 err := s.db.StatusPending(models.WorkflowId{
262 PipelineId: pipelineId,
263 Name: w.Name,
264 }, s.n)
265 if err != nil {
266 return err
267 }
268 }
269 }
270
271 spl := models.ToPipeline(tpl, *s.cfg)
272
273 ok := s.jq.Enqueue(queue.Job{
274 Run: func() error {
275 s.eng.StartWorkflows(ctx, spl, pipelineId)
276 return nil
277 },
278 OnFail: func(jobError error) {
279 s.l.Error("pipeline run failed", "error", jobError)
280 },
281 })
282 if ok {
283 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
284 } else {
285 s.l.Error("failed to enqueue pipeline: queue is full")
286 }
287 }
288
289 return nil
290}
291
292func (s *Spindle) configureOwner() error {
293 cfgOwner := s.cfg.Server.Owner
294
295 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
296 if err != nil {
297 return err
298 }
299
300 switch len(existing) {
301 case 0:
302 // no owner configured, continue
303 case 1:
304 // find existing owner
305 existingOwner := existing[0]
306
307 // no ownership change, this is okay
308 if existingOwner == s.cfg.Server.Owner {
309 break
310 }
311
312 // remove existing owner
313 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
314 if err != nil {
315 return nil
316 }
317 default:
318 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
319 }
320
321 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
322}