forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/log" 15 "tangled.sh/tangled.sh/core/notifier" 16 "tangled.sh/tangled.sh/core/rbac" 17 "tangled.sh/tangled.sh/core/spindle/config" 18 "tangled.sh/tangled.sh/core/spindle/db" 19 "tangled.sh/tangled.sh/core/spindle/engine" 20 "tangled.sh/tangled.sh/core/spindle/queue" 21) 22 23type Spindle struct { 24 jc *jetstream.JetstreamClient 25 db *db.DB 26 e *rbac.Enforcer 27 l *slog.Logger 28 n *notifier.Notifier 29 eng *engine.Engine 30 jq *queue.Queue 31} 32 33func Run(ctx context.Context) error { 34 cfg, err := config.Load(ctx) 35 if err != nil { 36 return fmt.Errorf("failed to load config: %w", err) 37 } 38 39 d, err := db.Make(cfg.Server.DBPath) 40 if err != nil { 41 return fmt.Errorf("failed to setup db: %w", err) 42 } 43 44 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 45 if err != nil { 46 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 47 } 48 49 logger := log.FromContext(ctx) 50 51 collections := []string{tangled.SpindleMemberNSID} 52 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 53 if err != nil { 54 return fmt.Errorf("failed to setup jetstream client: %w", err) 55 } 56 57 n := notifier.New() 58 eng, err := engine.New(ctx, d, &n) 59 if err != nil { 60 return err 61 } 62 63 jq := queue.NewQueue(100) 64 65 // starts a job queue runner in the background 66 jq.StartRunner() 67 68 spindle := Spindle{ 69 jc: jc, 70 e: e, 71 db: d, 72 l: logger, 73 n: &n, 74 eng: eng, 75 jq: jq, 76 } 77 78 // for each incoming sh.tangled.pipeline, we execute 79 // spindle.processPipeline, which in turn enqueues the pipeline 80 // job in the above registered queue. 81 go func() { 82 logger.Info("starting event consumer") 83 knotEventSource := knotclient.NewEventSource("localhost:5555") 84 85 ccfg := knotclient.NewConsumerConfig() 86 ccfg.Logger = logger 87 ccfg.Dev = cfg.Server.Dev 88 ccfg.ProcessFunc = spindle.processPipeline 89 ccfg.AddEventSource(knotEventSource) 90 91 ec := knotclient.NewEventConsumer(*ccfg) 92 93 ec.Start(ctx) 94 }() 95 96 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 97 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 98 99 return nil 100} 101 102func (s *Spindle) Router() http.Handler { 103 mux := chi.NewRouter() 104 105 mux.HandleFunc("/events", s.Events) 106 mux.HandleFunc("/logs/{pipelineID}", s.Logs) 107 return mux 108} 109 110func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 111 if msg.Nsid == tangled.PipelineNSID { 112 pipeline := tangled.Pipeline{} 113 err := json.Unmarshal(msg.EventJson, &pipeline) 114 if err != nil { 115 fmt.Println("error unmarshalling", err) 116 return err 117 } 118 119 ok := s.jq.Enqueue(queue.Job{ 120 Run: func() error { 121 // this is a "fake" at uri for now 122 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 123 124 rkey := TID() 125 126 err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n) 127 if err != nil { 128 return err 129 } 130 131 return s.eng.StartWorkflows(ctx, &pipeline, rkey) 132 }, 133 OnFail: func(error) { 134 s.l.Error("pipeline run failed", "error", err) 135 }, 136 }) 137 if ok { 138 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 139 } else { 140 s.l.Error("failed to enqueue pipeline: queue is full") 141 } 142 } 143 144 return nil 145}