1package spindle
2
3import (
4 "encoding/json"
5 "fmt"
6 "log/slog"
7 "net/http"
8
9 "golang.org/x/net/context"
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/jetstream"
12 "tangled.sh/tangled.sh/core/knotclient"
13 "tangled.sh/tangled.sh/core/knotserver/notifier"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/rbac"
16 "tangled.sh/tangled.sh/core/spindle/config"
17 "tangled.sh/tangled.sh/core/spindle/db"
18 "tangled.sh/tangled.sh/core/spindle/engine"
19)
20
21type Spindle struct {
22 jc *jetstream.JetstreamClient
23 db *db.DB
24 e *rbac.Enforcer
25 l *slog.Logger
26 n *notifier.Notifier
27 eng *engine.Engine
28}
29
30func Run(ctx context.Context) error {
31 cfg, err := config.Load(ctx)
32 if err != nil {
33 return fmt.Errorf("failed to load config: %w", err)
34 }
35
36 d, err := db.Make(cfg.Server.DBPath)
37 if err != nil {
38 return fmt.Errorf("failed to setup db: %w", err)
39 }
40
41 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
42 if err != nil {
43 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
44 }
45
46 logger := log.FromContext(ctx)
47
48 collections := []string{tangled.SpindleMemberNSID}
49 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
50 if err != nil {
51 return fmt.Errorf("failed to setup jetstream client: %w", err)
52 }
53
54 n := notifier.New()
55
56 eng, err := engine.New(ctx, d, &n)
57 if err != nil {
58 return err
59 }
60
61 spindle := Spindle{
62 jc: jc,
63 e: e,
64 db: d,
65 l: logger,
66 n: &n,
67 eng: eng,
68 }
69
70 go func() {
71 logger.Info("starting event consumer")
72 knotEventSource := knotclient.NewEventSource("localhost:5555")
73
74 ccfg := knotclient.NewConsumerConfig()
75 ccfg.Logger = logger
76 ccfg.Dev = cfg.Server.Dev
77 ccfg.ProcessFunc = spindle.exec
78 ccfg.AddEventSource(knotEventSource)
79
80 ec := knotclient.NewEventConsumer(*ccfg)
81
82 ec.Start(ctx)
83 }()
84
85 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
86 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
87
88 return nil
89}
90
91func (s *Spindle) Router() http.Handler {
92 mux := &http.ServeMux{}
93
94 mux.HandleFunc("/events", s.Events)
95 return mux
96}
97
98func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
99 if msg.Nsid == tangled.PipelineNSID {
100 pipeline := tangled.Pipeline{}
101 err := json.Unmarshal(msg.EventJson, &pipeline)
102 if err != nil {
103 fmt.Println("error unmarshalling", err)
104 return err
105 }
106
107 // this is a "fake" at uri for now
108 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)
109
110 rkey := TID()
111 err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey)
112 if err != nil {
113 return err
114 }
115 err = s.eng.StartWorkflows(ctx, &pipeline, rkey)
116 if err != nil {
117 return err
118 }
119 }
120
121 return nil
122}