1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/eventconsumer"
13 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
14 "tangled.sh/tangled.sh/core/jetstream"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25const (
26 rbacDomain = "thisserver"
27)
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *eventconsumer.Consumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, cfg, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77 jc.AddDid(cfg.Server.Owner)
78
79 spindle := Spindle{
80 jc: jc,
81 e: e,
82 db: d,
83 l: logger,
84 n: &n,
85 eng: eng,
86 jq: jq,
87 cfg: cfg,
88 }
89
90 err = e.AddKnot(rbacDomain)
91 if err != nil {
92 return fmt.Errorf("failed to set rbac domain: %w", err)
93 }
94 err = spindle.configureOwner()
95 if err != nil {
96 return err
97 }
98 logger.Info("owner set", "did", cfg.Server.Owner)
99
100 // starts a job queue runner in the background
101 jq.Start()
102 defer jq.Stop()
103
104 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
105 if err != nil {
106 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
107 }
108
109 err = jc.StartJetstream(ctx, spindle.ingest())
110 if err != nil {
111 return fmt.Errorf("failed to start jetstream consumer: %w", err)
112 }
113
114 // for each incoming sh.tangled.pipeline, we execute
115 // spindle.processPipeline, which in turn enqueues the pipeline
116 // job in the above registered queue.
117 ccfg := eventconsumer.NewConsumerConfig()
118 ccfg.Logger = logger
119 ccfg.Dev = cfg.Server.Dev
120 ccfg.ProcessFunc = spindle.processPipeline
121 ccfg.CursorStore = cursorStore
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 logger.Info("adding source start", "knot", knot)
128 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
129 }
130 spindle.ks = eventconsumer.NewConsumer(*ccfg)
131
132 go func() {
133 logger.Info("starting knot event consumer")
134 spindle.ks.Start(ctx)
135 }()
136
137 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
138 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
139
140 return nil
141}
142
143func (s *Spindle) Router() http.Handler {
144 mux := chi.NewRouter()
145
146 mux.HandleFunc("/events", s.Events)
147 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
148 w.Write([]byte(s.cfg.Server.Owner))
149 })
150 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
151 return mux
152}
153
154func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
155 if msg.Nsid == tangled.PipelineNSID {
156 tpl := tangled.Pipeline{}
157 err := json.Unmarshal(msg.EventJson, &tpl)
158 if err != nil {
159 fmt.Println("error unmarshalling", err)
160 return err
161 }
162
163 if tpl.TriggerMetadata == nil {
164 return fmt.Errorf("no trigger metadata found")
165 }
166
167 if tpl.TriggerMetadata.Repo == nil {
168 return fmt.Errorf("no repo data found")
169 }
170
171 // filter by repos
172 _, err = s.db.GetRepo(
173 tpl.TriggerMetadata.Repo.Knot,
174 tpl.TriggerMetadata.Repo.Did,
175 tpl.TriggerMetadata.Repo.Repo,
176 )
177 if err != nil {
178 return err
179 }
180
181 pipelineId := models.PipelineId{
182 Knot: src.Key(),
183 Rkey: msg.Rkey,
184 }
185
186 for _, w := range tpl.Workflows {
187 if w != nil {
188 err := s.db.StatusPending(models.WorkflowId{
189 PipelineId: pipelineId,
190 Name: w.Name,
191 }, s.n)
192 if err != nil {
193 return err
194 }
195 }
196 }
197
198 spl := models.ToPipeline(tpl, *s.cfg)
199
200 ok := s.jq.Enqueue(queue.Job{
201 Run: func() error {
202 s.eng.StartWorkflows(ctx, spl, pipelineId)
203 return nil
204 },
205 OnFail: func(jobError error) {
206 s.l.Error("pipeline run failed", "error", jobError)
207 },
208 })
209 if ok {
210 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
211 } else {
212 s.l.Error("failed to enqueue pipeline: queue is full")
213 }
214 }
215
216 return nil
217}
218
219func (s *Spindle) configureOwner() error {
220 cfgOwner := s.cfg.Server.Owner
221 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
222 if err != nil {
223 return fmt.Errorf("failed to fetch server:owner: %w", err)
224 }
225
226 if len(serverOwner) == 0 {
227 s.e.AddKnotOwner(rbacDomain, cfgOwner)
228 } else {
229 if serverOwner[0] != cfgOwner {
230 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
231 }
232 }
233 return nil
234}