forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/engines/nixery" 24 "tangled.sh/tangled.sh/core/spindle/models" 25 "tangled.sh/tangled.sh/core/spindle/queue" 26 "tangled.sh/tangled.sh/core/spindle/secrets" 27 "tangled.sh/tangled.sh/core/spindle/xrpc" 28) 29 30//go:embed motd 31var motd []byte 32 33const ( 34 rbacDomain = "thisserver" 35) 36 37type Spindle struct { 38 jc *jetstream.JetstreamClient 39 db *db.DB 40 e *rbac.Enforcer 41 l *slog.Logger 42 n *notifier.Notifier 43 engs map[string]models.Engine 44 jq *queue.Queue 45 cfg *config.Config 46 ks *eventconsumer.Consumer 47 res *idresolver.Resolver 48 vault secrets.Manager 49} 50 51func Run(ctx context.Context) error { 52 logger := log.FromContext(ctx) 53 54 cfg, err := config.Load(ctx) 55 if err != nil { 56 return fmt.Errorf("failed to load config: %w", err) 57 } 58 59 d, err := db.Make(cfg.Server.DBPath) 60 if err != nil { 61 return fmt.Errorf("failed to setup db: %w", err) 62 } 63 64 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 65 if err != nil { 66 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 67 } 68 e.E.EnableAutoSave(true) 69 70 n := notifier.New() 71 72 var vault secrets.Manager 73 switch cfg.Server.Secrets.Provider { 74 case "openbao": 75 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 76 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 77 } 78 vault, err = secrets.NewOpenBaoManager( 79 cfg.Server.Secrets.OpenBao.ProxyAddr, 80 logger, 81 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 82 ) 83 if err != nil { 84 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 85 } 86 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 87 case "sqlite", "": 88 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 89 if err != nil { 90 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 91 } 92 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 93 default: 94 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 95 } 96 97 nixeryEng, err := nixery.New(ctx, cfg) 98 if err != nil { 99 return err 100 } 101 102 jq := queue.NewQueue(100, 5) 103 104 collections := []string{ 105 tangled.SpindleMemberNSID, 106 tangled.RepoNSID, 107 tangled.RepoCollaboratorNSID, 108 } 109 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 110 if err != nil { 111 return fmt.Errorf("failed to setup jetstream client: %w", err) 112 } 113 jc.AddDid(cfg.Server.Owner) 114 115 // Check if the spindle knows about any Dids; 116 dids, err := d.GetAllDids() 117 if err != nil { 118 return fmt.Errorf("failed to get all dids: %w", err) 119 } 120 for _, d := range dids { 121 jc.AddDid(d) 122 } 123 124 resolver := idresolver.DefaultResolver() 125 126 spindle := Spindle{ 127 jc: jc, 128 e: e, 129 db: d, 130 l: logger, 131 n: &n, 132 engs: map[string]models.Engine{"nixery": nixeryEng}, 133 jq: jq, 134 cfg: cfg, 135 res: resolver, 136 vault: vault, 137 } 138 139 err = e.AddSpindle(rbacDomain) 140 if err != nil { 141 return fmt.Errorf("failed to set rbac domain: %w", err) 142 } 143 err = spindle.configureOwner() 144 if err != nil { 145 return err 146 } 147 logger.Info("owner set", "did", cfg.Server.Owner) 148 149 // starts a job queue runner in the background 150 jq.Start() 151 defer jq.Stop() 152 153 // Stop vault token renewal if it implements Stopper 154 if stopper, ok := vault.(secrets.Stopper); ok { 155 defer stopper.Stop() 156 } 157 158 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 159 if err != nil { 160 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 161 } 162 163 err = jc.StartJetstream(ctx, spindle.ingest()) 164 if err != nil { 165 return fmt.Errorf("failed to start jetstream consumer: %w", err) 166 } 167 168 // for each incoming sh.tangled.pipeline, we execute 169 // spindle.processPipeline, which in turn enqueues the pipeline 170 // job in the above registered queue. 171 ccfg := eventconsumer.NewConsumerConfig() 172 ccfg.Logger = logger 173 ccfg.Dev = cfg.Server.Dev 174 ccfg.ProcessFunc = spindle.processPipeline 175 ccfg.CursorStore = cursorStore 176 knownKnots, err := d.Knots() 177 if err != nil { 178 return err 179 } 180 for _, knot := range knownKnots { 181 logger.Info("adding source start", "knot", knot) 182 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 183 } 184 spindle.ks = eventconsumer.NewConsumer(*ccfg) 185 186 go func() { 187 logger.Info("starting knot event consumer") 188 spindle.ks.Start(ctx) 189 }() 190 191 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 192 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 193 194 return nil 195} 196 197func (s *Spindle) Router() http.Handler { 198 mux := chi.NewRouter() 199 200 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 201 w.Write(motd) 202 }) 203 mux.HandleFunc("/events", s.Events) 204 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 205 w.Write([]byte(s.cfg.Server.Owner)) 206 }) 207 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 208 209 mux.Mount("/xrpc", s.XrpcRouter()) 210 return mux 211} 212 213func (s *Spindle) XrpcRouter() http.Handler { 214 logger := s.l.With("route", "xrpc") 215 216 x := xrpc.Xrpc{ 217 Logger: logger, 218 Db: s.db, 219 Enforcer: s.e, 220 Engines: s.engs, 221 Config: s.cfg, 222 Resolver: s.res, 223 Vault: s.vault, 224 } 225 226 return x.Router() 227} 228 229func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 230 if msg.Nsid == tangled.PipelineNSID { 231 tpl := tangled.Pipeline{} 232 err := json.Unmarshal(msg.EventJson, &tpl) 233 if err != nil { 234 fmt.Println("error unmarshalling", err) 235 return err 236 } 237 238 if tpl.TriggerMetadata == nil { 239 return fmt.Errorf("no trigger metadata found") 240 } 241 242 if tpl.TriggerMetadata.Repo == nil { 243 return fmt.Errorf("no repo data found") 244 } 245 246 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 247 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 248 } 249 250 // filter by repos 251 _, err = s.db.GetRepo( 252 tpl.TriggerMetadata.Repo.Knot, 253 tpl.TriggerMetadata.Repo.Did, 254 tpl.TriggerMetadata.Repo.Repo, 255 ) 256 if err != nil { 257 return err 258 } 259 260 pipelineId := models.PipelineId{ 261 Knot: src.Key(), 262 Rkey: msg.Rkey, 263 } 264 265 workflows := make(map[models.Engine][]models.Workflow) 266 267 for _, w := range tpl.Workflows { 268 if w != nil { 269 if _, ok := s.engs[w.Engine]; !ok { 270 err = s.db.StatusFailed(models.WorkflowId{ 271 PipelineId: pipelineId, 272 Name: w.Name, 273 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 274 if err != nil { 275 return err 276 } 277 278 continue 279 } 280 281 eng := s.engs[w.Engine] 282 283 if _, ok := workflows[eng]; !ok { 284 workflows[eng] = []models.Workflow{} 285 } 286 287 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 288 if err != nil { 289 return err 290 } 291 292 workflows[eng] = append(workflows[eng], *ewf) 293 294 err = s.db.StatusPending(models.WorkflowId{ 295 PipelineId: pipelineId, 296 Name: w.Name, 297 }, s.n) 298 if err != nil { 299 return err 300 } 301 } 302 } 303 304 ok := s.jq.Enqueue(queue.Job{ 305 Run: func() error { 306 engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 307 RepoOwner: tpl.TriggerMetadata.Repo.Did, 308 RepoName: tpl.TriggerMetadata.Repo.Repo, 309 Workflows: workflows, 310 }, pipelineId) 311 return nil 312 }, 313 OnFail: func(jobError error) { 314 s.l.Error("pipeline run failed", "error", jobError) 315 }, 316 }) 317 if ok { 318 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 319 } else { 320 s.l.Error("failed to enqueue pipeline: queue is full") 321 } 322 } 323 324 return nil 325} 326 327func (s *Spindle) configureOwner() error { 328 cfgOwner := s.cfg.Server.Owner 329 330 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 331 if err != nil { 332 return err 333 } 334 335 switch len(existing) { 336 case 0: 337 // no owner configured, continue 338 case 1: 339 // find existing owner 340 existingOwner := existing[0] 341 342 // no ownership change, this is okay 343 if existingOwner == s.cfg.Server.Owner { 344 break 345 } 346 347 // remove existing owner 348 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 349 if err != nil { 350 return nil 351 } 352 default: 353 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 354 } 355 356 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 357}