[VOL-5486] Fix deprecated versions
Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/go-redis/redis/v8/pubsub.go b/vendor/github.com/go-redis/redis/v8/pubsub.go
index c56270b..efc2354 100644
--- a/vendor/github.com/go-redis/redis/v8/pubsub.go
+++ b/vendor/github.com/go-redis/redis/v8/pubsub.go
@@ -2,7 +2,6 @@
import (
"context"
- "errors"
"fmt"
"strings"
"sync"
@@ -13,13 +12,6 @@
"github.com/go-redis/redis/v8/internal/proto"
)
-const (
- pingTimeout = time.Second
- chanSendTimeout = time.Minute
-)
-
-var errPingTimeout = errors.New("redis: ping timeout")
-
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
@@ -43,9 +35,12 @@
cmd *Cmd
chOnce sync.Once
- msgCh chan *Message
- allCh chan interface{}
- ping chan struct{}
+ msgCh *channel
+ allCh *channel
+}
+
+func (c *PubSub) init() {
+ c.exit = make(chan struct{})
}
func (c *PubSub) String() string {
@@ -54,10 +49,6 @@
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
}
-func (c *PubSub) init() {
- c.exit = make(chan struct{})
-}
-
func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
c.mu.Lock()
cn, err := c.conn(ctx, nil)
@@ -150,7 +141,7 @@
if c.cn != cn {
return
}
- if isBadConn(err, allowTimeout) {
+ if isBadConn(err, allowTimeout, c.opt.Addr) {
c.reconnect(ctx, err)
}
}
@@ -261,13 +252,16 @@
}
cmd := NewCmd(ctx, args...)
- cn, err := c.connWithLock(ctx)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ cn, err := c.conn(ctx, nil)
if err != nil {
return err
}
err = c.writeCmd(ctx, cn, cmd)
- c.releaseConnWithLock(ctx, cn, err, false)
+ c.releaseConn(ctx, cn, err, false)
return err
}
@@ -370,6 +364,8 @@
c.cmd = NewCmd(ctx)
}
+ // Don't hold the lock to allow subscriptions and pings.
+
cn, err := c.connWithLock(ctx)
if err != nil {
return nil, err
@@ -380,6 +376,7 @@
})
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
+
if err != nil {
return nil, err
}
@@ -418,56 +415,6 @@
}
}
-// Channel returns a Go channel for concurrently receiving messages.
-// The channel is closed together with the PubSub. If the Go channel
-// is blocked full for 30 seconds the message is dropped.
-// Receive* APIs can not be used after channel is created.
-//
-// go-redis periodically sends ping messages to test connection health
-// and re-subscribes if ping can not not received for 30 seconds.
-func (c *PubSub) Channel() <-chan *Message {
- return c.ChannelSize(100)
-}
-
-// ChannelSize is like Channel, but creates a Go channel
-// with specified buffer size.
-func (c *PubSub) ChannelSize(size int) <-chan *Message {
- c.chOnce.Do(func() {
- c.initPing()
- c.initMsgChan(size)
- })
- if c.msgCh == nil {
- err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
- panic(err)
- }
- if cap(c.msgCh) != size {
- err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
- panic(err)
- }
- return c.msgCh
-}
-
-// ChannelWithSubscriptions is like Channel, but message type can be either
-// *Subscription or *Message. Subscription messages can be used to detect
-// reconnections.
-//
-// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
-func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan interface{} {
- c.chOnce.Do(func() {
- c.initPing()
- c.initAllChan(size)
- })
- if c.allCh == nil {
- err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
- panic(err)
- }
- if cap(c.allCh) != size {
- err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
- panic(err)
- }
- return c.allCh
-}
-
func (c *PubSub) getContext() context.Context {
if c.cmd != nil {
return c.cmd.ctx
@@ -475,36 +422,135 @@
return context.Background()
}
-func (c *PubSub) initPing() {
+//------------------------------------------------------------------------------
+
+// Channel returns a Go channel for concurrently receiving messages.
+// The channel is closed together with the PubSub. If the Go channel
+// stays full for the send timeout (60 seconds by default) the message is dropped.
+// Receive* APIs can not be used after channel is created.
+//
+// go-redis pings the server when no message arrives within the health check
+// interval (3 seconds by default) and reconnects if the ping fails.
+func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
+ c.chOnce.Do(func() {
+ c.msgCh = newChannel(c, opts...)
+ c.msgCh.initMsgChan()
+ })
+ if c.msgCh == nil {
+ err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
+ panic(err)
+ }
+ return c.msgCh.msgCh
+}
+
+// ChannelSize is like Channel, but creates a Go channel
+// with specified buffer size.
+//
+// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
+func (c *PubSub) ChannelSize(size int) <-chan *Message {
+ return c.Channel(WithChannelSize(size))
+}
+
+// ChannelWithSubscriptions is like Channel, but message type can be either
+// *Subscription or *Message. Subscription messages can be used to detect
+// reconnections.
+//
+// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
+func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan interface{} {
+ c.chOnce.Do(func() {
+ c.allCh = newChannel(c, WithChannelSize(size))
+ c.allCh.initAllChan()
+ })
+ if c.allCh == nil {
+ err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
+ panic(err)
+ }
+ return c.allCh.allCh
+}
+
+type ChannelOption func(c *channel)
+
+// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
+//
+// The default is 100 messages.
+func WithChannelSize(size int) ChannelOption {
+ return func(c *channel) {
+ c.chanSize = size
+ }
+}
+
+// WithChannelHealthCheckInterval specifies the health check interval.
+// PubSub will ping Redis Server if it does not receive any messages within the interval.
+// To disable health check, use zero interval.
+//
+// The default is 3 seconds.
+func WithChannelHealthCheckInterval(d time.Duration) ChannelOption {
+ return func(c *channel) {
+ c.checkInterval = d
+ }
+}
+
+// WithChannelSendTimeout specifies the channel send timeout after which
+// the message is dropped.
+//
+// The default is 60 seconds.
+func WithChannelSendTimeout(d time.Duration) ChannelOption {
+ return func(c *channel) {
+ c.chanSendTimeout = d
+ }
+}
+
+type channel struct {
+ pubSub *PubSub
+
+ msgCh chan *Message
+ allCh chan interface{}
+ ping chan struct{}
+
+ chanSize int
+ chanSendTimeout time.Duration
+ checkInterval time.Duration
+}
+
+func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
+ c := &channel{
+ pubSub: pubSub,
+
+ chanSize: 100,
+ chanSendTimeout: time.Minute,
+ checkInterval: 3 * time.Second,
+ }
+ for _, opt := range opts {
+ opt(c)
+ }
+ if c.checkInterval > 0 {
+ c.initHealthCheck()
+ }
+ return c
+}
+
+func (c *channel) initHealthCheck() {
ctx := context.TODO()
c.ping = make(chan struct{}, 1)
+
go func() {
timer := time.NewTimer(time.Minute)
timer.Stop()
- healthy := true
for {
- timer.Reset(pingTimeout)
+ timer.Reset(c.checkInterval)
select {
case <-c.ping:
- healthy = true
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
- pingErr := c.Ping(ctx)
- if healthy {
- healthy = false
- } else {
- if pingErr == nil {
- pingErr = errPingTimeout
- }
- c.mu.Lock()
- c.reconnect(ctx, pingErr)
- healthy = true
- c.mu.Unlock()
+ if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
+ c.pubSub.mu.Lock()
+ c.pubSub.reconnect(ctx, pingErr)
+ c.pubSub.mu.Unlock()
}
- case <-c.exit:
+ case <-c.pubSub.exit:
return
}
}
@@ -512,16 +558,17 @@
}
// initMsgChan must be in sync with initAllChan.
-func (c *PubSub) initMsgChan(size int) {
+func (c *channel) initMsgChan() {
ctx := context.TODO()
- c.msgCh = make(chan *Message, size)
+ c.msgCh = make(chan *Message, c.chanSize)
+
go func() {
timer := time.NewTimer(time.Minute)
timer.Stop()
var errCount int
for {
- msg, err := c.Receive(ctx)
+ msg, err := c.pubSub.Receive(ctx)
if err != nil {
if err == pool.ErrClosed {
close(c.msgCh)
@@ -548,7 +595,7 @@
case *Pong:
// Ignore.
case *Message:
- timer.Reset(chanSendTimeout)
+ timer.Reset(c.chanSendTimeout)
select {
case c.msgCh <- msg:
if !timer.Stop() {
@@ -556,30 +603,28 @@
}
case <-timer.C:
internal.Logger.Printf(
- c.getContext(),
- "redis: %s channel is full for %s (message is dropped)",
- c,
- chanSendTimeout,
- )
+ ctx, "redis: %s channel is full for %s (message is dropped)",
+ c, c.chanSendTimeout)
}
default:
- internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
+ internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
}
}
}()
}
// initAllChan must be in sync with initMsgChan.
-func (c *PubSub) initAllChan(size int) {
+func (c *channel) initAllChan() {
ctx := context.TODO()
- c.allCh = make(chan interface{}, size)
+ c.allCh = make(chan interface{}, c.chanSize)
+
go func() {
- timer := time.NewTimer(pingTimeout)
+ timer := time.NewTimer(time.Minute)
timer.Stop()
var errCount int
for {
- msg, err := c.Receive(ctx)
+ msg, err := c.pubSub.Receive(ctx)
if err != nil {
if err == pool.ErrClosed {
close(c.allCh)
@@ -601,29 +646,23 @@
}
switch msg := msg.(type) {
- case *Subscription:
- c.sendMessage(msg, timer)
case *Pong:
// Ignore.
- case *Message:
- c.sendMessage(msg, timer)
+ case *Subscription, *Message:
+ timer.Reset(c.chanSendTimeout)
+ select {
+ case c.allCh <- msg:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ case <-timer.C:
+ internal.Logger.Printf(
+ ctx, "redis: %s channel is full for %s (message is dropped)",
+ c, c.chanSendTimeout)
+ }
default:
- internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
+ internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
}
}
}()
}
-
-func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) {
- timer.Reset(pingTimeout)
- select {
- case c.allCh <- msg:
- if !timer.Stop() {
- <-timer.C
- }
- case <-timer.C:
- internal.Logger.Printf(
- c.getContext(),
- "redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
- }
-}