1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/eventconsumer"
13 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
14 "tangled.sh/tangled.sh/core/jetstream"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25const (
26 rbacDomain = "thisserver"
27)
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *eventconsumer.Consumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, cfg, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77 jc.AddDid(cfg.Server.Owner)
78
79 spindle := Spindle{
80 jc: jc,
81 e: e,
82 db: d,
83 l: logger,
84 n: &n,
85 eng: eng,
86 jq: jq,
87 cfg: cfg,
88 }
89
90 err = e.AddSpindle(rbacDomain)
91 if err != nil {
92 return fmt.Errorf("failed to set rbac domain: %w", err)
93 }
94 err = spindle.configureOwner()
95 if err != nil {
96 return err
97 }
98 logger.Info("owner set", "did", cfg.Server.Owner)
99
100 // starts a job queue runner in the background
101 jq.Start()
102 defer jq.Stop()
103
104 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
105 if err != nil {
106 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
107 }
108
109 err = jc.StartJetstream(ctx, spindle.ingest())
110 if err != nil {
111 return fmt.Errorf("failed to start jetstream consumer: %w", err)
112 }
113
114 // for each incoming sh.tangled.pipeline, we execute
115 // spindle.processPipeline, which in turn enqueues the pipeline
116 // job in the above registered queue.
117 ccfg := eventconsumer.NewConsumerConfig()
118 ccfg.Logger = logger
119 ccfg.Dev = cfg.Server.Dev
120 ccfg.ProcessFunc = spindle.processPipeline
121 ccfg.CursorStore = cursorStore
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 logger.Info("adding source start", "knot", knot)
128 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
129 }
130 spindle.ks = eventconsumer.NewConsumer(*ccfg)
131
132 go func() {
133 logger.Info("starting knot event consumer")
134 spindle.ks.Start(ctx)
135 }()
136
137 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
138 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
139
140 return nil
141}
142
143func (s *Spindle) Router() http.Handler {
144 mux := chi.NewRouter()
145
146 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
147 w.Write([]byte(
148 ` ****
149 *** ***
150 *** ** ****** **
151 ** * *****
152 * ** **
153 * * * ***************
154 ** ** *# **
155 * ** ** *** **
156 * * ** ** * ******
157 * ** ** * ** * *
158 ** ** *** ** ** *
159 ** ** * ** * *
160 ** **** ** * *
161 ** *** ** ** **
162 *** ** *****
163 ********************
164 **
165 *
166 #**************
167 **
168 ********
169
170This is a spindle server. More info at https://tangled.sh/@tangled.sh/core/tree/master/docs/spindle`))
171 })
172 mux.HandleFunc("/events", s.Events)
173 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
174 w.Write([]byte(s.cfg.Server.Owner))
175 })
176 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
177 return mux
178}
179
180func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
181 if msg.Nsid == tangled.PipelineNSID {
182 tpl := tangled.Pipeline{}
183 err := json.Unmarshal(msg.EventJson, &tpl)
184 if err != nil {
185 fmt.Println("error unmarshalling", err)
186 return err
187 }
188
189 if tpl.TriggerMetadata == nil {
190 return fmt.Errorf("no trigger metadata found")
191 }
192
193 if tpl.TriggerMetadata.Repo == nil {
194 return fmt.Errorf("no repo data found")
195 }
196
197 // filter by repos
198 _, err = s.db.GetRepo(
199 tpl.TriggerMetadata.Repo.Knot,
200 tpl.TriggerMetadata.Repo.Did,
201 tpl.TriggerMetadata.Repo.Repo,
202 )
203 if err != nil {
204 return err
205 }
206
207 pipelineId := models.PipelineId{
208 Knot: src.Key(),
209 Rkey: msg.Rkey,
210 }
211
212 for _, w := range tpl.Workflows {
213 if w != nil {
214 err := s.db.StatusPending(models.WorkflowId{
215 PipelineId: pipelineId,
216 Name: w.Name,
217 }, s.n)
218 if err != nil {
219 return err
220 }
221 }
222 }
223
224 spl := models.ToPipeline(tpl, *s.cfg)
225
226 ok := s.jq.Enqueue(queue.Job{
227 Run: func() error {
228 s.eng.StartWorkflows(ctx, spl, pipelineId)
229 return nil
230 },
231 OnFail: func(jobError error) {
232 s.l.Error("pipeline run failed", "error", jobError)
233 },
234 })
235 if ok {
236 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
237 } else {
238 s.l.Error("failed to enqueue pipeline: queue is full")
239 }
240 }
241
242 return nil
243}
244
245func (s *Spindle) configureOwner() error {
246 cfgOwner := s.cfg.Server.Owner
247
248 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
249 if err != nil {
250 return err
251 }
252
253 switch len(existing) {
254 case 0:
255 // no owner configured, continue
256 case 1:
257 // find existing owner
258 existingOwner := existing[0]
259
260 // no ownership change, this is okay
261 if existingOwner == s.cfg.Server.Owner {
262 break
263 }
264
265 // remove existing owner
266 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
267 if err != nil {
268 return nil
269 }
270 default:
271 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
272 }
273
274 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
275}