···
···
"github.com/gorilla/websocket"
-
type ProcessFunc func(source string, message []byte) error
type ConsumerConfig struct {
RetryInterval time.Duration
MaxRetryInterval time.Duration
···
type EventConsumer struct {
···
···
-
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
retryInterval := c.cfg.RetryInterval
···
-
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
u, err := url.Parse(source)
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
···
···
"github.com/gorilla/websocket"
+
type ProcessFunc func(source EventSource, message Message) error
+
// do not full deserialize this portion of the message, processFunc can do that
+
EventJson json.RawMessage
type ConsumerConfig struct {
RetryInterval time.Duration
MaxRetryInterval time.Duration
···
+
type EventSource struct {
+
func NewEventSource(knot string) EventSource {
type EventConsumer struct {
···
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
+
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
+
query.Add("cursor", cursor)
+
u.RawQuery = query.Encode()
···
+
err := json.Unmarshal(j.message, &msg)
+
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
retryInterval := c.cfg.RetryInterval
···
+
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
u, err := url.Parse(source)
+
u, err := c.buildUrl(source, cursor)
+
c.logger.Info("connecting", "url", u.String())
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)