From 03269e909d88f0e8e52bb48622ba9cee88a2c34c Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Tue, 17 Jun 2025 11:21:07 +0100 Subject: [PATCH] eventconsumer: extract knotclient/events into its own package Change-Id: mwkwusmyymnorkztqxtquwtkvupozuxm Signed-off-by: oppiliappan --- .../events.go => eventconsumer/consumer.go | 73 +++++++------------ .../cursor/memory.go | 0 {knotclient => eventconsumer}/cursor/redis.go | 0 .../cursor/sqlite.go | 0 {knotclient => eventconsumer}/cursor/store.go | 0 eventconsumer/knot.go | 39 ++++++++++ eventconsumer/spindle.go | 39 ++++++++++ spindle/ingester.go | 4 +- spindle/server.go | 16 ++-- spindle/stream.go | 29 ++++++-- 10 files changed, 137 insertions(+), 63 deletions(-) rename knotclient/events.go => eventconsumer/consumer.go (74%) rename {knotclient => eventconsumer}/cursor/memory.go (100%) rename {knotclient => eventconsumer}/cursor/redis.go (100%) rename {knotclient => eventconsumer}/cursor/sqlite.go (100%) rename {knotclient => eventconsumer}/cursor/store.go (100%) create mode 100644 eventconsumer/knot.go create mode 100644 eventconsumer/spindle.go diff --git a/knotclient/events.go b/eventconsumer/consumer.go similarity index 74% rename from knotclient/events.go rename to eventconsumer/consumer.go index f0ceebf..051f584 100644 --- a/knotclient/events.go +++ b/eventconsumer/consumer.go @@ -1,22 +1,21 @@ -package knotclient +package eventconsumer import ( "context" "encoding/json" - "fmt" "log/slog" "math/rand" "net/url" "sync" "time" - "tangled.sh/tangled.sh/core/knotclient/cursor" + "tangled.sh/tangled.sh/core/eventconsumer/cursor" "tangled.sh/tangled.sh/core/log" "github.com/gorilla/websocket" ) -type ProcessFunc func(ctx context.Context, source EventSource, message Message) error +type ProcessFunc func(ctx context.Context, source Source, message Message) error type Message struct { Rkey string @@ -26,7 +25,7 @@ type Message struct { } type ConsumerConfig struct { - Sources map[EventSource]struct{} + Sources map[Source]struct{} ProcessFunc ProcessFunc RetryInterval time.Duration MaxRetryInterval time.Duration @@ -40,21 +39,18 @@ type ConsumerConfig struct { func NewConsumerConfig() *ConsumerConfig { return &ConsumerConfig{ - Sources: make(map[EventSource]struct{}), + Sources: make(map[Source]struct{}), } } -type EventSource struct { - Knot string +type Source interface { + // url to start streaming events from + Url(cursor int64, dev bool) (*url.URL, error) + // cache key for cursor storage + Key() string } -func NewEventSource(knot string) EventSource { - return EventSource{ - Knot: knot, - } -} - -type EventConsumer struct { +type Consumer struct { wg sync.WaitGroup dialer *websocket.Dialer connMap sync.Map @@ -67,31 +63,12 @@ type EventConsumer struct { cfg ConsumerConfig } -func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) { - scheme := "wss" - if e.cfg.Dev { - scheme = "ws" - } - - u, err := url.Parse(scheme + "://" + s.Knot + "/events") - if err != nil { - return nil, err - } - - if cursor != 0 { - query := url.Values{} - query.Add("cursor", fmt.Sprintf("%d", cursor)) - u.RawQuery = query.Encode() - } - return u, nil -} - type job struct { - source EventSource + source Source message []byte } -func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { +func NewConsumer(cfg ConsumerConfig) *Consumer { if cfg.RetryInterval == 0 { cfg.RetryInterval = 15 * time.Minute } @@ -105,7 +82,7 @@ func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { cfg.MaxRetryInterval = 1 * time.Hour } if cfg.Logger == nil { - cfg.Logger = log.New("eventconsumer") + cfg.Logger = log.New("consumer") } if cfg.QueueSize == 0 { cfg.QueueSize = 100 @@ -113,7 +90,7 @@ func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { if cfg.CursorStore == nil { cfg.CursorStore = &cursor.MemoryStore{} } - return &EventConsumer{ + return &Consumer{ cfg: cfg, dialer: websocket.DefaultDialer, jobQueue: make(chan job, cfg.QueueSize), // buffered job queue @@ -122,7 +99,7 @@ func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { } } -func (c *EventConsumer) Start(ctx context.Context) { +func (c *Consumer) Start(ctx context.Context) { c.cfg.Logger.Info("starting consumer", "config", c.cfg) // start workers @@ -138,7 +115,7 @@ func (c *EventConsumer) Start(ctx context.Context) { } } -func (c *EventConsumer) Stop() { +func (c *Consumer) Stop() { c.connMap.Range(func(_, val any) bool { if conn, ok := val.(*websocket.Conn); ok { conn.Close() @@ -149,7 +126,7 @@ func (c *EventConsumer) Stop() { close(c.jobQueue) } -func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { +func (c *Consumer) AddSource(ctx context.Context, s Source) { // we are already listening to this source if _, ok := c.cfg.Sources[s]; ok { c.logger.Info("source already present", "source", s) @@ -163,7 +140,7 @@ func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { c.cfgMu.Unlock() } -func (c *EventConsumer) worker(ctx context.Context) { +func (c *Consumer) worker(ctx context.Context) { defer c.wg.Done() for { select { @@ -177,12 +154,12 @@ func (c *EventConsumer) worker(ctx context.Context) { var msg Message err := json.Unmarshal(j.message, &msg) if err != nil { - c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) + c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err) return } // update cursor - c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano()) + c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano()) if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { c.logger.Error("error processing message", "source", j.source, "err", err) @@ -191,7 +168,7 @@ func (c *EventConsumer) worker(ctx context.Context) { } } -func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { +func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { defer c.wg.Done() retryInterval := c.cfg.RetryInterval for { @@ -224,13 +201,13 @@ func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSou } } -func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { +func (c *Consumer) runConnection(ctx context.Context, source Source) error { connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) defer cancel() - cursor := c.cfg.CursorStore.Get(source.Knot) + cursor := c.cfg.CursorStore.Get(source.Key()) - u, err := c.buildUrl(source, cursor) + u, err := source.Url(cursor, c.cfg.Dev) if err != nil { return err } diff --git a/knotclient/cursor/memory.go b/eventconsumer/cursor/memory.go similarity index 100% rename from knotclient/cursor/memory.go rename to eventconsumer/cursor/memory.go diff --git a/knotclient/cursor/redis.go b/eventconsumer/cursor/redis.go similarity index 100% rename from knotclient/cursor/redis.go rename to eventconsumer/cursor/redis.go diff --git a/knotclient/cursor/sqlite.go b/eventconsumer/cursor/sqlite.go similarity index 100% rename from knotclient/cursor/sqlite.go rename to eventconsumer/cursor/sqlite.go diff --git a/knotclient/cursor/store.go b/eventconsumer/cursor/store.go similarity index 100% rename from knotclient/cursor/store.go rename to eventconsumer/cursor/store.go diff --git a/eventconsumer/knot.go b/eventconsumer/knot.go new file mode 100644 index 0000000..1474281 --- /dev/null +++ b/eventconsumer/knot.go @@ -0,0 +1,39 @@ +package eventconsumer + +import ( + "fmt" + "net/url" +) + +type KnotSource struct { + Knot string +} + +func (k KnotSource) Key() string { + return k.Knot +} + +func (k KnotSource) Url(cursor int64, dev bool) (*url.URL, error) { + scheme := "wss" + if dev { + scheme = "ws" + } + + u, err := url.Parse(scheme + "://" + k.Knot + "/events") + if err != nil { + return nil, err + } + + if cursor != 0 { + query := url.Values{} + query.Add("cursor", fmt.Sprintf("%d", cursor)) + u.RawQuery = query.Encode() + } + return u, nil +} + +func NewKnotSource(knot string) KnotSource { + return KnotSource{ + Knot: knot, + } +} diff --git a/eventconsumer/spindle.go b/eventconsumer/spindle.go new file mode 100644 index 0000000..419d3ea --- /dev/null +++ b/eventconsumer/spindle.go @@ -0,0 +1,39 @@ +package eventconsumer + +import ( + "fmt" + "net/url" +) + +type SpindleSource struct { + Spindle string +} + +func (s SpindleSource) Key() string { + return s.Spindle +} + +func (s SpindleSource) Url(cursor int64, dev bool) (*url.URL, error) { + scheme := "wss" + if dev { + scheme = "ws" + } + + u, err := url.Parse(scheme + "://" + s.Spindle + "/events") + if err != nil { + return nil, err + } + + if cursor != 0 { + query := url.Values{} + query.Add("cursor", fmt.Sprintf("%d", cursor)) + u.RawQuery = query.Encode() + } + return u, nil +} + +func NewSpindleSource(spindle string) SpindleSource { + return SpindleSource{ + Spindle: spindle, + } +} diff --git a/spindle/ingester.go b/spindle/ingester.go index a144747..c58e7e5 100644 --- a/spindle/ingester.go +++ b/spindle/ingester.go @@ -6,7 +6,7 @@ import ( "fmt" "tangled.sh/tangled.sh/core/api/tangled" - "tangled.sh/tangled.sh/core/knotclient" + "tangled.sh/tangled.sh/core/eventconsumer" "github.com/bluesky-social/jetstream/pkg/models" ) @@ -128,7 +128,7 @@ func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error { } // add this knot to the event consumer - src := knotclient.NewEventSource(record.Knot) + src := eventconsumer.NewKnotSource(record.Knot) s.ks.AddSource(context.Background(), src) return nil diff --git a/spindle/server.go b/spindle/server.go index e2e6618..3203d65 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -9,9 +9,9 @@ import ( "github.com/go-chi/chi/v5" "tangled.sh/tangled.sh/core/api/tangled" + "tangled.sh/tangled.sh/core/eventconsumer" + "tangled.sh/tangled.sh/core/eventconsumer/cursor" "tangled.sh/tangled.sh/core/jetstream" - "tangled.sh/tangled.sh/core/knotclient" - "tangled.sh/tangled.sh/core/knotclient/cursor" "tangled.sh/tangled.sh/core/log" "tangled.sh/tangled.sh/core/notifier" "tangled.sh/tangled.sh/core/rbac" @@ -35,7 +35,7 @@ type Spindle struct { eng *engine.Engine jq *queue.Queue cfg *config.Config - ks *knotclient.EventConsumer + ks *eventconsumer.Consumer } func Run(ctx context.Context) error { @@ -114,7 +114,7 @@ func Run(ctx context.Context) error { // for each incoming sh.tangled.pipeline, we execute // spindle.processPipeline, which in turn enqueues the pipeline // job in the above registered queue. - ccfg := knotclient.NewConsumerConfig() + ccfg := eventconsumer.NewConsumerConfig() ccfg.Logger = logger ccfg.Dev = cfg.Server.Dev ccfg.ProcessFunc = spindle.processPipeline @@ -125,9 +125,9 @@ func Run(ctx context.Context) error { } for _, knot := range knownKnots { logger.Info("adding source start", "knot", knot) - ccfg.Sources[knotclient.EventSource{knot}] = struct{}{} + ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} } - spindle.ks = knotclient.NewEventConsumer(*ccfg) + spindle.ks = eventconsumer.NewConsumer(*ccfg) go func() { logger.Info("starting knot event consumer") @@ -151,7 +151,7 @@ func (s *Spindle) Router() http.Handler { return mux } -func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { +func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { if msg.Nsid == tangled.PipelineNSID { pipeline := tangled.Pipeline{} err := json.Unmarshal(msg.EventJson, &pipeline) @@ -179,7 +179,7 @@ func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSourc } pipelineId := models.PipelineId{ - Knot: src.Knot, + Knot: src.Key(), Rkey: msg.Rkey, } diff --git a/spindle/stream.go b/spindle/stream.go index 6ca7ed0..28c9954 100644 --- a/spindle/stream.go +++ b/spindle/stream.go @@ -2,6 +2,7 @@ package spindle import ( "context" + "encoding/json" "fmt" "net/http" "strconv" @@ -206,19 +207,37 @@ func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid mode } func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { - ops, err := s.db.GetEvents(*cursor) + events, err := s.db.GetEvents(*cursor) if err != nil { s.l.Debug("err", "err", err) return err } - s.l.Debug("ops", "ops", ops) + s.l.Debug("ops", "ops", events) + + for _, event := range events { + // first extract the inner json into a map + var eventJson map[string]any + err := json.Unmarshal([]byte(event.EventJson), &eventJson) + if err != nil { + s.l.Error("failed to unmarshal event", "err", err) + return err + } + + jsonMsg, err := json.Marshal(map[string]any{ + "rkey": event.Rkey, + "nsid": event.Nsid, + "event": eventJson, + }) + if err != nil { + s.l.Error("failed to marshal record", "err", err) + return err + } - for _, op := range ops { - if err := conn.WriteJSON(op); err != nil { + if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { s.l.Debug("err", "err", err) return err } - *cursor = op.Created + *cursor = event.Created } return nil -- 2.43.0