forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27) 28 29//go:embed motd 30var motd []byte 31 32const ( 33 rbacDomain = "thisserver" 34) 35 36type Spindle struct { 37 jc *jetstream.JetstreamClient 38 db *db.DB 39 e *rbac.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 eng *engine.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50func Run(ctx context.Context) error { 51 logger := log.FromContext(ctx) 52 53 cfg, err := config.Load(ctx) 54 if err != nil { 55 return fmt.Errorf("failed to load config: %w", err) 56 } 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 var vault secrets.Manager 72 switch cfg.Server.Secrets.Provider { 73 case "openbao": 74 if cfg.Server.Secrets.OpenBao.Addr == "" { 75 return fmt.Errorf("openbao address is required when using openbao secrets provider") 76 } 77 if cfg.Server.Secrets.OpenBao.RoleID == "" { 78 return fmt.Errorf("openbao role_id is required when using openbao secrets provider") 79 } 80 if cfg.Server.Secrets.OpenBao.SecretID == "" { 81 return fmt.Errorf("openbao secret_id is required when using openbao secrets provider") 82 } 83 vault, err = secrets.NewOpenBaoManager( 84 cfg.Server.Secrets.OpenBao.Addr, 85 cfg.Server.Secrets.OpenBao.RoleID, 86 cfg.Server.Secrets.OpenBao.SecretID, 87 logger, 88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 89 ) 90 if err != nil { 91 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 92 } 93 logger.Info("using openbao secrets provider", "address", cfg.Server.Secrets.OpenBao.Addr, "mount", cfg.Server.Secrets.OpenBao.Mount) 94 case "sqlite", "": 95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 96 if err != nil { 97 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 98 } 99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 100 default: 101 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 102 } 103 104 eng, err := engine.New(ctx, cfg, d, &n, vault) 105 if err != nil { 106 return err 107 } 108 109 jq := queue.NewQueue(100, 2) 110 111 collections := []string{ 112 tangled.SpindleMemberNSID, 113 tangled.RepoNSID, 114 tangled.RepoCollaboratorNSID, 115 } 116 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 117 if err != nil { 118 return fmt.Errorf("failed to setup jetstream client: %w", err) 119 } 120 jc.AddDid(cfg.Server.Owner) 121 122 resolver := idresolver.DefaultResolver() 123 124 spindle := Spindle{ 125 jc: jc, 126 e: e, 127 db: d, 128 l: logger, 129 n: &n, 130 eng: eng, 131 jq: jq, 132 cfg: cfg, 133 res: resolver, 134 vault: vault, 135 } 136 137 err = e.AddSpindle(rbacDomain) 138 if err != nil { 139 return fmt.Errorf("failed to set rbac domain: %w", err) 140 } 141 err = spindle.configureOwner() 142 if err != nil { 143 return err 144 } 145 logger.Info("owner set", "did", cfg.Server.Owner) 146 147 // starts a job queue runner in the background 148 jq.Start() 149 defer jq.Stop() 150 151 // Stop vault token renewal if it implements Stopper 152 if stopper, ok := vault.(secrets.Stopper); ok { 153 defer stopper.Stop() 154 } 155 156 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 157 if err != nil { 158 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 159 } 160 161 err = jc.StartJetstream(ctx, spindle.ingest()) 162 if err != nil { 163 return fmt.Errorf("failed to start jetstream consumer: %w", err) 164 } 165 166 // for each incoming sh.tangled.pipeline, we execute 167 // spindle.processPipeline, which in turn enqueues the pipeline 168 // job in the above registered queue. 169 ccfg := eventconsumer.NewConsumerConfig() 170 ccfg.Logger = logger 171 ccfg.Dev = cfg.Server.Dev 172 ccfg.ProcessFunc = spindle.processPipeline 173 ccfg.CursorStore = cursorStore 174 knownKnots, err := d.Knots() 175 if err != nil { 176 return err 177 } 178 for _, knot := range knownKnots { 179 logger.Info("adding source start", "knot", knot) 180 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 181 } 182 spindle.ks = eventconsumer.NewConsumer(*ccfg) 183 184 go func() { 185 logger.Info("starting knot event consumer") 186 spindle.ks.Start(ctx) 187 }() 188 189 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 190 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 191 192 return nil 193} 194 195func (s *Spindle) Router() http.Handler { 196 mux := chi.NewRouter() 197 198 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 199 w.Write(motd) 200 }) 201 mux.HandleFunc("/events", s.Events) 202 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 203 w.Write([]byte(s.cfg.Server.Owner)) 204 }) 205 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 206 207 mux.Mount("/xrpc", s.XrpcRouter()) 208 return mux 209} 210 211func (s *Spindle) XrpcRouter() http.Handler { 212 logger := s.l.With("route", "xrpc") 213 214 x := xrpc.Xrpc{ 215 Logger: logger, 216 Db: s.db, 217 Enforcer: s.e, 218 Engine: s.eng, 219 Config: s.cfg, 220 Resolver: s.res, 221 Vault: s.vault, 222 } 223 224 return x.Router() 225} 226 227func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 228 if msg.Nsid == tangled.PipelineNSID { 229 tpl := tangled.Pipeline{} 230 err := json.Unmarshal(msg.EventJson, &tpl) 231 if err != nil { 232 fmt.Println("error unmarshalling", err) 233 return err 234 } 235 236 if tpl.TriggerMetadata == nil { 237 return fmt.Errorf("no trigger metadata found") 238 } 239 240 if tpl.TriggerMetadata.Repo == nil { 241 return fmt.Errorf("no repo data found") 242 } 243 244 // filter by repos 245 _, err = s.db.GetRepo( 246 tpl.TriggerMetadata.Repo.Knot, 247 tpl.TriggerMetadata.Repo.Did, 248 tpl.TriggerMetadata.Repo.Repo, 249 ) 250 if err != nil { 251 return err 252 } 253 254 pipelineId := models.PipelineId{ 255 Knot: src.Key(), 256 Rkey: msg.Rkey, 257 } 258 259 for _, w := range tpl.Workflows { 260 if w != nil { 261 err := s.db.StatusPending(models.WorkflowId{ 262 PipelineId: pipelineId, 263 Name: w.Name, 264 }, s.n) 265 if err != nil { 266 return err 267 } 268 } 269 } 270 271 spl := models.ToPipeline(tpl, *s.cfg) 272 273 ok := s.jq.Enqueue(queue.Job{ 274 Run: func() error { 275 s.eng.StartWorkflows(ctx, spl, pipelineId) 276 return nil 277 }, 278 OnFail: func(jobError error) { 279 s.l.Error("pipeline run failed", "error", jobError) 280 }, 281 }) 282 if ok { 283 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 284 } else { 285 s.l.Error("failed to enqueue pipeline: queue is full") 286 } 287 } 288 289 return nil 290} 291 292func (s *Spindle) configureOwner() error { 293 cfgOwner := s.cfg.Server.Owner 294 295 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 296 if err != nil { 297 return err 298 } 299 300 switch len(existing) { 301 case 0: 302 // no owner configured, continue 303 case 1: 304 // find existing owner 305 existingOwner := existing[0] 306 307 // no ownership change, this is okay 308 if existingOwner == s.cfg.Server.Owner { 309 break 310 } 311 312 // remove existing owner 313 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 314 if err != nil { 315 return nil 316 } 317 default: 318 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 319 } 320 321 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 322}