eventconsumer: rework retry mechanism #273

merged
opened by oppi.li targeting master from push-ktsnmppqsnls

the previous retry mechanism had a slight flaw: successful connections did not reset the exponent on the retry interval. this results in constantly growing retry intervals:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #4 - wait 20s

what we want to see however, is a pattern like so:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #1 - wait 5s

this is solved by slapping the retry logic around DialConnection, which is a more atomic point of connection attempt. retry logic is also offloaded to the github.com/avast-go/retry package

Signed-off-by: oppiliappan me@oppi.li

Changed files
+35 -25
eventconsumer
+31 -24
eventconsumer/consumer.go
···
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
)
···
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
defer c.wg.Done()
-
retryInterval := c.cfg.RetryInterval
for {
select {
case <-ctx.Done():
···
default:
err := c.runConnection(ctx, source)
if err != nil {
-
c.logger.Error("connection failed", "source", source, "err", err)
-
}
-
-
// apply jitter
-
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
-
delay := retryInterval + jitter
-
-
if retryInterval < c.cfg.MaxRetryInterval {
-
retryInterval *= 2
-
if retryInterval > c.cfg.MaxRetryInterval {
-
retryInterval = c.cfg.MaxRetryInterval
-
}
-
}
-
c.logger.Info("retrying connection", "source", source, "delay", delay)
-
select {
-
case <-time.After(delay):
-
case <-ctx.Done():
-
return
}
}
}
}
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
-
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
-
defer cancel()
-
cursor := c.cfg.CursorStore.Get(source.Key())
u, err := source.Url(cursor, c.cfg.Dev)
···
}
c.logger.Info("connecting", "url", u.String())
-
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
if err != nil {
return err
}
-
defer conn.Close()
c.connMap.Store(source, conn)
defer c.connMap.Delete(source)
c.logger.Info("connected", "source", source)
···
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
+
"github.com/avast/retry-go/v4"
"github.com/gorilla/websocket"
)
···
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
defer c.wg.Done()
+
for {
select {
case <-ctx.Done():
···
default:
err := c.runConnection(ctx, source)
if err != nil {
+
c.logger.Error("failed to run connection", "err", err)
}
}
}
}
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
cursor := c.cfg.CursorStore.Get(source.Key())
u, err := source.Url(cursor, c.cfg.Dev)
···
}
c.logger.Info("connecting", "url", u.String())
+
+
retryOpts := []retry.Option{
+
retry.Attempts(0), // infinite attempts
+
retry.DelayType(retry.BackOffDelay),
+
retry.Delay(c.cfg.RetryInterval),
+
retry.MaxDelay(c.cfg.MaxRetryInterval),
+
retry.MaxJitter(c.cfg.RetryInterval / 5),
+
retry.OnRetry(func(n uint, err error) {
+
c.logger.Info("retrying connection",
+
"source", source,
+
"url", u.String(),
+
"attempt", n+1,
+
"err", err,
+
)
+
}),
+
retry.Context(ctx),
+
}
+
+
var conn *websocket.Conn
+
+
err = retry.Do(func() error {
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
+
defer cancel()
+
conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil)
+
return err
+
}, retryOpts...)
if err != nil {
return err
}
+
c.connMap.Store(source, conn)
+
defer conn.Close()
defer c.connMap.Delete(source)
c.logger.Info("connected", "source", source)
+1 -1
flake.nix
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
-
goModHash = "sha256-2RUwj16RNaZ/gCOcd7b3LRCHiROCRj9HuzbBdLdgWGo=";
appviewDeps = {
inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource;
};
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
+
goModHash = "sha256-SLi+nALwCd/Lzn3aljwPqCo2UaM9hl/4OAjcHQLt2Bk=";
appviewDeps = {
inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource;
};
+1
go.mod
···
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProtonMail/go-crypto v1.2.0 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
···
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProtonMail/go-crypto v1.2.0 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
+
github.com/avast/retry-go/v4 v4.6.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
+2
go.sum
···
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
···
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
+
github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk=
+
github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=