[VOL-5486] Fix deprecated versions

Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/go-redis/redis/v8/pubsub.go b/vendor/github.com/go-redis/redis/v8/pubsub.go
index c56270b..efc2354 100644
--- a/vendor/github.com/go-redis/redis/v8/pubsub.go
+++ b/vendor/github.com/go-redis/redis/v8/pubsub.go
@@ -2,7 +2,6 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"strings"
 	"sync"
@@ -13,13 +12,6 @@
 	"github.com/go-redis/redis/v8/internal/proto"
 )
 
-const (
-	pingTimeout     = time.Second
-	chanSendTimeout = time.Minute
-)
-
-var errPingTimeout = errors.New("redis: ping timeout")
-
 // PubSub implements Pub/Sub commands as described in
 // http://redis.io/topics/pubsub. Message receiving is NOT safe
 // for concurrent use by multiple goroutines.
@@ -43,9 +35,12 @@
 	cmd *Cmd
 
 	chOnce sync.Once
-	msgCh  chan *Message
-	allCh  chan interface{}
-	ping   chan struct{}
+	msgCh  *channel
+	allCh  *channel
+}
+
+func (c *PubSub) init() {
+	c.exit = make(chan struct{})
 }
 
 func (c *PubSub) String() string {
@@ -54,10 +49,6 @@
 	return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
 }
 
-func (c *PubSub) init() {
-	c.exit = make(chan struct{})
-}
-
 func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
 	c.mu.Lock()
 	cn, err := c.conn(ctx, nil)
@@ -150,7 +141,7 @@
 	if c.cn != cn {
 		return
 	}
-	if isBadConn(err, allowTimeout) {
+	if isBadConn(err, allowTimeout, c.opt.Addr) {
 		c.reconnect(ctx, err)
 	}
 }
@@ -261,13 +252,16 @@
 	}
 	cmd := NewCmd(ctx, args...)
 
-	cn, err := c.connWithLock(ctx)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	cn, err := c.conn(ctx, nil)
 	if err != nil {
 		return err
 	}
 
 	err = c.writeCmd(ctx, cn, cmd)
-	c.releaseConnWithLock(ctx, cn, err, false)
+	c.releaseConn(ctx, cn, err, false)
 	return err
 }
 
@@ -370,6 +364,8 @@
 		c.cmd = NewCmd(ctx)
 	}
 
+	// Don't hold the lock to allow subscriptions and pings.
+
 	cn, err := c.connWithLock(ctx)
 	if err != nil {
 		return nil, err
@@ -380,6 +376,7 @@
 	})
 
 	c.releaseConnWithLock(ctx, cn, err, timeout > 0)
+
 	if err != nil {
 		return nil, err
 	}
@@ -418,56 +415,6 @@
 	}
 }
 
-// Channel returns a Go channel for concurrently receiving messages.
-// The channel is closed together with the PubSub. If the Go channel
-// is blocked full for 30 seconds the message is dropped.
-// Receive* APIs can not be used after channel is created.
-//
-// go-redis periodically sends ping messages to test connection health
-// and re-subscribes if ping can not not received for 30 seconds.
-func (c *PubSub) Channel() <-chan *Message {
-	return c.ChannelSize(100)
-}
-
-// ChannelSize is like Channel, but creates a Go channel
-// with specified buffer size.
-func (c *PubSub) ChannelSize(size int) <-chan *Message {
-	c.chOnce.Do(func() {
-		c.initPing()
-		c.initMsgChan(size)
-	})
-	if c.msgCh == nil {
-		err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
-		panic(err)
-	}
-	if cap(c.msgCh) != size {
-		err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
-		panic(err)
-	}
-	return c.msgCh
-}
-
-// ChannelWithSubscriptions is like Channel, but message type can be either
-// *Subscription or *Message. Subscription messages can be used to detect
-// reconnections.
-//
-// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
-func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan interface{} {
-	c.chOnce.Do(func() {
-		c.initPing()
-		c.initAllChan(size)
-	})
-	if c.allCh == nil {
-		err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
-		panic(err)
-	}
-	if cap(c.allCh) != size {
-		err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
-		panic(err)
-	}
-	return c.allCh
-}
-
 func (c *PubSub) getContext() context.Context {
 	if c.cmd != nil {
 		return c.cmd.ctx
@@ -475,36 +422,135 @@
 	return context.Background()
 }
 
-func (c *PubSub) initPing() {
+//------------------------------------------------------------------------------
+
+// Channel returns a Go channel for concurrently receiving messages.
+// The channel is closed together with the PubSub. If the Go channel
+// is blocked full for 30 seconds the message is dropped.
+// Receive* APIs can not be used after channel is created.
+//
+// go-redis periodically sends ping messages to test connection health
+// and re-subscribes if ping can not not received for 30 seconds.
+func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
+	c.chOnce.Do(func() {
+		c.msgCh = newChannel(c, opts...)
+		c.msgCh.initMsgChan()
+	})
+	if c.msgCh == nil {
+		err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
+		panic(err)
+	}
+	return c.msgCh.msgCh
+}
+
+// ChannelSize is like Channel, but creates a Go channel
+// with specified buffer size.
+//
+// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
+func (c *PubSub) ChannelSize(size int) <-chan *Message {
+	return c.Channel(WithChannelSize(size))
+}
+
+// ChannelWithSubscriptions is like Channel, but message type can be either
+// *Subscription or *Message. Subscription messages can be used to detect
+// reconnections.
+//
+// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
+func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan interface{} {
+	c.chOnce.Do(func() {
+		c.allCh = newChannel(c, WithChannelSize(size))
+		c.allCh.initAllChan()
+	})
+	if c.allCh == nil {
+		err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
+		panic(err)
+	}
+	return c.allCh.allCh
+}
+
+type ChannelOption func(c *channel)
+
+// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
+//
+// The default is 100 messages.
+func WithChannelSize(size int) ChannelOption {
+	return func(c *channel) {
+		c.chanSize = size
+	}
+}
+
+// WithChannelHealthCheckInterval specifies the health check interval.
+// PubSub will ping Redis Server if it does not receive any messages within the interval.
+// To disable health check, use zero interval.
+//
+// The default is 3 seconds.
+func WithChannelHealthCheckInterval(d time.Duration) ChannelOption {
+	return func(c *channel) {
+		c.checkInterval = d
+	}
+}
+
+// WithChannelSendTimeout specifies the channel send timeout after which
+// the message is dropped.
+//
+// The default is 60 seconds.
+func WithChannelSendTimeout(d time.Duration) ChannelOption {
+	return func(c *channel) {
+		c.chanSendTimeout = d
+	}
+}
+
+type channel struct {
+	pubSub *PubSub
+
+	msgCh chan *Message
+	allCh chan interface{}
+	ping  chan struct{}
+
+	chanSize        int
+	chanSendTimeout time.Duration
+	checkInterval   time.Duration
+}
+
+func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
+	c := &channel{
+		pubSub: pubSub,
+
+		chanSize:        100,
+		chanSendTimeout: time.Minute,
+		checkInterval:   3 * time.Second,
+	}
+	for _, opt := range opts {
+		opt(c)
+	}
+	if c.checkInterval > 0 {
+		c.initHealthCheck()
+	}
+	return c
+}
+
+func (c *channel) initHealthCheck() {
 	ctx := context.TODO()
 	c.ping = make(chan struct{}, 1)
+
 	go func() {
 		timer := time.NewTimer(time.Minute)
 		timer.Stop()
 
-		healthy := true
 		for {
-			timer.Reset(pingTimeout)
+			timer.Reset(c.checkInterval)
 			select {
 			case <-c.ping:
-				healthy = true
 				if !timer.Stop() {
 					<-timer.C
 				}
 			case <-timer.C:
-				pingErr := c.Ping(ctx)
-				if healthy {
-					healthy = false
-				} else {
-					if pingErr == nil {
-						pingErr = errPingTimeout
-					}
-					c.mu.Lock()
-					c.reconnect(ctx, pingErr)
-					healthy = true
-					c.mu.Unlock()
+				if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
+					c.pubSub.mu.Lock()
+					c.pubSub.reconnect(ctx, pingErr)
+					c.pubSub.mu.Unlock()
 				}
-			case <-c.exit:
+			case <-c.pubSub.exit:
 				return
 			}
 		}
@@ -512,16 +558,17 @@
 }
 
 // initMsgChan must be in sync with initAllChan.
-func (c *PubSub) initMsgChan(size int) {
+func (c *channel) initMsgChan() {
 	ctx := context.TODO()
-	c.msgCh = make(chan *Message, size)
+	c.msgCh = make(chan *Message, c.chanSize)
+
 	go func() {
 		timer := time.NewTimer(time.Minute)
 		timer.Stop()
 
 		var errCount int
 		for {
-			msg, err := c.Receive(ctx)
+			msg, err := c.pubSub.Receive(ctx)
 			if err != nil {
 				if err == pool.ErrClosed {
 					close(c.msgCh)
@@ -548,7 +595,7 @@
 			case *Pong:
 				// Ignore.
 			case *Message:
-				timer.Reset(chanSendTimeout)
+				timer.Reset(c.chanSendTimeout)
 				select {
 				case c.msgCh <- msg:
 					if !timer.Stop() {
@@ -556,30 +603,28 @@
 					}
 				case <-timer.C:
 					internal.Logger.Printf(
-						c.getContext(),
-						"redis: %s channel is full for %s (message is dropped)",
-						c,
-						chanSendTimeout,
-					)
+						ctx, "redis: %s channel is full for %s (message is dropped)",
+						c, c.chanSendTimeout)
 				}
 			default:
-				internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
+				internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
 			}
 		}
 	}()
 }
 
 // initAllChan must be in sync with initMsgChan.
-func (c *PubSub) initAllChan(size int) {
+func (c *channel) initAllChan() {
 	ctx := context.TODO()
-	c.allCh = make(chan interface{}, size)
+	c.allCh = make(chan interface{}, c.chanSize)
+
 	go func() {
-		timer := time.NewTimer(pingTimeout)
+		timer := time.NewTimer(time.Minute)
 		timer.Stop()
 
 		var errCount int
 		for {
-			msg, err := c.Receive(ctx)
+			msg, err := c.pubSub.Receive(ctx)
 			if err != nil {
 				if err == pool.ErrClosed {
 					close(c.allCh)
@@ -601,29 +646,23 @@
 			}
 
 			switch msg := msg.(type) {
-			case *Subscription:
-				c.sendMessage(msg, timer)
 			case *Pong:
 				// Ignore.
-			case *Message:
-				c.sendMessage(msg, timer)
+			case *Subscription, *Message:
+				timer.Reset(c.chanSendTimeout)
+				select {
+				case c.allCh <- msg:
+					if !timer.Stop() {
+						<-timer.C
+					}
+				case <-timer.C:
+					internal.Logger.Printf(
+						ctx, "redis: %s channel is full for %s (message is dropped)",
+						c, c.chanSendTimeout)
+				}
 			default:
-				internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
+				internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
 			}
 		}
 	}()
 }
-
-func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) {
-	timer.Reset(pingTimeout)
-	select {
-	case c.allCh <- msg:
-		if !timer.Stop() {
-			<-timer.C
-		}
-	case <-timer.C:
-		internal.Logger.Printf(
-			c.getContext(),
-			"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
-	}
-}