forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27) 28 29//go:embed motd 30var motd []byte 31 32const ( 33 rbacDomain = "thisserver" 34) 35 36type Spindle struct { 37 jc *jetstream.JetstreamClient 38 db *db.DB 39 e *rbac.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 eng *engine.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50func Run(ctx context.Context) error { 51 logger := log.FromContext(ctx) 52 53 cfg, err := config.Load(ctx) 54 if err != nil { 55 return fmt.Errorf("failed to load config: %w", err) 56 } 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 var vault secrets.Manager 72 switch cfg.Server.Secrets.Provider { 73 case "openbao": 74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 76 } 77 vault, err = secrets.NewOpenBaoManager( 78 cfg.Server.Secrets.OpenBao.ProxyAddr, 79 logger, 80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 81 ) 82 if err != nil { 83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 84 } 85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 86 case "sqlite", "": 87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 88 if err != nil { 89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 90 } 91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 92 default: 93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 94 } 95 96 eng, err := engine.New(ctx, cfg, d, &n, vault) 97 if err != nil { 98 return err 99 } 100 101 jq := queue.NewQueue(100, 5) 102 103 collections := []string{ 104 tangled.SpindleMemberNSID, 105 tangled.RepoNSID, 106 tangled.RepoCollaboratorNSID, 107 } 108 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 109 if err != nil { 110 return fmt.Errorf("failed to setup jetstream client: %w", err) 111 } 112 jc.AddDid(cfg.Server.Owner) 113 114 // Check if the spindle knows about any Dids; 115 dids, err := d.GetAllDids() 116 if err != nil { 117 return fmt.Errorf("failed to get all dids: %w", err) 118 } 119 for _, d := range dids { 120 jc.AddDid(d) 121 } 122 123 resolver := idresolver.DefaultResolver() 124 125 spindle := Spindle{ 126 jc: jc, 127 e: e, 128 db: d, 129 l: logger, 130 n: &n, 131 eng: eng, 132 jq: jq, 133 cfg: cfg, 134 res: resolver, 135 vault: vault, 136 } 137 138 err = e.AddSpindle(rbacDomain) 139 if err != nil { 140 return fmt.Errorf("failed to set rbac domain: %w", err) 141 } 142 err = spindle.configureOwner() 143 if err != nil { 144 return err 145 } 146 logger.Info("owner set", "did", cfg.Server.Owner) 147 148 // starts a job queue runner in the background 149 jq.Start() 150 defer jq.Stop() 151 152 // Stop vault token renewal if it implements Stopper 153 if stopper, ok := vault.(secrets.Stopper); ok { 154 defer stopper.Stop() 155 } 156 157 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 158 if err != nil { 159 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 160 } 161 162 err = jc.StartJetstream(ctx, spindle.ingest()) 163 if err != nil { 164 return fmt.Errorf("failed to start jetstream consumer: %w", err) 165 } 166 167 // for each incoming sh.tangled.pipeline, we execute 168 // spindle.processPipeline, which in turn enqueues the pipeline 169 // job in the above registered queue. 170 ccfg := eventconsumer.NewConsumerConfig() 171 ccfg.Logger = logger 172 ccfg.Dev = cfg.Server.Dev 173 ccfg.ProcessFunc = spindle.processPipeline 174 ccfg.CursorStore = cursorStore 175 knownKnots, err := d.Knots() 176 if err != nil { 177 return err 178 } 179 for _, knot := range knownKnots { 180 logger.Info("adding source start", "knot", knot) 181 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 182 } 183 spindle.ks = eventconsumer.NewConsumer(*ccfg) 184 185 go func() { 186 logger.Info("starting knot event consumer") 187 spindle.ks.Start(ctx) 188 }() 189 190 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 191 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 192 193 return nil 194} 195 196func (s *Spindle) Router() http.Handler { 197 mux := chi.NewRouter() 198 199 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 200 w.Write(motd) 201 }) 202 mux.HandleFunc("/events", s.Events) 203 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 204 w.Write([]byte(s.cfg.Server.Owner)) 205 }) 206 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 207 208 mux.Mount("/xrpc", s.XrpcRouter()) 209 return mux 210} 211 212func (s *Spindle) XrpcRouter() http.Handler { 213 logger := s.l.With("route", "xrpc") 214 215 x := xrpc.Xrpc{ 216 Logger: logger, 217 Db: s.db, 218 Enforcer: s.e, 219 Engine: s.eng, 220 Config: s.cfg, 221 Resolver: s.res, 222 Vault: s.vault, 223 } 224 225 return x.Router() 226} 227 228func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 229 if msg.Nsid == tangled.PipelineNSID { 230 tpl := tangled.Pipeline{} 231 err := json.Unmarshal(msg.EventJson, &tpl) 232 if err != nil { 233 fmt.Println("error unmarshalling", err) 234 return err 235 } 236 237 if tpl.TriggerMetadata == nil { 238 return fmt.Errorf("no trigger metadata found") 239 } 240 241 if tpl.TriggerMetadata.Repo == nil { 242 return fmt.Errorf("no repo data found") 243 } 244 245 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 246 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 247 } 248 249 // filter by repos 250 _, err = s.db.GetRepo( 251 tpl.TriggerMetadata.Repo.Knot, 252 tpl.TriggerMetadata.Repo.Did, 253 tpl.TriggerMetadata.Repo.Repo, 254 ) 255 if err != nil { 256 return err 257 } 258 259 pipelineId := models.PipelineId{ 260 Knot: src.Key(), 261 Rkey: msg.Rkey, 262 } 263 264 for _, w := range tpl.Workflows { 265 if w != nil { 266 err := s.db.StatusPending(models.WorkflowId{ 267 PipelineId: pipelineId, 268 Name: w.Name, 269 }, s.n) 270 if err != nil { 271 return err 272 } 273 } 274 } 275 276 spl := models.ToPipeline(tpl, *s.cfg) 277 278 ok := s.jq.Enqueue(queue.Job{ 279 Run: func() error { 280 s.eng.StartWorkflows(ctx, spl, pipelineId) 281 return nil 282 }, 283 OnFail: func(jobError error) { 284 s.l.Error("pipeline run failed", "error", jobError) 285 }, 286 }) 287 if ok { 288 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 289 } else { 290 s.l.Error("failed to enqueue pipeline: queue is full") 291 } 292 } 293 294 return nil 295} 296 297func (s *Spindle) configureOwner() error { 298 cfgOwner := s.cfg.Server.Owner 299 300 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 301 if err != nil { 302 return err 303 } 304 305 switch len(existing) { 306 case 0: 307 // no owner configured, continue 308 case 1: 309 // find existing owner 310 existingOwner := existing[0] 311 312 // no ownership change, this is okay 313 if existingOwner == s.cfg.Server.Owner { 314 break 315 } 316 317 // remove existing owner 318 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 319 if err != nil { 320 return nil 321 } 322 default: 323 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 324 } 325 326 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 327}