knotclient: introduce EventSource and Message #234

deleted
opened by oppi.li targeting master from push-qoplqnlvlqqo
  • EventSource: a knot's identity (its hostname currently)
  • Message: message struct that follows the event structure to deserialize just rkey and nsid upfront, this could allow consumers to filter by nsid or configure cursor ranges to listen to

Signed-off-by: oppiliappan me@oppi.li

Changed files
+54 -6
knotclient
+54 -6
knotclient/events.go
···
import (
"context"
+
"encoding/json"
"log/slog"
"math/rand"
"net/url"
···
"github.com/gorilla/websocket"
)
-
type ProcessFunc func(source string, message []byte) error
+
type ProcessFunc func(source EventSource, message Message) error
+
+
type Message struct {
+
Rkey string
+
Nsid string
+
// do not full deserialize this portion of the message, processFunc can do that
+
EventJson json.RawMessage
+
}
type ConsumerConfig struct {
-
Sources []string
+
Sources []EventSource
ProcessFunc ProcessFunc
RetryInterval time.Duration
MaxRetryInterval time.Duration
···
WorkerCount int
QueueSize int
Logger *slog.Logger
+
Dev bool
+
}
+
+
type EventSource struct {
+
Knot string
+
}
+
+
func NewEventSource(knot string) EventSource {
+
return EventSource{
+
Knot: knot,
+
}
}
type EventConsumer struct {
···
randSource *rand.Rand
}
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
+
scheme := "wss"
+
if e.cfg.Dev {
+
scheme = "ws"
+
}
+
+
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
+
if err != nil {
+
return nil, err
+
}
+
+
if cursor != "" {
+
query := url.Values{}
+
query.Add("cursor", cursor)
+
u.RawQuery = query.Encode()
+
}
+
return u, nil
+
}
+
type job struct {
-
source string
+
source EventSource
message []byte
}
···
if !ok {
return
}
-
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
+
+
var msg Message
+
err := json.Unmarshal(j.message, &msg)
+
if err != nil {
+
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
+
return
+
}
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
}
}
}
}
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
defer c.wg.Done()
retryInterval := c.cfg.RetryInterval
for {
···
}
}
-
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
+
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
defer cancel()
u, err := url.Parse(source)
+
+
u, err := c.buildUrl(source, cursor)
if err != nil {
return err
}
+
c.logger.Info("connecting", "url", u.String())
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
if err != nil {
return err