1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27 "tangled.sh/tangled.sh/core/xrpc/serviceauth"
28)
29
30//go:embed motd
31var motd []byte
32
33const (
34 rbacDomain = "thisserver"
35)
36
37type Spindle struct {
38 jc *jetstream.JetstreamClient
39 db *db.DB
40 e *rbac.Enforcer
41 l *slog.Logger
42 n *notifier.Notifier
43 eng *engine.Engine
44 jq *queue.Queue
45 cfg *config.Config
46 ks *eventconsumer.Consumer
47 res *idresolver.Resolver
48 vault secrets.Manager
49}
50
51func Run(ctx context.Context) error {
52 logger := log.FromContext(ctx)
53
54 cfg, err := config.Load(ctx)
55 if err != nil {
56 return fmt.Errorf("failed to load config: %w", err)
57 }
58
59 d, err := db.Make(cfg.Server.DBPath)
60 if err != nil {
61 return fmt.Errorf("failed to setup db: %w", err)
62 }
63
64 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
65 if err != nil {
66 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
67 }
68 e.E.EnableAutoSave(true)
69
70 n := notifier.New()
71
72 var vault secrets.Manager
73 switch cfg.Server.Secrets.Provider {
74 case "openbao":
75 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
76 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
77 }
78 vault, err = secrets.NewOpenBaoManager(
79 cfg.Server.Secrets.OpenBao.ProxyAddr,
80 logger,
81 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
82 )
83 if err != nil {
84 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
85 }
86 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
87 case "sqlite", "":
88 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
89 if err != nil {
90 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
91 }
92 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
93 default:
94 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
95 }
96
97 eng, err := engine.New(ctx, cfg, d, &n, vault)
98 if err != nil {
99 return err
100 }
101
102 jq := queue.NewQueue(100, 5)
103
104 collections := []string{
105 tangled.SpindleMemberNSID,
106 tangled.RepoNSID,
107 tangled.RepoCollaboratorNSID,
108 }
109 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
110 if err != nil {
111 return fmt.Errorf("failed to setup jetstream client: %w", err)
112 }
113 jc.AddDid(cfg.Server.Owner)
114
115 // Check if the spindle knows about any Dids;
116 dids, err := d.GetAllDids()
117 if err != nil {
118 return fmt.Errorf("failed to get all dids: %w", err)
119 }
120 for _, d := range dids {
121 jc.AddDid(d)
122 }
123
124 resolver := idresolver.DefaultResolver()
125
126 spindle := Spindle{
127 jc: jc,
128 e: e,
129 db: d,
130 l: logger,
131 n: &n,
132 eng: eng,
133 jq: jq,
134 cfg: cfg,
135 res: resolver,
136 vault: vault,
137 }
138
139 err = e.AddSpindle(rbacDomain)
140 if err != nil {
141 return fmt.Errorf("failed to set rbac domain: %w", err)
142 }
143 err = spindle.configureOwner()
144 if err != nil {
145 return err
146 }
147 logger.Info("owner set", "did", cfg.Server.Owner)
148
149 // starts a job queue runner in the background
150 jq.Start()
151 defer jq.Stop()
152
153 // Stop vault token renewal if it implements Stopper
154 if stopper, ok := vault.(secrets.Stopper); ok {
155 defer stopper.Stop()
156 }
157
158 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
159 if err != nil {
160 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
161 }
162
163 err = jc.StartJetstream(ctx, spindle.ingest())
164 if err != nil {
165 return fmt.Errorf("failed to start jetstream consumer: %w", err)
166 }
167
168 // for each incoming sh.tangled.pipeline, we execute
169 // spindle.processPipeline, which in turn enqueues the pipeline
170 // job in the above registered queue.
171 ccfg := eventconsumer.NewConsumerConfig()
172 ccfg.Logger = logger
173 ccfg.Dev = cfg.Server.Dev
174 ccfg.ProcessFunc = spindle.processPipeline
175 ccfg.CursorStore = cursorStore
176 knownKnots, err := d.Knots()
177 if err != nil {
178 return err
179 }
180 for _, knot := range knownKnots {
181 logger.Info("adding source start", "knot", knot)
182 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
183 }
184 spindle.ks = eventconsumer.NewConsumer(*ccfg)
185
186 go func() {
187 logger.Info("starting knot event consumer")
188 spindle.ks.Start(ctx)
189 }()
190
191 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
192 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
193
194 return nil
195}
196
197func (s *Spindle) Router() http.Handler {
198 mux := chi.NewRouter()
199
200 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
201 w.Write(motd)
202 })
203 mux.HandleFunc("/events", s.Events)
204 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
205 w.Write([]byte(s.cfg.Server.Owner))
206 })
207 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
208
209 mux.Mount("/xrpc", s.XrpcRouter())
210 return mux
211}
212
213func (s *Spindle) XrpcRouter() http.Handler {
214 logger := s.l.With("route", "xrpc")
215
216 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
217
218 x := xrpc.Xrpc{
219 Logger: logger,
220 Db: s.db,
221 Enforcer: s.e,
222 Engine: s.eng,
223 Config: s.cfg,
224 Resolver: s.res,
225 Vault: s.vault,
226 ServiceAuth: serviceAuth,
227 }
228
229 return x.Router()
230}
231
232func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
233 if msg.Nsid == tangled.PipelineNSID {
234 tpl := tangled.Pipeline{}
235 err := json.Unmarshal(msg.EventJson, &tpl)
236 if err != nil {
237 fmt.Println("error unmarshalling", err)
238 return err
239 }
240
241 if tpl.TriggerMetadata == nil {
242 return fmt.Errorf("no trigger metadata found")
243 }
244
245 if tpl.TriggerMetadata.Repo == nil {
246 return fmt.Errorf("no repo data found")
247 }
248
249 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
250 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
251 }
252
253 // filter by repos
254 _, err = s.db.GetRepo(
255 tpl.TriggerMetadata.Repo.Knot,
256 tpl.TriggerMetadata.Repo.Did,
257 tpl.TriggerMetadata.Repo.Repo,
258 )
259 if err != nil {
260 return err
261 }
262
263 pipelineId := models.PipelineId{
264 Knot: src.Key(),
265 Rkey: msg.Rkey,
266 }
267
268 for _, w := range tpl.Workflows {
269 if w != nil {
270 err := s.db.StatusPending(models.WorkflowId{
271 PipelineId: pipelineId,
272 Name: w.Name,
273 }, s.n)
274 if err != nil {
275 return err
276 }
277 }
278 }
279
280 spl := models.ToPipeline(tpl, *s.cfg)
281
282 ok := s.jq.Enqueue(queue.Job{
283 Run: func() error {
284 s.eng.StartWorkflows(ctx, spl, pipelineId)
285 return nil
286 },
287 OnFail: func(jobError error) {
288 s.l.Error("pipeline run failed", "error", jobError)
289 },
290 })
291 if ok {
292 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
293 } else {
294 s.l.Error("failed to enqueue pipeline: queue is full")
295 }
296 }
297
298 return nil
299}
300
301func (s *Spindle) configureOwner() error {
302 cfgOwner := s.cfg.Server.Owner
303
304 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
305 if err != nil {
306 return err
307 }
308
309 switch len(existing) {
310 case 0:
311 // no owner configured, continue
312 case 1:
313 // find existing owner
314 existingOwner := existing[0]
315
316 // no ownership change, this is okay
317 if existingOwner == s.cfg.Server.Owner {
318 break
319 }
320
321 // remove existing owner
322 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
323 if err != nil {
324 return nil
325 }
326 default:
327 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
328 }
329
330 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
331}