forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/engines/nixery" 24 "tangled.sh/tangled.sh/core/spindle/models" 25 "tangled.sh/tangled.sh/core/spindle/queue" 26 "tangled.sh/tangled.sh/core/spindle/secrets" 27 "tangled.sh/tangled.sh/core/spindle/xrpc" 28 "tangled.sh/tangled.sh/core/xrpc/serviceauth" 29) 30 31//go:embed motd 32var motd []byte 33 34const ( 35 rbacDomain = "thisserver" 36) 37 38type Spindle struct { 39 jc *jetstream.JetstreamClient 40 db *db.DB 41 e *rbac.Enforcer 42 l *slog.Logger 43 n *notifier.Notifier 44 engs map[string]models.Engine 45 jq *queue.Queue 46 cfg *config.Config 47 ks *eventconsumer.Consumer 48 res *idresolver.Resolver 49 vault secrets.Manager 50} 51 52func Run(ctx context.Context) error { 53 logger := log.FromContext(ctx) 54 55 cfg, err := config.Load(ctx) 56 if err != nil { 57 return fmt.Errorf("failed to load config: %w", err) 58 } 59 60 d, err := db.Make(cfg.Server.DBPath) 61 if err != nil { 62 return fmt.Errorf("failed to setup db: %w", err) 63 } 64 65 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 if err != nil { 67 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 } 69 e.E.EnableAutoSave(true) 70 71 n := notifier.New() 72 73 var vault secrets.Manager 74 switch cfg.Server.Secrets.Provider { 75 case "openbao": 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 } 79 vault, err = secrets.NewOpenBaoManager( 80 cfg.Server.Secrets.OpenBao.ProxyAddr, 81 logger, 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 ) 84 if err != nil { 85 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 } 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 case "sqlite", "": 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 90 if err != nil { 91 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 } 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 94 default: 95 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 } 97 98 nixeryEng, err := nixery.New(ctx, cfg) 99 if err != nil { 100 return err 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 105 106 collections := []string{ 107 tangled.SpindleMemberNSID, 108 tangled.RepoNSID, 109 tangled.RepoCollaboratorNSID, 110 } 111 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 112 if err != nil { 113 return fmt.Errorf("failed to setup jetstream client: %w", err) 114 } 115 jc.AddDid(cfg.Server.Owner) 116 117 // Check if the spindle knows about any Dids; 118 dids, err := d.GetAllDids() 119 if err != nil { 120 return fmt.Errorf("failed to get all dids: %w", err) 121 } 122 for _, d := range dids { 123 jc.AddDid(d) 124 } 125 126 resolver := idresolver.DefaultResolver() 127 128 spindle := Spindle{ 129 jc: jc, 130 e: e, 131 db: d, 132 l: logger, 133 n: &n, 134 engs: map[string]models.Engine{"nixery": nixeryEng}, 135 jq: jq, 136 cfg: cfg, 137 res: resolver, 138 vault: vault, 139 } 140 141 err = e.AddSpindle(rbacDomain) 142 if err != nil { 143 return fmt.Errorf("failed to set rbac domain: %w", err) 144 } 145 err = spindle.configureOwner() 146 if err != nil { 147 return err 148 } 149 logger.Info("owner set", "did", cfg.Server.Owner) 150 151 // starts a job queue runner in the background 152 jq.Start() 153 defer jq.Stop() 154 155 // Stop vault token renewal if it implements Stopper 156 if stopper, ok := vault.(secrets.Stopper); ok { 157 defer stopper.Stop() 158 } 159 160 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 161 if err != nil { 162 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 163 } 164 165 err = jc.StartJetstream(ctx, spindle.ingest()) 166 if err != nil { 167 return fmt.Errorf("failed to start jetstream consumer: %w", err) 168 } 169 170 // for each incoming sh.tangled.pipeline, we execute 171 // spindle.processPipeline, which in turn enqueues the pipeline 172 // job in the above registered queue. 173 ccfg := eventconsumer.NewConsumerConfig() 174 ccfg.Logger = logger 175 ccfg.Dev = cfg.Server.Dev 176 ccfg.ProcessFunc = spindle.processPipeline 177 ccfg.CursorStore = cursorStore 178 knownKnots, err := d.Knots() 179 if err != nil { 180 return err 181 } 182 for _, knot := range knownKnots { 183 logger.Info("adding source start", "knot", knot) 184 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 185 } 186 spindle.ks = eventconsumer.NewConsumer(*ccfg) 187 188 go func() { 189 logger.Info("starting knot event consumer") 190 spindle.ks.Start(ctx) 191 }() 192 193 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 194 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 195 196 return nil 197} 198 199func (s *Spindle) Router() http.Handler { 200 mux := chi.NewRouter() 201 202 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 203 w.Write(motd) 204 }) 205 mux.HandleFunc("/events", s.Events) 206 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 207 w.Write([]byte(s.cfg.Server.Owner)) 208 }) 209 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 210 211 mux.Mount("/xrpc", s.XrpcRouter()) 212 return mux 213} 214 215func (s *Spindle) XrpcRouter() http.Handler { 216 logger := s.l.With("route", "xrpc") 217 218 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 219 220 x := xrpc.Xrpc{ 221 Logger: logger, 222 Db: s.db, 223 Enforcer: s.e, 224 Engines: s.engs, 225 Config: s.cfg, 226 Resolver: s.res, 227 Vault: s.vault, 228 ServiceAuth: serviceAuth, 229 } 230 231 return x.Router() 232} 233 234func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 235 if msg.Nsid == tangled.PipelineNSID { 236 tpl := tangled.Pipeline{} 237 err := json.Unmarshal(msg.EventJson, &tpl) 238 if err != nil { 239 fmt.Println("error unmarshalling", err) 240 return err 241 } 242 243 if tpl.TriggerMetadata == nil { 244 return fmt.Errorf("no trigger metadata found") 245 } 246 247 if tpl.TriggerMetadata.Repo == nil { 248 return fmt.Errorf("no repo data found") 249 } 250 251 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 252 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 253 } 254 255 // filter by repos 256 _, err = s.db.GetRepo( 257 tpl.TriggerMetadata.Repo.Knot, 258 tpl.TriggerMetadata.Repo.Did, 259 tpl.TriggerMetadata.Repo.Repo, 260 ) 261 if err != nil { 262 return err 263 } 264 265 pipelineId := models.PipelineId{ 266 Knot: src.Key(), 267 Rkey: msg.Rkey, 268 } 269 270 workflows := make(map[models.Engine][]models.Workflow) 271 272 for _, w := range tpl.Workflows { 273 if w != nil { 274 if _, ok := s.engs[w.Engine]; !ok { 275 err = s.db.StatusFailed(models.WorkflowId{ 276 PipelineId: pipelineId, 277 Name: w.Name, 278 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 279 if err != nil { 280 return err 281 } 282 283 continue 284 } 285 286 eng := s.engs[w.Engine] 287 288 if _, ok := workflows[eng]; !ok { 289 workflows[eng] = []models.Workflow{} 290 } 291 292 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 293 if err != nil { 294 return err 295 } 296 297 workflows[eng] = append(workflows[eng], *ewf) 298 299 err = s.db.StatusPending(models.WorkflowId{ 300 PipelineId: pipelineId, 301 Name: w.Name, 302 }, s.n) 303 if err != nil { 304 return err 305 } 306 } 307 } 308 309 ok := s.jq.Enqueue(queue.Job{ 310 Run: func() error { 311 engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 312 RepoOwner: tpl.TriggerMetadata.Repo.Did, 313 RepoName: tpl.TriggerMetadata.Repo.Repo, 314 Workflows: workflows, 315 }, pipelineId) 316 return nil 317 }, 318 OnFail: func(jobError error) { 319 s.l.Error("pipeline run failed", "error", jobError) 320 }, 321 }) 322 if ok { 323 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 324 } else { 325 s.l.Error("failed to enqueue pipeline: queue is full") 326 } 327 } 328 329 return nil 330} 331 332func (s *Spindle) configureOwner() error { 333 cfgOwner := s.cfg.Server.Owner 334 335 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 336 if err != nil { 337 return err 338 } 339 340 switch len(existing) { 341 case 0: 342 // no owner configured, continue 343 case 1: 344 // find existing owner 345 existingOwner := existing[0] 346 347 // no ownership change, this is okay 348 if existingOwner == s.cfg.Server.Owner { 349 break 350 } 351 352 // remove existing owner 353 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 354 if err != nil { 355 return nil 356 } 357 default: 358 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 359 } 360 361 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 362}