[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/go-redis/redis/v8/redis.go b/vendor/github.com/go-redis/redis/v8/redis.go
index efad7f1..bcf8a2a 100644
--- a/vendor/github.com/go-redis/redis/v8/redis.go
+++ b/vendor/github.com/go-redis/redis/v8/redis.go
@@ -2,14 +2,14 @@
import (
"context"
+ "errors"
"fmt"
+ "sync/atomic"
"time"
"github.com/go-redis/redis/v8/internal"
"github.com/go-redis/redis/v8/internal/pool"
"github.com/go-redis/redis/v8/internal/proto"
- "go.opentelemetry.io/otel/api/trace"
- "go.opentelemetry.io/otel/label"
)
// Nil reply returned by Redis when key does not exist.
@@ -51,9 +51,7 @@
ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
) error {
if len(hs.hooks) == 0 {
- err := hs.withContext(ctx, func() error {
- return fn(ctx, cmd)
- })
+ err := fn(ctx, cmd)
cmd.SetErr(err)
return err
}
@@ -69,9 +67,7 @@
}
if retErr == nil {
- retErr = hs.withContext(ctx, func() error {
- return fn(ctx, cmd)
- })
+ retErr = fn(ctx, cmd)
cmd.SetErr(retErr)
}
@@ -89,9 +85,7 @@
ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
) error {
if len(hs.hooks) == 0 {
- err := hs.withContext(ctx, func() error {
- return fn(ctx, cmds)
- })
+ err := fn(ctx, cmds)
return err
}
@@ -106,9 +100,7 @@
}
if retErr == nil {
- retErr = hs.withContext(ctx, func() error {
- return fn(ctx, cmds)
- })
+ retErr = fn(ctx, cmds)
}
for hookIndex--; hookIndex >= 0; hookIndex-- {
@@ -128,23 +120,6 @@
return hs.processPipeline(ctx, cmds, fn)
}
-func (hs hooks) withContext(ctx context.Context, fn func() error) error {
- done := ctx.Done()
- if done == nil {
- return fn()
- }
-
- errc := make(chan error, 1)
- go func() { errc <- fn() }()
-
- select {
- case <-done:
- return ctx.Err()
- case err := <-errc:
- return err
- }
-}
-
//------------------------------------------------------------------------------
type baseClient struct {
@@ -225,12 +200,9 @@
return cn, nil
}
- err = internal.WithSpan(ctx, "redis.init_conn", func(ctx context.Context, span trace.Span) error {
- return c.initConn(ctx, cn)
- })
- if err != nil {
+ if err := c.initConn(ctx, cn); err != nil {
c.connPool.Remove(ctx, cn, err)
- if err := internal.Unwrap(err); err != nil {
+ if err := errors.Unwrap(err); err != nil {
return nil, err
}
return nil, err
@@ -289,7 +261,7 @@
c.opt.Limiter.ReportResult(err)
}
- if isBadConn(err, false) {
+ if isBadConn(err, false, c.opt.Addr) {
c.connPool.Remove(ctx, cn, err)
} else {
c.connPool.Put(ctx, cn)
@@ -299,25 +271,36 @@
func (c *baseClient) withConn(
ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
- return internal.WithSpan(ctx, "redis.with_conn", func(ctx context.Context, span trace.Span) error {
- cn, err := c.getConn(ctx)
- if err != nil {
- return err
- }
+ cn, err := c.getConn(ctx) // obtain the connection that fn will run on
+ if err != nil {
+ return err
+ }
- if span.IsRecording() {
- if remoteAddr := cn.RemoteAddr(); remoteAddr != nil {
- span.SetAttributes(label.String("net.peer.ip", remoteAddr.String()))
- }
- }
+ defer func() { // runs on every return path below and sees the final value of err
+ c.releaseConn(ctx, cn, err)
+ }()
- defer func() {
- c.releaseConn(ctx, cn, err)
- }()
+ done := ctx.Done() //nolint:ifshort
+ if done == nil { // nil Done channel: no cancellation to watch for, so call fn directly
err = fn(ctx, cn)
return err
- })
+ }
+
+ errc := make(chan error, 1) // capacity 1 so the goroutine's send never blocks
+ go func() { errc <- fn(ctx, cn) }() // run fn concurrently so cancellation can be observed
+
+ select {
+ case <-done: // context finished before fn returned
+ _ = cn.Close() // Close error deliberately ignored; presumably this unblocks fn's I/O on cn - TODO confirm
+ // Wait for the goroutine to finish and send something.
+ <-errc
+
+ err = ctx.Err() // report the context error; the deferred releaseConn receives it too
+ return err
+ case err = <-errc: // fn finished first; its result is returned and handed to releaseConn
+ return err
+ }
}
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
@@ -325,45 +308,50 @@
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
attempt := attempt
- var retry bool
- err := internal.WithSpan(ctx, "redis.process", func(ctx context.Context, span trace.Span) error {
- if attempt > 0 {
- if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
- return err
- }
- }
-
- retryTimeout := true
- err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
- err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
- return writeCmd(wr, cmd)
- })
- if err != nil {
- return err
- }
-
- err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
- if err != nil {
- retryTimeout = cmd.readTimeout() == nil
- return err
- }
-
- return nil
- })
- if err == nil {
- return nil
- }
- retry = shouldRetry(err, retryTimeout)
- return err
- })
+ retry, err := c._process(ctx, cmd, attempt)
if err == nil || !retry {
return err
}
+
lastErr = err
}
return lastErr
}
+func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { // runs one attempt of cmd; returns (retry, err)
+ if attempt > 0 { // attempts after the first back off before touching a connection
+ if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
+ return false, err // the sleep itself failed: report it and do not retry
+ }
+ }
+
+ retryTimeout := uint32(1) // 1 = pass retryTimeout=true to shouldRetry; atomic because withConn may run the closure in a goroutine
+ err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
+ err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+ if err != nil {
+ return err
+ }
+
+ err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
+ if err != nil {
+ if cmd.readTimeout() != nil { // same rule as the pre-refactor code: retryTimeout = (cmd.readTimeout() == nil)
+ atomic.StoreUint32(&retryTimeout, 0) // the flag starts at 1, so clearing it is the only store that has an effect
+ }
+ return err
+ }
+
+ return nil
+ })
+ if err == nil {
+ return false, nil
+ }
+
+ retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
+ return retry, err
+}
+
func (c *baseClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
@@ -722,7 +710,9 @@
hooks // TODO: inherit hooks
}
-// Conn is like Client, but its pool contains single connection.
+// Conn represents a single Redis connection rather than a pool of connections.
+// Prefer running commands from Client unless there is a specific need
+// for a continuous single Redis connection.
type Conn struct {
*conn
ctx context.Context