forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "encoding/json" 5 "fmt" 6 "log/slog" 7 "net/http" 8 9 "github.com/go-chi/chi/v5" 10 "golang.org/x/net/context" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/knotserver/notifier" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/rbac" 17 "tangled.sh/tangled.sh/core/spindle/config" 18 "tangled.sh/tangled.sh/core/spindle/db" 19 "tangled.sh/tangled.sh/core/spindle/engine" 20) 21 22type Spindle struct { 23 jc *jetstream.JetstreamClient 24 db *db.DB 25 e *rbac.Enforcer 26 l *slog.Logger 27 n *notifier.Notifier 28 eng *engine.Engine 29} 30 31func Run(ctx context.Context) error { 32 cfg, err := config.Load(ctx) 33 if err != nil { 34 return fmt.Errorf("failed to load config: %w", err) 35 } 36 37 d, err := db.Make(cfg.Server.DBPath) 38 if err != nil { 39 return fmt.Errorf("failed to setup db: %w", err) 40 } 41 42 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 43 if err != nil { 44 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 45 } 46 47 logger := log.FromContext(ctx) 48 49 collections := []string{tangled.SpindleMemberNSID} 50 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 51 if err != nil { 52 return fmt.Errorf("failed to setup jetstream client: %w", err) 53 } 54 55 n := notifier.New() 56 eng, err := engine.New(ctx, d, &n) 57 if err != nil { 58 return err 59 } 60 61 spindle := Spindle{ 62 jc: jc, 63 e: e, 64 db: d, 65 l: logger, 66 n: &n, 67 eng: eng, 68 } 69 70 go func() { 71 logger.Info("starting event consumer") 72 knotEventSource := knotclient.NewEventSource("localhost:5555") 73 74 ccfg := knotclient.NewConsumerConfig() 75 ccfg.Logger = logger 76 ccfg.Dev = cfg.Server.Dev 77 ccfg.ProcessFunc = spindle.exec 78 ccfg.AddEventSource(knotEventSource) 79 80 ec := knotclient.NewEventConsumer(*ccfg) 81 82 ec.Start(ctx) 83 }() 84 85 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 86 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 87 88 return nil 89} 90 91func (s *Spindle) Router() http.Handler { 92 mux := chi.NewRouter() 93 94 mux.HandleFunc("/events", s.Events) 95 mux.HandleFunc("/logs/{pipelineID}", s.Logs) 96 return mux 97} 98 99func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 100 if msg.Nsid == tangled.PipelineNSID { 101 pipeline := tangled.Pipeline{} 102 err := json.Unmarshal(msg.EventJson, &pipeline) 103 if err != nil { 104 fmt.Println("error unmarshalling", err) 105 return err 106 } 107 108 // this is a "fake" at uri for now 109 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 110 111 rkey := TID() 112 err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) 113 if err != nil { 114 return err 115 } 116 err = s.eng.StartWorkflows(ctx, &pipeline, rkey) 117 if err != nil { 118 return err 119 } 120 } 121 122 return nil 123}