forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27) 28 29//go:embed motd 30var motd []byte 31 32const ( 33 rbacDomain = "thisserver" 34) 35 36type Spindle struct { 37 jc *jetstream.JetstreamClient 38 db *db.DB 39 e *rbac.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 eng *engine.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50func Run(ctx context.Context) error { 51 logger := log.FromContext(ctx) 52 53 cfg, err := config.Load(ctx) 54 if err != nil { 55 return fmt.Errorf("failed to load config: %w", err) 56 } 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 // TODO: add hashicorp vault provider and choose here 72 vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 73 if err != nil { 74 return fmt.Errorf("failed to setup secrets provider: %w", err) 75 } 76 77 eng, err := engine.New(ctx, cfg, d, &n, vault) 78 if err != nil { 79 return err 80 } 81 82 jq := queue.NewQueue(100, 2) 83 84 collections := []string{ 85 tangled.SpindleMemberNSID, 86 tangled.RepoNSID, 87 } 88 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 89 if err != nil { 90 return fmt.Errorf("failed to setup jetstream client: %w", err) 91 } 92 jc.AddDid(cfg.Server.Owner) 93 94 resolver := idresolver.DefaultResolver() 95 96 spindle := Spindle{ 97 jc: jc, 98 e: e, 99 db: d, 100 l: logger, 101 n: &n, 102 eng: eng, 103 jq: jq, 104 cfg: cfg, 105 res: resolver, 106 vault: vault, 107 } 108 109 err = e.AddSpindle(rbacDomain) 110 if err != nil { 111 return fmt.Errorf("failed to set rbac domain: %w", err) 112 } 113 err = spindle.configureOwner() 114 if err != nil { 115 return err 116 } 117 logger.Info("owner set", "did", cfg.Server.Owner) 118 119 // starts a job queue runner in the background 120 jq.Start() 121 defer jq.Stop() 122 123 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 124 if err != nil { 125 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 126 } 127 128 err = jc.StartJetstream(ctx, spindle.ingest()) 129 if err != nil { 130 return fmt.Errorf("failed to start jetstream consumer: %w", err) 131 } 132 133 // for each incoming sh.tangled.pipeline, we execute 134 // spindle.processPipeline, which in turn enqueues the pipeline 135 // job in the above registered queue. 136 ccfg := eventconsumer.NewConsumerConfig() 137 ccfg.Logger = logger 138 ccfg.Dev = cfg.Server.Dev 139 ccfg.ProcessFunc = spindle.processPipeline 140 ccfg.CursorStore = cursorStore 141 knownKnots, err := d.Knots() 142 if err != nil { 143 return err 144 } 145 for _, knot := range knownKnots { 146 logger.Info("adding source start", "knot", knot) 147 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 148 } 149 spindle.ks = eventconsumer.NewConsumer(*ccfg) 150 151 go func() { 152 logger.Info("starting knot event consumer") 153 spindle.ks.Start(ctx) 154 }() 155 156 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 157 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 158 159 return nil 160} 161 162func (s *Spindle) Router() http.Handler { 163 mux := chi.NewRouter() 164 165 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 166 w.Write(motd) 167 }) 168 mux.HandleFunc("/events", s.Events) 169 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 170 w.Write([]byte(s.cfg.Server.Owner)) 171 }) 172 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 173 174 mux.Mount("/xrpc", s.XrpcRouter()) 175 return mux 176} 177 178func (s *Spindle) XrpcRouter() http.Handler { 179 logger := s.l.With("route", "xrpc") 180 181 x := xrpc.Xrpc{ 182 Logger: logger, 183 Db: s.db, 184 Enforcer: s.e, 185 Engine: s.eng, 186 Config: s.cfg, 187 Resolver: s.res, 188 Vault: s.vault, 189 } 190 191 return x.Router() 192} 193 194func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 195 if msg.Nsid == tangled.PipelineNSID { 196 tpl := tangled.Pipeline{} 197 err := json.Unmarshal(msg.EventJson, &tpl) 198 if err != nil { 199 fmt.Println("error unmarshalling", err) 200 return err 201 } 202 203 if tpl.TriggerMetadata == nil { 204 return fmt.Errorf("no trigger metadata found") 205 } 206 207 if tpl.TriggerMetadata.Repo == nil { 208 return fmt.Errorf("no repo data found") 209 } 210 211 // filter by repos 212 _, err = s.db.GetRepo( 213 tpl.TriggerMetadata.Repo.Knot, 214 tpl.TriggerMetadata.Repo.Did, 215 tpl.TriggerMetadata.Repo.Repo, 216 ) 217 if err != nil { 218 return err 219 } 220 221 pipelineId := models.PipelineId{ 222 Knot: src.Key(), 223 Rkey: msg.Rkey, 224 } 225 226 for _, w := range tpl.Workflows { 227 if w != nil { 228 err := s.db.StatusPending(models.WorkflowId{ 229 PipelineId: pipelineId, 230 Name: w.Name, 231 }, s.n) 232 if err != nil { 233 return err 234 } 235 } 236 } 237 238 spl := models.ToPipeline(tpl, *s.cfg) 239 240 ok := s.jq.Enqueue(queue.Job{ 241 Run: func() error { 242 s.eng.StartWorkflows(ctx, spl, pipelineId) 243 return nil 244 }, 245 OnFail: func(jobError error) { 246 s.l.Error("pipeline run failed", "error", jobError) 247 }, 248 }) 249 if ok { 250 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 251 } else { 252 s.l.Error("failed to enqueue pipeline: queue is full") 253 } 254 } 255 256 return nil 257} 258 259func (s *Spindle) configureOwner() error { 260 cfgOwner := s.cfg.Server.Owner 261 262 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 263 if err != nil { 264 return err 265 } 266 267 switch len(existing) { 268 case 0: 269 // no owner configured, continue 270 case 1: 271 // find existing owner 272 existingOwner := existing[0] 273 274 // no ownership change, this is okay 275 if existingOwner == s.cfg.Server.Owner { 276 break 277 } 278 279 // remove existing owner 280 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 281 if err != nil { 282 return nil 283 } 284 default: 285 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 286 } 287 288 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 289}