forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "encoding/json" 5 "fmt" 6 "log/slog" 7 "net/http" 8 9 "golang.org/x/net/context" 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/jetstream" 12 "tangled.sh/tangled.sh/core/knotclient" 13 "tangled.sh/tangled.sh/core/knotserver/notifier" 14 "tangled.sh/tangled.sh/core/log" 15 "tangled.sh/tangled.sh/core/rbac" 16 "tangled.sh/tangled.sh/core/spindle/config" 17 "tangled.sh/tangled.sh/core/spindle/db" 18 "tangled.sh/tangled.sh/core/spindle/engine" 19) 20 21type Spindle struct { 22 jc *jetstream.JetstreamClient 23 db *db.DB 24 e *rbac.Enforcer 25 l *slog.Logger 26 n *notifier.Notifier 27 eng *engine.Engine 28} 29 30func Run(ctx context.Context) error { 31 cfg, err := config.Load(ctx) 32 if err != nil { 33 return fmt.Errorf("failed to load config: %w", err) 34 } 35 36 d, err := db.Make(cfg.Server.DBPath) 37 if err != nil { 38 return fmt.Errorf("failed to setup db: %w", err) 39 } 40 41 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 42 if err != nil { 43 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 44 } 45 46 logger := log.FromContext(ctx) 47 48 collections := []string{tangled.SpindleMemberNSID} 49 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 50 if err != nil { 51 return fmt.Errorf("failed to setup jetstream client: %w", err) 52 } 53 54 n := notifier.New() 55 56 eng, err := engine.New(ctx, d) 57 if err != nil { 58 return err 59 } 60 61 spindle := Spindle{ 62 jc: jc, 63 e: e, 64 db: d, 65 l: logger, 66 n: &n, 67 eng: eng, 68 } 69 70 go func() { 71 logger.Info("starting event consumer") 72 knotEventSource := knotclient.NewEventSource("localhost:5555") 73 ccfg := knotclient.ConsumerConfig{ 74 Logger: logger, 75 ProcessFunc: spindle.exec, 76 } 77 ccfg.AddEventSource(knotEventSource) 78 79 ec := knotclient.NewEventConsumer(ccfg) 80 81 ec.Start(ctx) 82 }() 83 84 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 85 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 86 87 return nil 88} 89 90func (s *Spindle) Router() http.Handler { 91 mux := &http.ServeMux{} 92 93 mux.HandleFunc("/events", s.Events) 94 return mux 95} 96 97func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 98 pipeline := tangled.Pipeline{} 99 err := json.Unmarshal(msg.EventJson, &pipeline) 100 if err != nil { 101 fmt.Println("error unmarshalling", err) 102 return err 103 } 104 105 if msg.Nsid == tangled.PipelineNSID { 106 err = s.eng.SetupPipeline(ctx, &pipeline, msg.Rkey) 107 if err != nil { 108 return err 109 } 110 err = s.eng.StartWorkflows(ctx, &pipeline, msg.Rkey) 111 if err != nil { 112 return err 113 } 114 } 115 116 return nil 117}