forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "encoding/json"
5 "fmt"
6 "log/slog"
7 "net/http"
8
9 "golang.org/x/net/context"
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/jetstream"
12 "tangled.sh/tangled.sh/core/knotclient"
13 "tangled.sh/tangled.sh/core/knotserver/notifier"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/rbac"
16 "tangled.sh/tangled.sh/core/spindle/config"
17 "tangled.sh/tangled.sh/core/spindle/db"
18 "tangled.sh/tangled.sh/core/spindle/engine"
19)
20
21type Spindle struct {
22 jc *jetstream.JetstreamClient
23 db *db.DB
24 e *rbac.Enforcer
25 l *slog.Logger
26 n *notifier.Notifier
27 eng *engine.Engine
28}
29
30func Run(ctx context.Context) error {
31 cfg, err := config.Load(ctx)
32 if err != nil {
33 return fmt.Errorf("failed to load config: %w", err)
34 }
35
36 d, err := db.Make(cfg.Server.DBPath)
37 if err != nil {
38 return fmt.Errorf("failed to setup db: %w", err)
39 }
40
41 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
42 if err != nil {
43 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
44 }
45
46 logger := log.FromContext(ctx)
47
48 collections := []string{tangled.SpindleMemberNSID}
49 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
50 if err != nil {
51 return fmt.Errorf("failed to setup jetstream client: %w", err)
52 }
53
54 n := notifier.New()
55
56 eng, err := engine.New(ctx, d)
57 if err != nil {
58 return err
59 }
60
61 spindle := Spindle{
62 jc: jc,
63 e: e,
64 db: d,
65 l: logger,
66 n: &n,
67 eng: eng,
68 }
69
70 go func() {
71 logger.Info("starting event consumer")
72 knotEventSource := knotclient.NewEventSource("localhost:5555")
73 ccfg := knotclient.ConsumerConfig{
74 Logger: logger,
75 ProcessFunc: spindle.exec,
76 }
77 ccfg.AddEventSource(knotEventSource)
78
79 ec := knotclient.NewEventConsumer(ccfg)
80
81 ec.Start(ctx)
82 }()
83
84 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
85 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
86
87 return nil
88}
89
90func (s *Spindle) Router() http.Handler {
91 mux := &http.ServeMux{}
92
93 mux.HandleFunc("/events", s.Events)
94 return mux
95}
96
97func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
98 pipeline := tangled.Pipeline{}
99 err := json.Unmarshal(msg.EventJson, &pipeline)
100 if err != nil {
101 fmt.Println("error unmarshalling", err)
102 return err
103 }
104
105 if msg.Nsid == tangled.PipelineNSID {
106 err = s.eng.SetupPipeline(ctx, &pipeline, msg.Rkey)
107 if err != nil {
108 return err
109 }
110 err = s.eng.StartWorkflows(ctx, &pipeline, msg.Rkey)
111 if err != nil {
112 return err
113 }
114 }
115
116 return nil
117}