forked from tangled.org/core
this repo has no description
at knot-xrpc 8.2 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27 "tangled.sh/tangled.sh/core/xrpc/serviceauth" 28) 29 30//go:embed motd 31var motd []byte 32 33const ( 34 rbacDomain = "thisserver" 35) 36 37type Spindle struct { 38 jc *jetstream.JetstreamClient 39 db *db.DB 40 e *rbac.Enforcer 41 l *slog.Logger 42 n *notifier.Notifier 43 eng *engine.Engine 44 jq *queue.Queue 45 cfg *config.Config 46 ks *eventconsumer.Consumer 47 res *idresolver.Resolver 48 vault secrets.Manager 49} 50 51func Run(ctx context.Context) error { 52 logger := log.FromContext(ctx) 53 54 cfg, err := config.Load(ctx) 55 if err != nil { 56 return fmt.Errorf("failed to load config: %w", err) 57 } 58 59 d, err := db.Make(cfg.Server.DBPath) 60 if err != nil { 61 return fmt.Errorf("failed to setup db: %w", err) 62 } 63 64 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 65 if err != nil { 66 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 67 } 68 e.E.EnableAutoSave(true) 69 70 n := notifier.New() 71 72 var vault secrets.Manager 73 switch cfg.Server.Secrets.Provider { 74 case "openbao": 75 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 76 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 77 } 78 vault, err = secrets.NewOpenBaoManager( 79 cfg.Server.Secrets.OpenBao.ProxyAddr, 80 logger, 81 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 82 ) 83 if err != nil { 84 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 85 } 86 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 87 case "sqlite", "": 88 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 89 if err != nil { 90 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 91 } 92 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 93 default: 94 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 95 } 96 97 eng, err := engine.New(ctx, cfg, d, &n, vault) 98 if err != nil { 99 return err 100 } 101 102 jq := queue.NewQueue(100, 5) 103 104 collections := []string{ 105 tangled.SpindleMemberNSID, 106 tangled.RepoNSID, 107 tangled.RepoCollaboratorNSID, 108 } 109 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 110 if err != nil { 111 return fmt.Errorf("failed to setup jetstream client: %w", err) 112 } 113 jc.AddDid(cfg.Server.Owner) 114 115 // Check if the spindle knows about any Dids; 116 dids, err := d.GetAllDids() 117 if err != nil { 118 return fmt.Errorf("failed to get all dids: %w", err) 119 } 120 for _, d := range dids { 121 jc.AddDid(d) 122 } 123 124 resolver := idresolver.DefaultResolver() 125 126 spindle := Spindle{ 127 jc: jc, 128 e: e, 129 db: d, 130 l: logger, 131 n: &n, 132 eng: eng, 133 jq: jq, 134 cfg: cfg, 135 res: resolver, 136 vault: vault, 137 } 138 139 err = e.AddSpindle(rbacDomain) 140 if err != nil { 141 return fmt.Errorf("failed to set rbac domain: %w", err) 142 } 143 err = spindle.configureOwner() 144 if err != nil { 145 return err 146 } 147 logger.Info("owner set", "did", cfg.Server.Owner) 148 149 // starts a job queue runner in the background 150 jq.Start() 151 defer jq.Stop() 152 153 // Stop vault token renewal if it implements Stopper 154 if stopper, ok := vault.(secrets.Stopper); ok { 155 defer stopper.Stop() 156 } 157 158 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 159 if err != nil { 160 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 161 } 162 163 err = jc.StartJetstream(ctx, spindle.ingest()) 164 if err != nil { 165 return fmt.Errorf("failed to start jetstream consumer: %w", err) 166 } 167 168 // for each incoming sh.tangled.pipeline, we execute 169 // spindle.processPipeline, which in turn enqueues the pipeline 170 // job in the above registered queue. 171 ccfg := eventconsumer.NewConsumerConfig() 172 ccfg.Logger = logger 173 ccfg.Dev = cfg.Server.Dev 174 ccfg.ProcessFunc = spindle.processPipeline 175 ccfg.CursorStore = cursorStore 176 knownKnots, err := d.Knots() 177 if err != nil { 178 return err 179 } 180 for _, knot := range knownKnots { 181 logger.Info("adding source start", "knot", knot) 182 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 183 } 184 spindle.ks = eventconsumer.NewConsumer(*ccfg) 185 186 go func() { 187 logger.Info("starting knot event consumer") 188 spindle.ks.Start(ctx) 189 }() 190 191 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 192 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 193 194 return nil 195} 196 197func (s *Spindle) Router() http.Handler { 198 mux := chi.NewRouter() 199 200 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 201 w.Write(motd) 202 }) 203 mux.HandleFunc("/events", s.Events) 204 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 205 w.Write([]byte(s.cfg.Server.Owner)) 206 }) 207 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 208 209 mux.Mount("/xrpc", s.XrpcRouter()) 210 return mux 211} 212 213func (s *Spindle) XrpcRouter() http.Handler { 214 logger := s.l.With("route", "xrpc") 215 216 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 217 218 x := xrpc.Xrpc{ 219 Logger: logger, 220 Db: s.db, 221 Enforcer: s.e, 222 Engine: s.eng, 223 Config: s.cfg, 224 Resolver: s.res, 225 Vault: s.vault, 226 ServiceAuth: serviceAuth, 227 } 228 229 return x.Router() 230} 231 232func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 233 if msg.Nsid == tangled.PipelineNSID { 234 tpl := tangled.Pipeline{} 235 err := json.Unmarshal(msg.EventJson, &tpl) 236 if err != nil { 237 fmt.Println("error unmarshalling", err) 238 return err 239 } 240 241 if tpl.TriggerMetadata == nil { 242 return fmt.Errorf("no trigger metadata found") 243 } 244 245 if tpl.TriggerMetadata.Repo == nil { 246 return fmt.Errorf("no repo data found") 247 } 248 249 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 250 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 251 } 252 253 // filter by repos 254 _, err = s.db.GetRepo( 255 tpl.TriggerMetadata.Repo.Knot, 256 tpl.TriggerMetadata.Repo.Did, 257 tpl.TriggerMetadata.Repo.Repo, 258 ) 259 if err != nil { 260 return err 261 } 262 263 pipelineId := models.PipelineId{ 264 Knot: src.Key(), 265 Rkey: msg.Rkey, 266 } 267 268 for _, w := range tpl.Workflows { 269 if w != nil { 270 err := s.db.StatusPending(models.WorkflowId{ 271 PipelineId: pipelineId, 272 Name: w.Name, 273 }, s.n) 274 if err != nil { 275 return err 276 } 277 } 278 } 279 280 spl := models.ToPipeline(tpl, *s.cfg) 281 282 ok := s.jq.Enqueue(queue.Job{ 283 Run: func() error { 284 s.eng.StartWorkflows(ctx, spl, pipelineId) 285 return nil 286 }, 287 OnFail: func(jobError error) { 288 s.l.Error("pipeline run failed", "error", jobError) 289 }, 290 }) 291 if ok { 292 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 293 } else { 294 s.l.Error("failed to enqueue pipeline: queue is full") 295 } 296 } 297 298 return nil 299} 300 301func (s *Spindle) configureOwner() error { 302 cfgOwner := s.cfg.Server.Owner 303 304 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 305 if err != nil { 306 return err 307 } 308 309 switch len(existing) { 310 case 0: 311 // no owner configured, continue 312 case 1: 313 // find existing owner 314 existingOwner := existing[0] 315 316 // no ownership change, this is okay 317 if existingOwner == s.cfg.Server.Owner { 318 break 319 } 320 321 // remove existing owner 322 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 323 if err != nil { 324 return nil 325 } 326 default: 327 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 328 } 329 330 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 331}