···
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
-
retryInterval := c.cfg.RetryInterval
···
err := c.runConnection(ctx, source)
-
c.logger.Error("connection failed", "source", source, "err", err)
-
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
-
delay := retryInterval + jitter
-
if retryInterval < c.cfg.MaxRetryInterval {
-
if retryInterval > c.cfg.MaxRetryInterval {
-
retryInterval = c.cfg.MaxRetryInterval
-
c.logger.Info("retrying connection", "source", source, "delay", delay)
-
case <-time.After(delay):
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
-
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
cursor := c.cfg.CursorStore.Get(source.Key())
u, err := source.Url(cursor, c.cfg.Dev)
···
c.logger.Info("connecting", "url", u.String())
-
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
c.connMap.Store(source, conn)
defer c.connMap.Delete(source)
c.logger.Info("connected", "source", source)
···
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
+
"github.com/avast/retry-go/v4"
"github.com/gorilla/websocket"
···
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
···
err := c.runConnection(ctx, source)
+
c.logger.Error("failed to run connection", "err", err)
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
cursor := c.cfg.CursorStore.Get(source.Key())
u, err := source.Url(cursor, c.cfg.Dev)
···
c.logger.Info("connecting", "url", u.String())
+
retryOpts := []retry.Option{
+
retry.Attempts(0), // infinite attempts
+
retry.DelayType(retry.BackOffDelay),
+
retry.Delay(c.cfg.RetryInterval),
+
retry.MaxDelay(c.cfg.MaxRetryInterval),
+
retry.MaxJitter(c.cfg.RetryInterval / 5),
+
retry.OnRetry(func(n uint, err error) {
+
c.logger.Info("retrying connection",
+
var conn *websocket.Conn
+
err = retry.Do(func() error {
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
+
conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil)
c.connMap.Store(source, conn)
defer c.connMap.Delete(source)
c.logger.Info("connected", "source", source)