1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
// rbacDomain is the RBAC domain this server registers with the enforcer and
// under which its "server:owner" role is looked up and assigned.
const rbacDomain = "thisserver"
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *knotclient.EventConsumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77
78 spindle := Spindle{
79 jc: jc,
80 e: e,
81 db: d,
82 l: logger,
83 n: &n,
84 eng: eng,
85 jq: jq,
86 cfg: cfg,
87 }
88
89 err = e.AddKnot(rbacDomain)
90 if err != nil {
91 return fmt.Errorf("failed to set rbac domain: %w", err)
92 }
93 err = spindle.configureOwner()
94 if err != nil {
95 return err
96 }
97 logger.Info("owner set", "did", cfg.Server.Owner)
98
99 // starts a job queue runner in the background
100 jq.Start()
101 defer jq.Stop()
102
103 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
104 if err != nil {
105 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
106 }
107
108 err = jc.StartJetstream(ctx, spindle.ingest())
109 if err != nil {
110 return fmt.Errorf("failed to start jetstream consumer: %w", err)
111 }
112
113 // for each incoming sh.tangled.pipeline, we execute
114 // spindle.processPipeline, which in turn enqueues the pipeline
115 // job in the above registered queue.
116 ccfg := knotclient.NewConsumerConfig()
117 ccfg.Logger = logger
118 ccfg.Dev = cfg.Server.Dev
119 ccfg.ProcessFunc = spindle.processPipeline
120 ccfg.CursorStore = cursorStore
121 knotstream := knotclient.NewEventConsumer(*ccfg)
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 knotstream.AddSource(ctx, knotclient.NewEventSource(knot))
128 }
129 spindle.ks = knotstream
130
131 go func() {
132 logger.Info("starting knot event consumer", "knots")
133 knotstream.Start(ctx)
134 }()
135
136 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
137 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
138
139 return nil
140}
141
142func (s *Spindle) Router() http.Handler {
143 mux := chi.NewRouter()
144
145 mux.HandleFunc("/events", s.Events)
146 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
147 w.Write([]byte(s.cfg.Server.Owner))
148 })
149 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
150 return mux
151}
152
153func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
154 if msg.Nsid == tangled.PipelineNSID {
155 pipeline := tangled.Pipeline{}
156 err := json.Unmarshal(msg.EventJson, &pipeline)
157 if err != nil {
158 fmt.Println("error unmarshalling", err)
159 return err
160 }
161
162 pipelineId := models.PipelineId{
163 Knot: src.Knot,
164 Rkey: msg.Rkey,
165 }
166
167 for _, w := range pipeline.Workflows {
168 if w != nil {
169 err := s.db.StatusPending(models.WorkflowId{
170 PipelineId: pipelineId,
171 Name: w.Name,
172 }, s.n)
173 if err != nil {
174 return err
175 }
176 }
177 }
178
179 ok := s.jq.Enqueue(queue.Job{
180 Run: func() error {
181 s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
182 return nil
183 },
184 OnFail: func(jobError error) {
185 s.l.Error("pipeline run failed", "error", jobError)
186 },
187 })
188 if ok {
189 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
190 } else {
191 s.l.Error("failed to enqueue pipeline: queue is full")
192 }
193 }
194
195 return nil
196}
197
198func (s *Spindle) configureOwner() error {
199 cfgOwner := s.cfg.Server.Owner
200 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
201 if err != nil {
202 return fmt.Errorf("failed to fetch server:owner: %w", err)
203 }
204
205 if len(serverOwner) == 0 {
206 s.e.AddKnotOwner(rbacDomain, cfgOwner)
207 } else {
208 if serverOwner[0] != cfgOwner {
209 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
210 }
211 }
212 return nil
213}