forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/eventconsumer"
13 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
14 "tangled.sh/tangled.sh/core/jetstream"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25const (
26 rbacDomain = "thisserver"
27)
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *eventconsumer.Consumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, cfg, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77 jc.AddDid(cfg.Server.Owner)
78
79 spindle := Spindle{
80 jc: jc,
81 e: e,
82 db: d,
83 l: logger,
84 n: &n,
85 eng: eng,
86 jq: jq,
87 cfg: cfg,
88 }
89
90 err = e.AddKnot(rbacDomain)
91 if err != nil {
92 return fmt.Errorf("failed to set rbac domain: %w", err)
93 }
94 err = spindle.configureOwner()
95 if err != nil {
96 return err
97 }
98 logger.Info("owner set", "did", cfg.Server.Owner)
99
100 // starts a job queue runner in the background
101 jq.Start()
102 defer jq.Stop()
103
104 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
105 if err != nil {
106 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
107 }
108
109 err = jc.StartJetstream(ctx, spindle.ingest())
110 if err != nil {
111 return fmt.Errorf("failed to start jetstream consumer: %w", err)
112 }
113
114 // for each incoming sh.tangled.pipeline, we execute
115 // spindle.processPipeline, which in turn enqueues the pipeline
116 // job in the above registered queue.
117 ccfg := eventconsumer.NewConsumerConfig()
118 ccfg.Logger = logger
119 ccfg.Dev = cfg.Server.Dev
120 ccfg.ProcessFunc = spindle.processPipeline
121 ccfg.CursorStore = cursorStore
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 logger.Info("adding source start", "knot", knot)
128 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
129 }
130 spindle.ks = eventconsumer.NewConsumer(*ccfg)
131
132 go func() {
133 logger.Info("starting knot event consumer")
134 spindle.ks.Start(ctx)
135 }()
136
137 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
138 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
139
140 return nil
141}
142
143func (s *Spindle) Router() http.Handler {
144 mux := chi.NewRouter()
145
146 mux.HandleFunc("/events", s.Events)
147 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
148 w.Write([]byte(s.cfg.Server.Owner))
149 })
150 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
151 mux.HandleFunc("/logs/{knot}/{rkey}/{name}/{idx}", s.StepLogs)
152 return mux
153}
154
155func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
156 if msg.Nsid == tangled.PipelineNSID {
157 tpl := tangled.Pipeline{}
158 err := json.Unmarshal(msg.EventJson, &tpl)
159 if err != nil {
160 fmt.Println("error unmarshalling", err)
161 return err
162 }
163
164 if tpl.TriggerMetadata == nil {
165 return fmt.Errorf("no trigger metadata found")
166 }
167
168 if tpl.TriggerMetadata.Repo == nil {
169 return fmt.Errorf("no repo data found")
170 }
171
172 // filter by repos
173 _, err = s.db.GetRepo(
174 tpl.TriggerMetadata.Repo.Knot,
175 tpl.TriggerMetadata.Repo.Did,
176 tpl.TriggerMetadata.Repo.Repo,
177 )
178 if err != nil {
179 return err
180 }
181
182 pipelineId := models.PipelineId{
183 Knot: src.Key(),
184 Rkey: msg.Rkey,
185 }
186
187 for _, w := range tpl.Workflows {
188 if w != nil {
189 err := s.db.StatusPending(models.WorkflowId{
190 PipelineId: pipelineId,
191 Name: w.Name,
192 }, s.n)
193 if err != nil {
194 return err
195 }
196 }
197 }
198
199 spl := models.ToPipeline(tpl, *s.cfg)
200
201 ok := s.jq.Enqueue(queue.Job{
202 Run: func() error {
203 s.eng.StartWorkflows(ctx, spl, pipelineId)
204 return nil
205 },
206 OnFail: func(jobError error) {
207 s.l.Error("pipeline run failed", "error", jobError)
208 },
209 })
210 if ok {
211 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
212 } else {
213 s.l.Error("failed to enqueue pipeline: queue is full")
214 }
215 }
216
217 return nil
218}
219
220func (s *Spindle) configureOwner() error {
221 cfgOwner := s.cfg.Server.Owner
222 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
223 if err != nil {
224 return fmt.Errorf("failed to fetch server:owner: %w", err)
225 }
226
227 if len(serverOwner) == 0 {
228 s.e.AddKnotOwner(rbacDomain, cfgOwner)
229 } else {
230 if serverOwner[0] != cfgOwner {
231 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
232 }
233 }
234 return nil
235}