1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
32const (
33 rbacDomain = "thisserver"
34)
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.Addr == "" {
75 return fmt.Errorf("openbao address is required when using openbao secrets provider")
76 }
77 if cfg.Server.Secrets.OpenBao.RoleID == "" {
78 return fmt.Errorf("openbao role_id is required when using openbao secrets provider")
79 }
80 if cfg.Server.Secrets.OpenBao.SecretID == "" {
81 return fmt.Errorf("openbao secret_id is required when using openbao secrets provider")
82 }
83 vault, err = secrets.NewOpenBaoManager(
84 cfg.Server.Secrets.OpenBao.Addr,
85 cfg.Server.Secrets.OpenBao.RoleID,
86 cfg.Server.Secrets.OpenBao.SecretID,
87 logger,
88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
89 )
90 if err != nil {
91 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
92 }
93 logger.Info("using openbao secrets provider", "address", cfg.Server.Secrets.OpenBao.Addr, "mount", cfg.Server.Secrets.OpenBao.Mount)
94 case "sqlite", "":
95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
96 if err != nil {
97 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
98 }
99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
100 default:
101 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
102 }
103
104 eng, err := engine.New(ctx, cfg, d, &n, vault)
105 if err != nil {
106 return err
107 }
108
109 jq := queue.NewQueue(100, 2)
110
111 collections := []string{
112 tangled.SpindleMemberNSID,
113 tangled.RepoNSID,
114 }
115 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
116 if err != nil {
117 return fmt.Errorf("failed to setup jetstream client: %w", err)
118 }
119 jc.AddDid(cfg.Server.Owner)
120
121 resolver := idresolver.DefaultResolver()
122
123 spindle := Spindle{
124 jc: jc,
125 e: e,
126 db: d,
127 l: logger,
128 n: &n,
129 eng: eng,
130 jq: jq,
131 cfg: cfg,
132 res: resolver,
133 vault: vault,
134 }
135
136 err = e.AddSpindle(rbacDomain)
137 if err != nil {
138 return fmt.Errorf("failed to set rbac domain: %w", err)
139 }
140 err = spindle.configureOwner()
141 if err != nil {
142 return err
143 }
144 logger.Info("owner set", "did", cfg.Server.Owner)
145
146 // starts a job queue runner in the background
147 jq.Start()
148 defer jq.Stop()
149
150 // Stop vault token renewal if it implements Stopper
151 if stopper, ok := vault.(secrets.Stopper); ok {
152 defer stopper.Stop()
153 }
154
155 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
156 if err != nil {
157 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
158 }
159
160 err = jc.StartJetstream(ctx, spindle.ingest())
161 if err != nil {
162 return fmt.Errorf("failed to start jetstream consumer: %w", err)
163 }
164
165 // for each incoming sh.tangled.pipeline, we execute
166 // spindle.processPipeline, which in turn enqueues the pipeline
167 // job in the above registered queue.
168 ccfg := eventconsumer.NewConsumerConfig()
169 ccfg.Logger = logger
170 ccfg.Dev = cfg.Server.Dev
171 ccfg.ProcessFunc = spindle.processPipeline
172 ccfg.CursorStore = cursorStore
173 knownKnots, err := d.Knots()
174 if err != nil {
175 return err
176 }
177 for _, knot := range knownKnots {
178 logger.Info("adding source start", "knot", knot)
179 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
180 }
181 spindle.ks = eventconsumer.NewConsumer(*ccfg)
182
183 go func() {
184 logger.Info("starting knot event consumer")
185 spindle.ks.Start(ctx)
186 }()
187
188 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
189 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
190
191 return nil
192}
193
194func (s *Spindle) Router() http.Handler {
195 mux := chi.NewRouter()
196
197 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
198 w.Write(motd)
199 })
200 mux.HandleFunc("/events", s.Events)
201 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
202 w.Write([]byte(s.cfg.Server.Owner))
203 })
204 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
205
206 mux.Mount("/xrpc", s.XrpcRouter())
207 return mux
208}
209
210func (s *Spindle) XrpcRouter() http.Handler {
211 logger := s.l.With("route", "xrpc")
212
213 x := xrpc.Xrpc{
214 Logger: logger,
215 Db: s.db,
216 Enforcer: s.e,
217 Engine: s.eng,
218 Config: s.cfg,
219 Resolver: s.res,
220 Vault: s.vault,
221 }
222
223 return x.Router()
224}
225
226func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
227 if msg.Nsid == tangled.PipelineNSID {
228 tpl := tangled.Pipeline{}
229 err := json.Unmarshal(msg.EventJson, &tpl)
230 if err != nil {
231 fmt.Println("error unmarshalling", err)
232 return err
233 }
234
235 if tpl.TriggerMetadata == nil {
236 return fmt.Errorf("no trigger metadata found")
237 }
238
239 if tpl.TriggerMetadata.Repo == nil {
240 return fmt.Errorf("no repo data found")
241 }
242
243 // filter by repos
244 _, err = s.db.GetRepo(
245 tpl.TriggerMetadata.Repo.Knot,
246 tpl.TriggerMetadata.Repo.Did,
247 tpl.TriggerMetadata.Repo.Repo,
248 )
249 if err != nil {
250 return err
251 }
252
253 pipelineId := models.PipelineId{
254 Knot: src.Key(),
255 Rkey: msg.Rkey,
256 }
257
258 for _, w := range tpl.Workflows {
259 if w != nil {
260 err := s.db.StatusPending(models.WorkflowId{
261 PipelineId: pipelineId,
262 Name: w.Name,
263 }, s.n)
264 if err != nil {
265 return err
266 }
267 }
268 }
269
270 spl := models.ToPipeline(tpl, *s.cfg)
271
272 ok := s.jq.Enqueue(queue.Job{
273 Run: func() error {
274 s.eng.StartWorkflows(ctx, spl, pipelineId)
275 return nil
276 },
277 OnFail: func(jobError error) {
278 s.l.Error("pipeline run failed", "error", jobError)
279 },
280 })
281 if ok {
282 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
283 } else {
284 s.l.Error("failed to enqueue pipeline: queue is full")
285 }
286 }
287
288 return nil
289}
290
291func (s *Spindle) configureOwner() error {
292 cfgOwner := s.cfg.Server.Owner
293
294 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
295 if err != nil {
296 return err
297 }
298
299 switch len(existing) {
300 case 0:
301 // no owner configured, continue
302 case 1:
303 // find existing owner
304 existingOwner := existing[0]
305
306 // no ownership change, this is okay
307 if existingOwner == s.cfg.Server.Owner {
308 break
309 }
310
311 // remove existing owner
312 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
313 if err != nil {
314 return nil
315 }
316 default:
317 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
318 }
319
320 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
321}