From eb6da2c10b2812967ac38e1ae29871f46a692301 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Fri, 11 Jul 2025 09:57:10 +0100 Subject: [PATCH] eventconsumer: rework retry mechanism Change-Id: ktsnmppqsnlsktkpwvlokknkrmwtnzuk the previous retry mechanism had a slight flaw: successful connections did not reset the exponent on the retry interval. this results in constantly growing retry intervals: attempt #1 - wait 5s attempt #2 - wait 10s attempt #3 - success! . . . disconnect attempt #4 - wait 20s what we want to see however, is a pattern like so: attempt #1 - wait 5s attempt #2 - wait 10s attempt #3 - success! . . . disconnect attempt #1 - wait 5s this is solved by slapping the retry logic around DialContext, which is a more atomic point of connection attempt. retry logic is also offloaded to the github.com/avast/retry-go package Signed-off-by: oppiliappan --- eventconsumer/consumer.go | 55 ++++++++++++++++++++++----------------- flake.nix | 2 +- go.mod | 1 + go.sum | 2 ++ 4 files changed, 35 insertions(+), 25 deletions(-) diff --git a/eventconsumer/consumer.go b/eventconsumer/consumer.go index 051f584..df98ffb 100644 --- a/eventconsumer/consumer.go +++ b/eventconsumer/consumer.go @@ -12,6 +12,7 @@ import ( "tangled.sh/tangled.sh/core/eventconsumer/cursor" "tangled.sh/tangled.sh/core/log" + "github.com/avast/retry-go/v4" "github.com/gorilla/websocket" ) @@ -170,7 +171,7 @@ func (c *Consumer) worker(ctx context.Context) { func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { defer c.wg.Done() - retryInterval := c.cfg.RetryInterval + for { select { case <-ctx.Done(): @@ -178,33 +179,13 @@ func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { default: err := c.runConnection(ctx, source) if err != nil { - c.logger.Error("connection failed", "source", source, "err", err) - } - - // apply jitter - jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) - delay := retryInterval + jitter - - if retryInterval 
< c.cfg.MaxRetryInterval { - retryInterval *= 2 - if retryInterval > c.cfg.MaxRetryInterval { - retryInterval = c.cfg.MaxRetryInterval - } - } - c.logger.Info("retrying connection", "source", source, "delay", delay) - select { - case <-time.After(delay): - case <-ctx.Done(): - return + c.logger.Error("failed to run connection", "err", err) } } } } func (c *Consumer) runConnection(ctx context.Context, source Source) error { - connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) - defer cancel() - cursor := c.cfg.CursorStore.Get(source.Key()) u, err := source.Url(cursor, c.cfg.Dev) @@ -213,12 +194,38 @@ func (c *Consumer) runConnection(ctx context.Context, source Source) error { } c.logger.Info("connecting", "url", u.String()) - conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) + + retryOpts := []retry.Option{ + retry.Attempts(0), // infinite attempts + retry.DelayType(retry.BackOffDelay), + retry.Delay(c.cfg.RetryInterval), + retry.MaxDelay(c.cfg.MaxRetryInterval), + retry.MaxJitter(c.cfg.RetryInterval / 5), + retry.OnRetry(func(n uint, err error) { + c.logger.Info("retrying connection", + "source", source, + "url", u.String(), + "attempt", n+1, + "err", err, + ) + }), + retry.Context(ctx), + } + + var conn *websocket.Conn + + err = retry.Do(func() error { + connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) + defer cancel() + conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) + return err + }, retryOpts...) 
if err != nil { return err } - defer conn.Close() + c.connMap.Store(source, conn) + defer conn.Close() defer c.connMap.Delete(source) c.logger.Info("connected", "source", source) diff --git a/flake.nix b/flake.nix index c5d06ac..c57e487 100644 --- a/flake.nix +++ b/flake.nix @@ -61,7 +61,7 @@ inherit (gitignore.lib) gitignoreSource; in { overlays.default = final: prev: let - goModHash = "sha256-2RUwj16RNaZ/gCOcd7b3LRCHiROCRj9HuzbBdLdgWGo="; + goModHash = "sha256-SLi+nALwCd/Lzn3aljwPqCo2UaM9hl/4OAjcHQLt2Bk="; appviewDeps = { inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource; }; diff --git a/go.mod b/go.mod index c5466cb..3b1c580 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.2.0 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect + github.com/avast/retry-go/v4 v4.6.1 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect diff --git a/go.sum b/go.sum index 6ecfbfb..279969c 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFI github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk= +github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= 
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -- 2.43.0