forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27) 28 29//go:embed motd 30var motd []byte 31 32const ( 33 rbacDomain = "thisserver" 34) 35 36type Spindle struct { 37 jc *jetstream.JetstreamClient 38 db *db.DB 39 e *rbac.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 eng *engine.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50func Run(ctx context.Context) error { 51 logger := log.FromContext(ctx) 52 53 cfg, err := config.Load(ctx) 54 if err != nil { 55 return fmt.Errorf("failed to load config: %w", err) 56 } 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 var vault secrets.Manager 72 switch cfg.Server.Secrets.Provider { 73 case "openbao": 74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 76 } 77 vault, err = secrets.NewOpenBaoManager( 78 cfg.Server.Secrets.OpenBao.ProxyAddr, 79 logger, 80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 81 ) 82 if err != nil { 83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 84 } 85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 86 case "sqlite", "": 87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 88 if err != nil { 89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 90 } 91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 92 default: 93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 94 } 95 96 eng, err := engine.New(ctx, cfg, d, &n, vault) 97 if err != nil { 98 return err 99 } 100 101 jq := queue.NewQueue(100, 2) 102 103 collections := []string{ 104 tangled.SpindleMemberNSID, 105 tangled.RepoNSID, 106 } 107 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 108 if err != nil { 109 return fmt.Errorf("failed to setup jetstream client: %w", err) 110 } 111 jc.AddDid(cfg.Server.Owner) 112 113 resolver := idresolver.DefaultResolver() 114 115 spindle := Spindle{ 116 jc: jc, 117 e: e, 118 db: d, 119 l: logger, 120 n: &n, 121 eng: eng, 122 jq: jq, 123 cfg: cfg, 124 res: resolver, 125 vault: vault, 126 } 127 128 err = e.AddSpindle(rbacDomain) 129 if err != nil { 130 return fmt.Errorf("failed to set rbac domain: %w", err) 131 } 132 err = spindle.configureOwner() 133 if err != nil { 134 return err 135 } 136 logger.Info("owner set", "did", cfg.Server.Owner) 137 138 // starts a job queue runner in the background 139 jq.Start() 140 defer jq.Stop() 141 142 // Stop vault token renewal if it implements Stopper 143 if stopper, ok := vault.(secrets.Stopper); ok { 144 defer stopper.Stop() 145 } 146 147 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 148 if err != nil { 149 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 150 } 151 152 err = jc.StartJetstream(ctx, spindle.ingest()) 153 if err != nil { 154 return fmt.Errorf("failed to start jetstream consumer: %w", err) 155 } 156 157 // for each incoming sh.tangled.pipeline, we execute 158 // spindle.processPipeline, which in turn enqueues the pipeline 159 // job in the above registered queue. 160 ccfg := eventconsumer.NewConsumerConfig() 161 ccfg.Logger = logger 162 ccfg.Dev = cfg.Server.Dev 163 ccfg.ProcessFunc = spindle.processPipeline 164 ccfg.CursorStore = cursorStore 165 knownKnots, err := d.Knots() 166 if err != nil { 167 return err 168 } 169 for _, knot := range knownKnots { 170 logger.Info("adding source start", "knot", knot) 171 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 172 } 173 spindle.ks = eventconsumer.NewConsumer(*ccfg) 174 175 go func() { 176 logger.Info("starting knot event consumer") 177 spindle.ks.Start(ctx) 178 }() 179 180 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 181 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 182 183 return nil 184} 185 186func (s *Spindle) Router() http.Handler { 187 mux := chi.NewRouter() 188 189 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 190 w.Write(motd) 191 }) 192 mux.HandleFunc("/events", s.Events) 193 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 194 w.Write([]byte(s.cfg.Server.Owner)) 195 }) 196 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 197 198 mux.Mount("/xrpc", s.XrpcRouter()) 199 return mux 200} 201 202func (s *Spindle) XrpcRouter() http.Handler { 203 logger := s.l.With("route", "xrpc") 204 205 x := xrpc.Xrpc{ 206 Logger: logger, 207 Db: s.db, 208 Enforcer: s.e, 209 Engine: s.eng, 210 Config: s.cfg, 211 Resolver: s.res, 212 Vault: s.vault, 213 } 214 215 return x.Router() 216} 217 218func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 219 if msg.Nsid == tangled.PipelineNSID { 220 tpl := tangled.Pipeline{} 221 err := json.Unmarshal(msg.EventJson, &tpl) 222 if err != nil { 223 fmt.Println("error unmarshalling", err) 224 return err 225 } 226 227 if tpl.TriggerMetadata == nil { 228 return fmt.Errorf("no trigger metadata found") 229 } 230 231 if tpl.TriggerMetadata.Repo == nil { 232 return fmt.Errorf("no repo data found") 233 } 234 235 // filter by repos 236 _, err = s.db.GetRepo( 237 tpl.TriggerMetadata.Repo.Knot, 238 tpl.TriggerMetadata.Repo.Did, 239 tpl.TriggerMetadata.Repo.Repo, 240 ) 241 if err != nil { 242 return err 243 } 244 245 pipelineId := models.PipelineId{ 246 Knot: src.Key(), 247 Rkey: msg.Rkey, 248 } 249 250 for _, w := range tpl.Workflows { 251 if w != nil { 252 err := s.db.StatusPending(models.WorkflowId{ 253 PipelineId: pipelineId, 254 Name: w.Name, 255 }, s.n) 256 if err != nil { 257 return err 258 } 259 } 260 } 261 262 spl := models.ToPipeline(tpl, *s.cfg) 263 264 ok := s.jq.Enqueue(queue.Job{ 265 Run: func() error { 266 s.eng.StartWorkflows(ctx, spl, pipelineId) 267 return nil 268 }, 269 OnFail: func(jobError error) { 270 s.l.Error("pipeline run failed", "error", jobError) 271 }, 272 }) 273 if ok { 274 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 275 } else { 276 s.l.Error("failed to enqueue pipeline: queue is full") 277 } 278 } 279 280 return nil 281} 282 283func (s *Spindle) configureOwner() error { 284 cfgOwner := s.cfg.Server.Owner 285 286 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 287 if err != nil { 288 return err 289 } 290 291 switch len(existing) { 292 case 0: 293 // no owner configured, continue 294 case 1: 295 // find existing owner 296 existingOwner := existing[0] 297 298 // no ownership change, this is okay 299 if existingOwner == s.cfg.Server.Owner { 300 break 301 } 302 303 // remove existing owner 304 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 305 if err != nil { 306 return nil 307 } 308 default: 309 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 310 } 311 312 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 313}