From aa1ad967deba0f6934cb64cd8d43c944fea3f922 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Mon, 9 Jun 2025 15:01:55 +0100 Subject: [PATCH] knotclient: introduce EventSource and Message Change-Id: lkwzyuuoqunovvqtskotvylnlnmymwzt - EventSource: a knot's identity (its hostname currently) - Message: message struct that follows the event structure to deserialize just rkey and nsid upfront, this could allow consumers to filter by nsid or configure cursor ranges to listen to Signed-off-by: oppiliappan --- knotclient/events.go | 73 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/knotclient/events.go b/knotclient/events.go index ac7ef95..f4c1fa8 100644 --- a/knotclient/events.go +++ b/knotclient/events.go @@ -2,6 +2,7 @@ package knotclient import ( "context" + "encoding/json" "log/slog" "math/rand" "net/url" @@ -13,10 +14,17 @@ import ( "github.com/gorilla/websocket" ) -type ProcessFunc func(source string, message []byte) error +type ProcessFunc func(source EventSource, message Message) error + +type Message struct { + Rkey string + Nsid string + // do not full deserialize this portion of the message, processFunc can do that + EventJson json.RawMessage +} type ConsumerConfig struct { - Sources []string + Sources map[EventSource]struct{} ProcessFunc ProcessFunc RetryInterval time.Duration MaxRetryInterval time.Duration @@ -24,6 +32,17 @@ type ConsumerConfig struct { WorkerCount int QueueSize int Logger *slog.Logger + Dev bool +} + +type EventSource struct { + Knot string +} + +func NewEventSource(knot string) EventSource { + return EventSource{ + Knot: knot, + } } type EventConsumer struct { @@ -34,10 +53,32 @@ type EventConsumer struct { jobQueue chan job logger *slog.Logger randSource *rand.Rand + + // rw lock over edits to consumer config + mu sync.RWMutex +} + +func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) { + scheme := "wss" + if e.cfg.Dev { + scheme = "ws" + } + + u, err := url.Parse(scheme + "://" + s.Knot + "/events") + if err != nil { + return nil, err + } + + if cursor != "" { + query := url.Values{} + query.Add("cursor", cursor) + u.RawQuery = query.Encode() + } + return u, nil } type job struct { - source string + source EventSource message []byte } @@ -77,7 +118,7 @@ func (c *EventConsumer) Start(ctx context.Context) { } // start streaming - for _, source := range c.cfg.Sources { + for source := range c.cfg.Sources { c.wg.Add(1) go c.startConnectionLoop(ctx, source) } @@ -94,6 +135,14 @@ func (c *EventConsumer) Stop() { close(c.jobQueue) } +func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { + c.mu.Lock() + c.cfg.Sources[s] = struct{}{} + c.wg.Add(1) + go c.startConnectionLoop(ctx, s) + c.mu.Unlock() +} + func (c *EventConsumer) worker(ctx context.Context) { defer c.wg.Done() for { @@ -104,14 +153,21 @@ func (c *EventConsumer) worker(ctx context.Context) { if !ok { return } - if err := c.cfg.ProcessFunc(j.source, j.message); err != nil { + + var msg Message + err := json.Unmarshal(j.message, &msg) + if err != nil { + c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) + return + } + if err := c.cfg.ProcessFunc(j.source, msg); err != nil { c.logger.Error("error processing message", "source", j.source, "err", err) } } } } -func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) { +func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { defer c.wg.Done() retryInterval := c.cfg.RetryInterval for { @@ -144,15 +200,18 @@ func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) } } -func (c *EventConsumer) runConnection(ctx context.Context, source string) error { +func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) defer cancel() u, err := url.Parse(source) + + u, err := c.buildUrl(source, cursor) if err != nil { return err } + c.logger.Info("connecting", "url", u.String()) conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) if err != nil { return err -- 2.43.0