1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
// motd holds the raw bytes of the file named "motd", embedded at build time.
// Router writes it verbatim as the response body for "/".
//
//go:embed motd
var motd []byte
31
const (
	// rbacDomain is the domain name handed to every rbac enforcer call made
	// from this file (AddSpindle, GetSpindleUsersByRole, AddSpindleOwner,
	// RemoveSpindleOwner).
	rbacDomain = "thisserver"
)
35
// Spindle bundles the long-lived components that Run constructs and that the
// HTTP handlers and the pipeline processor share.
type Spindle struct {
	// jc is the jetstream client; Run starts it with s.ingest() as handler.
	jc *jetstream.JetstreamClient
	// db is the database opened from cfg.Server.DBPath.
	db *db.DB
	// e is the rbac enforcer used for owner management and xrpc routes.
	e *rbac.Enforcer
	// l is the logger taken from the context passed to Run.
	l *slog.Logger
	// n is the notifier shared with the engine and passed to db.StatusPending.
	n *notifier.Notifier
	// eng is the engine asked to start workflows for each enqueued pipeline.
	eng *engine.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the configuration loaded by config.Load.
	cfg *config.Config
	// ks is the event consumer reading from the known knots; set in Run
	// after the rest of the struct has been populated.
	ks *eventconsumer.Consumer
	// res is the identity resolver handed to the xrpc router.
	res *idresolver.Resolver
	// vault is the secrets manager shared by the engine and the xrpc router.
	vault secrets.Manager
}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 // TODO: add hashicorp vault provider and choose here
72 vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
73 if err != nil {
74 return fmt.Errorf("failed to setup secrets provider: %w", err)
75 }
76
77 eng, err := engine.New(ctx, cfg, d, &n, vault)
78 if err != nil {
79 return err
80 }
81
82 jq := queue.NewQueue(100, 2)
83
84 collections := []string{
85 tangled.SpindleMemberNSID,
86 tangled.RepoNSID,
87 }
88 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
89 if err != nil {
90 return fmt.Errorf("failed to setup jetstream client: %w", err)
91 }
92 jc.AddDid(cfg.Server.Owner)
93
94 resolver := idresolver.DefaultResolver()
95
96 spindle := Spindle{
97 jc: jc,
98 e: e,
99 db: d,
100 l: logger,
101 n: &n,
102 eng: eng,
103 jq: jq,
104 cfg: cfg,
105 res: resolver,
106 vault: vault,
107 }
108
109 err = e.AddSpindle(rbacDomain)
110 if err != nil {
111 return fmt.Errorf("failed to set rbac domain: %w", err)
112 }
113 err = spindle.configureOwner()
114 if err != nil {
115 return err
116 }
117 logger.Info("owner set", "did", cfg.Server.Owner)
118
119 // starts a job queue runner in the background
120 jq.Start()
121 defer jq.Stop()
122
123 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
124 if err != nil {
125 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
126 }
127
128 err = jc.StartJetstream(ctx, spindle.ingest())
129 if err != nil {
130 return fmt.Errorf("failed to start jetstream consumer: %w", err)
131 }
132
133 // for each incoming sh.tangled.pipeline, we execute
134 // spindle.processPipeline, which in turn enqueues the pipeline
135 // job in the above registered queue.
136 ccfg := eventconsumer.NewConsumerConfig()
137 ccfg.Logger = logger
138 ccfg.Dev = cfg.Server.Dev
139 ccfg.ProcessFunc = spindle.processPipeline
140 ccfg.CursorStore = cursorStore
141 knownKnots, err := d.Knots()
142 if err != nil {
143 return err
144 }
145 for _, knot := range knownKnots {
146 logger.Info("adding source start", "knot", knot)
147 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
148 }
149 spindle.ks = eventconsumer.NewConsumer(*ccfg)
150
151 go func() {
152 logger.Info("starting knot event consumer")
153 spindle.ks.Start(ctx)
154 }()
155
156 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
157 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
158
159 return nil
160}
161
162func (s *Spindle) Router() http.Handler {
163 mux := chi.NewRouter()
164
165 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
166 w.Write(motd)
167 })
168 mux.HandleFunc("/events", s.Events)
169 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
170 w.Write([]byte(s.cfg.Server.Owner))
171 })
172 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
173
174 mux.Mount("/xrpc", s.XrpcRouter())
175 return mux
176}
177
178func (s *Spindle) XrpcRouter() http.Handler {
179 logger := s.l.With("route", "xrpc")
180
181 x := xrpc.Xrpc{
182 Logger: logger,
183 Db: s.db,
184 Enforcer: s.e,
185 Engine: s.eng,
186 Config: s.cfg,
187 Resolver: s.res,
188 Vault: s.vault,
189 }
190
191 return x.Router()
192}
193
194func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
195 if msg.Nsid == tangled.PipelineNSID {
196 tpl := tangled.Pipeline{}
197 err := json.Unmarshal(msg.EventJson, &tpl)
198 if err != nil {
199 fmt.Println("error unmarshalling", err)
200 return err
201 }
202
203 if tpl.TriggerMetadata == nil {
204 return fmt.Errorf("no trigger metadata found")
205 }
206
207 if tpl.TriggerMetadata.Repo == nil {
208 return fmt.Errorf("no repo data found")
209 }
210
211 // filter by repos
212 _, err = s.db.GetRepo(
213 tpl.TriggerMetadata.Repo.Knot,
214 tpl.TriggerMetadata.Repo.Did,
215 tpl.TriggerMetadata.Repo.Repo,
216 )
217 if err != nil {
218 return err
219 }
220
221 pipelineId := models.PipelineId{
222 Knot: src.Key(),
223 Rkey: msg.Rkey,
224 }
225
226 for _, w := range tpl.Workflows {
227 if w != nil {
228 err := s.db.StatusPending(models.WorkflowId{
229 PipelineId: pipelineId,
230 Name: w.Name,
231 }, s.n)
232 if err != nil {
233 return err
234 }
235 }
236 }
237
238 spl := models.ToPipeline(tpl, *s.cfg)
239
240 ok := s.jq.Enqueue(queue.Job{
241 Run: func() error {
242 s.eng.StartWorkflows(ctx, spl, pipelineId)
243 return nil
244 },
245 OnFail: func(jobError error) {
246 s.l.Error("pipeline run failed", "error", jobError)
247 },
248 })
249 if ok {
250 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
251 } else {
252 s.l.Error("failed to enqueue pipeline: queue is full")
253 }
254 }
255
256 return nil
257}
258
259func (s *Spindle) configureOwner() error {
260 cfgOwner := s.cfg.Server.Owner
261
262 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
263 if err != nil {
264 return err
265 }
266
267 switch len(existing) {
268 case 0:
269 // no owner configured, continue
270 case 1:
271 // find existing owner
272 existingOwner := existing[0]
273
274 // no ownership change, this is okay
275 if existingOwner == s.cfg.Server.Owner {
276 break
277 }
278
279 // remove existing owner
280 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
281 if err != nil {
282 return nil
283 }
284 default:
285 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
286 }
287
288 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
289}