···
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
15
+
"github.com/avast/retry-go/v4"
"github.com/gorilla/websocket"
···
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
173
-
retryInterval := c.cfg.RetryInterval
···
err := c.runConnection(ctx, source)
181
-
c.logger.Error("connection failed", "source", source, "err", err)
185
-
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
186
-
delay := retryInterval + jitter
188
-
if retryInterval < c.cfg.MaxRetryInterval {
190
-
if retryInterval > c.cfg.MaxRetryInterval {
191
-
retryInterval = c.cfg.MaxRetryInterval
194
-
c.logger.Info("retrying connection", "source", source, "delay", delay)
196
-
case <-time.After(delay):
182
+
c.logger.Error("failed to run connection", "err", err)
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
205
-
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
cursor := c.cfg.CursorStore.Get(source.Key())
u, err := source.Url(cursor, c.cfg.Dev)
···
c.logger.Info("connecting", "url", u.String())
216
-
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
198
+
retryOpts := []retry.Option{
199
+
retry.Attempts(0), // infinite attempts
200
+
retry.DelayType(retry.BackOffDelay),
201
+
retry.Delay(c.cfg.RetryInterval),
202
+
retry.MaxDelay(c.cfg.MaxRetryInterval),
203
+
retry.MaxJitter(c.cfg.RetryInterval / 5),
204
+
retry.OnRetry(func(n uint, err error) {
205
+
c.logger.Info("retrying connection",
212
+
retry.Context(ctx),
215
+
var conn *websocket.Conn
217
+
err = retry.Do(func() error {
218
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
220
+
conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil)
c.connMap.Store(source, conn)
defer c.connMap.Delete(source)
c.logger.Info("connected", "source", source)