forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/eventconsumer" 13 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 14 "tangled.sh/tangled.sh/core/jetstream" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/spindle/config" 19 "tangled.sh/tangled.sh/core/spindle/db" 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 "tangled.sh/tangled.sh/core/spindle/models" 22 "tangled.sh/tangled.sh/core/spindle/queue" 23) 24 25const ( 26 rbacDomain = "thisserver" 27) 28 29type Spindle struct { 30 jc *jetstream.JetstreamClient 31 db *db.DB 32 e *rbac.Enforcer 33 l *slog.Logger 34 n *notifier.Notifier 35 eng *engine.Engine 36 jq *queue.Queue 37 cfg *config.Config 38 ks *eventconsumer.Consumer 39} 40 41func Run(ctx context.Context) error { 42 logger := log.FromContext(ctx) 43 44 cfg, err := config.Load(ctx) 45 if err != nil { 46 return fmt.Errorf("failed to load config: %w", err) 47 } 48 49 d, err := db.Make(cfg.Server.DBPath) 50 if err != nil { 51 return fmt.Errorf("failed to setup db: %w", err) 52 } 53 54 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 55 if err != nil { 56 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 57 } 58 e.E.EnableAutoSave(true) 59 60 n := notifier.New() 61 62 eng, err := engine.New(ctx, cfg, d, &n) 63 if err != nil { 64 return err 65 } 66 67 jq := queue.NewQueue(100, 2) 68 69 collections := []string{ 70 tangled.SpindleMemberNSID, 71 tangled.RepoNSID, 72 } 73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 74 if err != nil { 75 return fmt.Errorf("failed to setup jetstream client: %w", err) 76 } 77 jc.AddDid(cfg.Server.Owner) 78 79 spindle := Spindle{ 80 jc: jc, 81 e: e, 82 db: d, 83 l: logger, 84 n: &n, 85 eng: eng, 86 jq: jq, 87 cfg: cfg, 88 } 89 90 err = e.AddSpindle(rbacDomain) 91 if err != nil { 92 return fmt.Errorf("failed to set rbac domain: %w", err) 93 } 94 err = spindle.configureOwner() 95 if err != nil { 96 return err 97 } 98 logger.Info("owner set", "did", cfg.Server.Owner) 99 100 // starts a job queue runner in the background 101 jq.Start() 102 defer jq.Stop() 103 104 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 105 if err != nil { 106 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 107 } 108 109 err = jc.StartJetstream(ctx, spindle.ingest()) 110 if err != nil { 111 return fmt.Errorf("failed to start jetstream consumer: %w", err) 112 } 113 114 // for each incoming sh.tangled.pipeline, we execute 115 // spindle.processPipeline, which in turn enqueues the pipeline 116 // job in the above registered queue. 117 ccfg := eventconsumer.NewConsumerConfig() 118 ccfg.Logger = logger 119 ccfg.Dev = cfg.Server.Dev 120 ccfg.ProcessFunc = spindle.processPipeline 121 ccfg.CursorStore = cursorStore 122 knownKnots, err := d.Knots() 123 if err != nil { 124 return err 125 } 126 for _, knot := range knownKnots { 127 logger.Info("adding source start", "knot", knot) 128 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 129 } 130 spindle.ks = eventconsumer.NewConsumer(*ccfg) 131 132 go func() { 133 logger.Info("starting knot event consumer") 134 spindle.ks.Start(ctx) 135 }() 136 137 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 138 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 139 140 return nil 141} 142 143func (s *Spindle) Router() http.Handler { 144 mux := chi.NewRouter() 145 146 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 147 w.Write([]byte( 148 ` **** 149 *** *** 150 *** ** ****** ** 151 ** * ***** 152 * ** ** 153 * * * *************** 154 ** ** *# ** 155 * ** ** *** ** 156 * * ** ** * ****** 157 * ** ** * ** * * 158 ** ** *** ** ** * 159 ** ** * ** * * 160 ** **** ** * * 161 ** *** ** ** ** 162 *** ** ***** 163 ******************** 164 ** 165 * 166 #************** 167 ** 168 ******** 169 170This is a spindle server. More info at https://tangled.sh/@tangled.sh/core/tree/master/docs/spindle`)) 171 }) 172 mux.HandleFunc("/events", s.Events) 173 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 174 w.Write([]byte(s.cfg.Server.Owner)) 175 }) 176 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 177 return mux 178} 179 180func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 181 if msg.Nsid == tangled.PipelineNSID { 182 tpl := tangled.Pipeline{} 183 err := json.Unmarshal(msg.EventJson, &tpl) 184 if err != nil { 185 fmt.Println("error unmarshalling", err) 186 return err 187 } 188 189 if tpl.TriggerMetadata == nil { 190 return fmt.Errorf("no trigger metadata found") 191 } 192 193 if tpl.TriggerMetadata.Repo == nil { 194 return fmt.Errorf("no repo data found") 195 } 196 197 // filter by repos 198 _, err = s.db.GetRepo( 199 tpl.TriggerMetadata.Repo.Knot, 200 tpl.TriggerMetadata.Repo.Did, 201 tpl.TriggerMetadata.Repo.Repo, 202 ) 203 if err != nil { 204 return err 205 } 206 207 pipelineId := models.PipelineId{ 208 Knot: src.Key(), 209 Rkey: msg.Rkey, 210 } 211 212 for _, w := range tpl.Workflows { 213 if w != nil { 214 err := s.db.StatusPending(models.WorkflowId{ 215 PipelineId: pipelineId, 216 Name: w.Name, 217 }, s.n) 218 if err != nil { 219 return err 220 } 221 } 222 } 223 224 spl := models.ToPipeline(tpl, *s.cfg) 225 226 ok := s.jq.Enqueue(queue.Job{ 227 Run: func() error { 228 s.eng.StartWorkflows(ctx, spl, pipelineId) 229 return nil 230 }, 231 OnFail: func(jobError error) { 232 s.l.Error("pipeline run failed", "error", jobError) 233 }, 234 }) 235 if ok { 236 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 237 } else { 238 s.l.Error("failed to enqueue pipeline: queue is full") 239 } 240 } 241 242 return nil 243} 244 245func (s *Spindle) configureOwner() error { 246 cfgOwner := s.cfg.Server.Owner 247 248 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 249 if err != nil { 250 return err 251 } 252 253 switch len(existing) { 254 case 0: 255 // no owner configured, continue 256 case 1: 257 // find existing owner 258 existingOwner := existing[0] 259 260 // no ownership change, this is okay 261 if existingOwner == s.cfg.Server.Owner { 262 break 263 } 264 265 // remove existing owner 266 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 267 if err != nil { 268 return nil 269 } 270 default: 271 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 272 } 273 274 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 275}