1package spindle
2
3import (
4 "encoding/json"
5 "fmt"
6 "log/slog"
7 "net/http"
8
9 "github.com/go-chi/chi/v5"
10 "golang.org/x/net/context"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotserver/notifier"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/rbac"
17 "tangled.sh/tangled.sh/core/spindle/config"
18 "tangled.sh/tangled.sh/core/spindle/db"
19 "tangled.sh/tangled.sh/core/spindle/engine"
20)
21
22type Spindle struct {
23 jc *jetstream.JetstreamClient
24 db *db.DB
25 e *rbac.Enforcer
26 l *slog.Logger
27 n *notifier.Notifier
28 eng *engine.Engine
29}
30
31func Run(ctx context.Context) error {
32 cfg, err := config.Load(ctx)
33 if err != nil {
34 return fmt.Errorf("failed to load config: %w", err)
35 }
36
37 d, err := db.Make(cfg.Server.DBPath)
38 if err != nil {
39 return fmt.Errorf("failed to setup db: %w", err)
40 }
41
42 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
43 if err != nil {
44 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
45 }
46
47 logger := log.FromContext(ctx)
48
49 collections := []string{tangled.SpindleMemberNSID}
50 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
51 if err != nil {
52 return fmt.Errorf("failed to setup jetstream client: %w", err)
53 }
54
55 n := notifier.New()
56 eng, err := engine.New(ctx, d, &n)
57 if err != nil {
58 return err
59 }
60
61 spindle := Spindle{
62 jc: jc,
63 e: e,
64 db: d,
65 l: logger,
66 n: &n,
67 eng: eng,
68 }
69
70 go func() {
71 logger.Info("starting event consumer")
72 knotEventSource := knotclient.NewEventSource("localhost:5555")
73
74 ccfg := knotclient.NewConsumerConfig()
75 ccfg.Logger = logger
76 ccfg.Dev = cfg.Server.Dev
77 ccfg.ProcessFunc = spindle.exec
78 ccfg.AddEventSource(knotEventSource)
79
80 ec := knotclient.NewEventConsumer(*ccfg)
81
82 ec.Start(ctx)
83 }()
84
85 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
86 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
87
88 return nil
89}
90
91func (s *Spindle) Router() http.Handler {
92 mux := chi.NewRouter()
93
94 mux.HandleFunc("/events", s.Events)
95 mux.HandleFunc("/logs/{pipelineID}", s.Logs)
96 return mux
97}
98
99func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
100 if msg.Nsid == tangled.PipelineNSID {
101 pipeline := tangled.Pipeline{}
102 err := json.Unmarshal(msg.EventJson, &pipeline)
103 if err != nil {
104 fmt.Println("error unmarshalling", err)
105 return err
106 }
107
108 // this is a "fake" at uri for now
109 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)
110
111 rkey := TID()
112 err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey)
113 if err != nil {
114 return err
115 }
116 err = s.eng.StartWorkflows(ctx, &pipeline, rkey)
117 if err != nil {
118 return err
119 }
120 }
121
122 return nil
123}