···
···
"github.com/gorilla/websocket"
16
-
type ProcessFunc func(source string, message []byte) error
17
+
type ProcessFunc func(source EventSource, message Message) error
19
+
type Message struct {
22
+
// do not full deserialize this portion of the message, processFunc can do that
23
+
EventJson json.RawMessage
type ConsumerConfig struct {
27
+
Sources []EventSource
RetryInterval time.Duration
MaxRetryInterval time.Duration
···
38
+
type EventSource struct {
42
+
func NewEventSource(knot string) EventSource {
type EventConsumer struct {
···
58
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
64
+
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
70
+
query := url.Values{}
71
+
query.Add("cursor", cursor)
72
+
u.RawQuery = query.Encode()
···
107
-
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
147
+
err := json.Unmarshal(j.message, &msg)
149
+
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
152
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
114
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
159
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
retryInterval := c.cfg.RetryInterval
···
147
-
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
192
+
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
u, err := url.Parse(source)
198
+
u, err := c.buildUrl(source, cursor)
203
+
c.logger.Info("connecting", "url", u.String())
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)