From 541f45bc4bcc15855056e64e367d4f0a8bfbfb22 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Mon, 9 Jun 2025 15:01:55 +0100 Subject: [PATCH] knotclient: introduce EventSource and Message Change-Id: vkkpslxozxqvntnmnunpprnyyuyspnor - EventSource: a knot's identity (its hostname currently) - Message: message struct that follows the event structure to deserialize just rkey and nsid upfront, this could allow consumers to filter by nsid or configure cursor ranges to listen to Signed-off-by: oppiliappan --- knotclient/events.go | 60 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 6 deletions(-) diff --git a/knotclient/events.go b/knotclient/events.go index ac7ef95..3c8d309 100644 --- a/knotclient/events.go +++ b/knotclient/events.go @@ -2,6 +2,7 @@ package knotclient import ( "context" + "encoding/json" "log/slog" "math/rand" "net/url" @@ -13,10 +14,17 @@ import ( "github.com/gorilla/websocket" ) -type ProcessFunc func(source string, message []byte) error +type ProcessFunc func(source EventSource, message Message) error + +type Message struct { + Rkey string + Nsid string + // do not full deserialize this portion of the message, processFunc can do that + EventJson json.RawMessage +} type ConsumerConfig struct { - Sources []string + Sources []EventSource ProcessFunc ProcessFunc RetryInterval time.Duration MaxRetryInterval time.Duration @@ -24,6 +32,17 @@ type ConsumerConfig struct { WorkerCount int QueueSize int Logger *slog.Logger + Dev bool +} + +type EventSource struct { + Knot string +} + +func NewEventSource(knot string) EventSource { + return EventSource{ + Knot: knot, + } } type EventConsumer struct { @@ -36,8 +55,27 @@ type EventConsumer struct { randSource *rand.Rand } +func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) { + scheme := "wss" + if e.cfg.Dev { + scheme = "ws" + } + + u, err := url.Parse(scheme + "://" + s.Knot + "/events") + if err != nil { + return nil, err + } + + if cursor != "" { + query := url.Values{} + query.Add("cursor", cursor) + u.RawQuery = query.Encode() + } + return u, nil +} + type job struct { - source string + source EventSource message []byte } @@ -104,14 +142,21 @@ func (c *EventConsumer) worker(ctx context.Context) { if !ok { return } - if err := c.cfg.ProcessFunc(j.source, j.message); err != nil { + + var msg Message + err := json.Unmarshal(j.message, &msg) + if err != nil { + c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) + return + } + if err := c.cfg.ProcessFunc(j.source, msg); err != nil { c.logger.Error("error processing message", "source", j.source, "err", err) } } } } -func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) { +func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { defer c.wg.Done() retryInterval := c.cfg.RetryInterval for { @@ -144,15 +189,18 @@ func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) } } -func (c *EventConsumer) runConnection(ctx context.Context, source string) error { +func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) defer cancel() u, err := url.Parse(source) + + u, err := c.buildUrl(source, cursor) if err != nil { return err } + c.logger.Info("connecting", "url", u.String()) conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) if err != nil { return err -- 2.43.0