1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25const (
26 rbacDomain = "thisserver"
27)
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38}
39
40func Run(ctx context.Context) error {
41 logger := log.FromContext(ctx)
42
43 cfg, err := config.Load(ctx)
44 if err != nil {
45 return fmt.Errorf("failed to load config: %w", err)
46 }
47
48 d, err := db.Make(cfg.Server.DBPath)
49 if err != nil {
50 return fmt.Errorf("failed to setup db: %w", err)
51 }
52
53 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
54 if err != nil {
55 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
56 }
57 e.E.EnableAutoSave(true)
58
59 n := notifier.New()
60
61 eng, err := engine.New(ctx, d, &n)
62 if err != nil {
63 return err
64 }
65
66 jq := queue.NewQueue(100, 2)
67
68 collections := []string{tangled.SpindleMemberNSID}
69 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
70 if err != nil {
71 return fmt.Errorf("failed to setup jetstream client: %w", err)
72 }
73
74 spindle := Spindle{
75 jc: jc,
76 e: e,
77 db: d,
78 l: logger,
79 n: &n,
80 eng: eng,
81 jq: jq,
82 cfg: cfg,
83 }
84
85 err = e.AddDomain(rbacDomain)
86 if err != nil {
87 return fmt.Errorf("failed to set rbac domain: %w", err)
88 }
89 err = spindle.configureOwner()
90 if err != nil {
91 return err
92 }
93 logger.Info("owner set", "did", cfg.Server.Owner)
94
95 // starts a job queue runner in the background
96 jq.Start()
97 defer jq.Stop()
98
99 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
100 if err != nil {
101 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
102 }
103
104 err = jc.StartJetstream(ctx, spindle.ingest())
105 if err != nil {
106 return fmt.Errorf("failed to start jetstream consumer: %w", err)
107 }
108
109 // for each incoming sh.tangled.pipeline, we execute
110 // spindle.processPipeline, which in turn enqueues the pipeline
111 // job in the above registered queue.
112
113 ccfg := knotclient.NewConsumerConfig()
114 ccfg.Logger = logger
115 ccfg.Dev = cfg.Server.Dev
116 ccfg.ProcessFunc = spindle.processPipeline
117 ccfg.CursorStore = cursorStore
118 for _, knot := range spindle.cfg.Knots {
119 kes := knotclient.NewEventSource(knot)
120 ccfg.AddEventSource(kes)
121 }
122 ec := knotclient.NewEventConsumer(*ccfg)
123
124 go func() {
125 logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots)
126 ec.Start(ctx)
127 }()
128
129 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
130 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
131
132 return nil
133}
134
135func (s *Spindle) Router() http.Handler {
136 mux := chi.NewRouter()
137
138 mux.HandleFunc("/events", s.Events)
139 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
140 return mux
141}
142
143func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
144 if msg.Nsid == tangled.PipelineNSID {
145 pipeline := tangled.Pipeline{}
146 err := json.Unmarshal(msg.EventJson, &pipeline)
147 if err != nil {
148 fmt.Println("error unmarshalling", err)
149 return err
150 }
151
152 pipelineId := models.PipelineId{
153 Knot: src.Knot,
154 Rkey: msg.Rkey,
155 }
156
157 for _, w := range pipeline.Workflows {
158 if w != nil {
159 err := s.db.StatusPending(models.WorkflowId{
160 PipelineId: pipelineId,
161 Name: w.Name,
162 }, s.n)
163 if err != nil {
164 return err
165 }
166 }
167 }
168
169 ok := s.jq.Enqueue(queue.Job{
170 Run: func() error {
171 s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
172 return nil
173 },
174 OnFail: func(jobError error) {
175 s.l.Error("pipeline run failed", "error", jobError)
176 },
177 })
178 if ok {
179 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
180 } else {
181 s.l.Error("failed to enqueue pipeline: queue is full")
182 }
183 }
184
185 return nil
186}
187
188func (s *Spindle) configureOwner() error {
189 cfgOwner := s.cfg.Server.Owner
190 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
191 if err != nil {
192 return fmt.Errorf("failed to fetch server:owner: %w", err)
193 }
194
195 if len(serverOwner) == 0 {
196 s.e.AddOwner(rbacDomain, cfgOwner)
197 } else {
198 if serverOwner[0] != cfgOwner {
199 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
200 }
201 }
202 return nil
203}