// Forked from tangled.org/core — monorepo for Tangled (https://tangled.org).

1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25type Spindle struct {
26 jc *jetstream.JetstreamClient
27 db *db.DB
28 e *rbac.Enforcer
29 l *slog.Logger
30 n *notifier.Notifier
31 eng *engine.Engine
32 jq *queue.Queue
33}
34
35func Run(ctx context.Context) error {
36 cfg, err := config.Load(ctx)
37 if err != nil {
38 return fmt.Errorf("failed to load config: %w", err)
39 }
40
41 d, err := db.Make(cfg.Server.DBPath)
42 if err != nil {
43 return fmt.Errorf("failed to setup db: %w", err)
44 }
45
46 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
47 if err != nil {
48 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
49 }
50
51 logger := log.FromContext(ctx)
52
53 collections := []string{tangled.SpindleMemberNSID}
54 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
55 if err != nil {
56 return fmt.Errorf("failed to setup jetstream client: %w", err)
57 }
58
59 n := notifier.New()
60 eng, err := engine.New(ctx, d, &n)
61 if err != nil {
62 return err
63 }
64
65 jq := queue.NewQueue(100, 2)
66
67 // starts a job queue runner in the background
68 jq.Start()
69 defer jq.Stop()
70
71 spindle := Spindle{
72 jc: jc,
73 e: e,
74 db: d,
75 l: logger,
76 n: &n,
77 eng: eng,
78 jq: jq,
79 }
80
81 // for each incoming sh.tangled.pipeline, we execute
82 // spindle.processPipeline, which in turn enqueues the pipeline
83 // job in the above registered queue.
84 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
85 if err != nil {
86 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
87 }
88 go func() {
89 logger.Info("starting event consumer")
90 knotEventSource := knotclient.NewEventSource("localhost:6000")
91
92 ccfg := knotclient.NewConsumerConfig()
93 ccfg.Logger = logger
94 ccfg.Dev = cfg.Server.Dev
95 ccfg.ProcessFunc = spindle.processPipeline
96 ccfg.CursorStore = cursorStore
97 ccfg.AddEventSource(knotEventSource)
98
99 ec := knotclient.NewEventConsumer(*ccfg)
100
101 ec.Start(ctx)
102 }()
103
104 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
105 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
106
107 return nil
108}
109
110func (s *Spindle) Router() http.Handler {
111 mux := chi.NewRouter()
112
113 mux.HandleFunc("/events", s.Events)
114 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
115 return mux
116}
117
118func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
119 if msg.Nsid == tangled.PipelineNSID {
120 pipeline := tangled.Pipeline{}
121 err := json.Unmarshal(msg.EventJson, &pipeline)
122 if err != nil {
123 fmt.Println("error unmarshalling", err)
124 return err
125 }
126
127 pipelineId := models.PipelineId{
128 Knot: src.Knot,
129 Rkey: msg.Rkey,
130 }
131
132 for _, w := range pipeline.Workflows {
133 if w != nil {
134 err := s.db.StatusPending(models.WorkflowId{
135 PipelineId: pipelineId,
136 Name: w.Name,
137 }, s.n)
138 if err != nil {
139 return err
140 }
141 }
142 }
143
144 ok := s.jq.Enqueue(queue.Job{
145 Run: func() error {
146 s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
147 return nil
148 },
149 OnFail: func(jobError error) {
150 s.l.Error("pipeline run failed", "error", jobError)
151 },
152 })
153 if ok {
154 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
155 } else {
156 s.l.Error("failed to enqueue pipeline: queue is full")
157 }
158 }
159
160 return nil
161}