1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/notifier"
16 "tangled.sh/tangled.sh/core/rbac"
17 "tangled.sh/tangled.sh/core/spindle/config"
18 "tangled.sh/tangled.sh/core/spindle/db"
19 "tangled.sh/tangled.sh/core/spindle/engine"
20 "tangled.sh/tangled.sh/core/spindle/queue"
21)
22
23type Spindle struct {
24 jc *jetstream.JetstreamClient
25 db *db.DB
26 e *rbac.Enforcer
27 l *slog.Logger
28 n *notifier.Notifier
29 eng *engine.Engine
30 jq *queue.Queue
31}
32
33func Run(ctx context.Context) error {
34 cfg, err := config.Load(ctx)
35 if err != nil {
36 return fmt.Errorf("failed to load config: %w", err)
37 }
38
39 d, err := db.Make(cfg.Server.DBPath)
40 if err != nil {
41 return fmt.Errorf("failed to setup db: %w", err)
42 }
43
44 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
45 if err != nil {
46 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
47 }
48
49 logger := log.FromContext(ctx)
50
51 collections := []string{tangled.SpindleMemberNSID}
52 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
53 if err != nil {
54 return fmt.Errorf("failed to setup jetstream client: %w", err)
55 }
56
57 n := notifier.New()
58 eng, err := engine.New(ctx, d, &n)
59 if err != nil {
60 return err
61 }
62
63 jq := queue.NewQueue(100)
64
65 // starts a job queue runner in the background
66 jq.StartRunner()
67
68 spindle := Spindle{
69 jc: jc,
70 e: e,
71 db: d,
72 l: logger,
73 n: &n,
74 eng: eng,
75 jq: jq,
76 }
77
78 // for each incoming sh.tangled.pipeline, we execute
79 // spindle.processPipeline, which in turn enqueues the pipeline
80 // job in the above registered queue.
81 go func() {
82 logger.Info("starting event consumer")
83 knotEventSource := knotclient.NewEventSource("localhost:5555")
84
85 ccfg := knotclient.NewConsumerConfig()
86 ccfg.Logger = logger
87 ccfg.Dev = cfg.Server.Dev
88 ccfg.ProcessFunc = spindle.processPipeline
89 ccfg.AddEventSource(knotEventSource)
90
91 ec := knotclient.NewEventConsumer(*ccfg)
92
93 ec.Start(ctx)
94 }()
95
96 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
97 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
98
99 return nil
100}
101
102func (s *Spindle) Router() http.Handler {
103 mux := chi.NewRouter()
104
105 mux.HandleFunc("/events", s.Events)
106 mux.HandleFunc("/logs/{pipelineID}", s.Logs)
107 return mux
108}
109
110func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
111 if msg.Nsid == tangled.PipelineNSID {
112 pipeline := tangled.Pipeline{}
113 err := json.Unmarshal(msg.EventJson, &pipeline)
114 if err != nil {
115 fmt.Println("error unmarshalling", err)
116 return err
117 }
118
119 ok := s.jq.Enqueue(queue.Job{
120 Run: func() error {
121 // this is a "fake" at uri for now
122 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)
123
124 rkey := TID()
125
126 err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n)
127 if err != nil {
128 return err
129 }
130
131 return s.eng.StartWorkflows(ctx, &pipeline, rkey)
132 },
133 OnFail: func(error) {
134 s.l.Error("pipeline run failed", "error", err)
135 },
136 })
137 if ok {
138 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
139 } else {
140 s.l.Error("failed to enqueue pipeline: queue is full")
141 }
142 }
143
144 return nil
145}