[VOL-5486] Fix deprecated versions
Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/go-redis/redis/v8/cluster.go b/vendor/github.com/go-redis/redis/v8/cluster.go
index a6ce5c5..a54f2f3 100644
--- a/vendor/github.com/go-redis/redis/v8/cluster.go
+++ b/vendor/github.com/go-redis/redis/v8/cluster.go
@@ -68,6 +68,9 @@
ReadTimeout time.Duration
WriteTimeout time.Duration
+ // PoolFIFO uses FIFO mode for each node's connection pool GET/PUT (default LIFO).
+ PoolFIFO bool
+
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
MinIdleConns int
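For context, a minimal sketch of how a caller would enable the new PoolFIFO
option when building a cluster client (the addresses are placeholders):

    package main

    import (
        "context"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        // PoolFIFO switches each node's connection pool from the default
        // LIFO mode to FIFO, favoring even reuse of idle connections.
        rdb := redis.NewClusterClient(&redis.ClusterOptions{
            Addrs:    []string{":7000", ":7001", ":7002"}, // placeholder addresses
            PoolFIFO: true,
        })
        defer rdb.Close()

        _ = rdb.Ping(context.Background()).Err()
    }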
@@ -86,12 +89,12 @@
opt.MaxRedirects = 3
}
- if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
+ if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true
}
if opt.PoolSize == 0 {
- opt.PoolSize = 5 * runtime.NumCPU()
+ opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
}
switch opt.ReadTimeout {
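To illustrate the default-pool-size change: runtime.GOMAXPROCS(0) reports the
number of Ps the scheduler is actually using (and so honors a GOMAXPROCS
override), while runtime.NumCPU() always reports the machine's logical CPUs.
A small sketch of the difference:

    package main

    import (
        "fmt"
        "runtime"
    )

    func main() {
        // With GOMAXPROCS=2 set on an 8-core machine, NumCPU still says 8
        // while GOMAXPROCS(0) says 2, so the patched default pool size
        // tracks what the Go runtime will actually schedule.
        fmt.Println("NumCPU:       ", runtime.NumCPU())
        fmt.Println("GOMAXPROCS(0):", runtime.GOMAXPROCS(0))
        fmt.Println("default pool: ", 5*runtime.GOMAXPROCS(0))
    }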
@@ -146,6 +149,7 @@
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
+ PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
@@ -153,9 +157,13 @@
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
- readOnly: opt.ReadOnly,
-
TLSConfig: opt.TLSConfig,
+ // If ClusterSlots is populated, then we probably have an artificial
+ // cluster whose nodes are not in clustering mode (otherwise there isn't
+ // much use for the ClusterSlots config). Such nodes cannot execute the
+ // READONLY command, so forcing readOnly to false here keeps the client
+ // from ever sending it to them.
+ readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
}
}
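A sketch of the situation the new readOnly guard protects against: a
hand-built slot map over nodes that are not running in cluster mode, where
sending READONLY would fail (the address and slot layout are hypothetical):

    package main

    import (
        "context"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        rdb := redis.NewClusterClient(&redis.ClusterOptions{
            ReadOnly: true, // requested, but forced off by the guard below
            // These nodes are plain Redis servers, so the client must not
            // send them the READONLY command; with this patch, readOnly is
            // false whenever ClusterSlots is set.
            ClusterSlots: func(ctx context.Context) ([]redis.ClusterSlot, error) {
                return []redis.ClusterSlot{
                    {Start: 0, End: 16383, Nodes: []redis.ClusterNode{{Addr: ":6379"}}},
                }, nil
            },
        })
        defer rdb.Close()
    }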
@@ -291,8 +299,9 @@
func (c *clusterNodes) Addrs() ([]string, error) {
var addrs []string
+
c.mu.RLock()
- closed := c.closed
+ closed := c.closed //nolint:ifshort
if !closed {
if len(c.activeAddrs) > 0 {
addrs = c.activeAddrs
@@ -343,7 +352,7 @@
}
}
-func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
+func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, err := c.get(addr)
if err != nil {
return nil, err
@@ -407,7 +416,7 @@
}
n := rand.Intn(len(addrs))
- return c.Get(addrs[n])
+ return c.GetOrCreate(addrs[n])
}
//------------------------------------------------------------------------------
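The rename from Get to GetOrCreate makes the lazy-creation semantics explicit
at every call site. A simplified sketch of the underlying pattern (registry
and node are stand-ins for the real clusterNodes/clusterNode types):

    package main

    import "sync"

    type node struct{ addr string }

    type registry struct {
        mu    sync.RWMutex
        nodes map[string]*node
    }

    // getOrCreate returns the cached node for addr, creating and caching
    // one if none exists yet -- the behavior the new name advertises.
    func (r *registry) getOrCreate(addr string) *node {
        r.mu.RLock()
        n := r.nodes[addr]
        r.mu.RUnlock()
        if n != nil {
            return n
        }
        r.mu.Lock()
        defer r.mu.Unlock()
        if n := r.nodes[addr]; n != nil { // re-check under the write lock
            return n
        }
        n = &node{addr: addr}
        r.nodes[addr] = n
        return n
    }

    func main() {
        r := &registry{nodes: map[string]*node{}}
        _ = r.getOrCreate(":7000")
    }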
@@ -465,7 +474,7 @@
addr = replaceLoopbackHost(addr, originHost)
}
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return nil, err
}
@@ -586,8 +595,16 @@
if len(nodes) == 0 {
return c.nodes.Random()
}
- n := rand.Intn(len(nodes))
- return nodes[n], nil
+ if len(nodes) == 1 {
+ return nodes[0], nil
+ }
+ randomNodes := rand.Perm(len(nodes))
+ for _, idx := range randomNodes {
+ if node := nodes[idx]; !node.Failing() {
+ return node, nil
+ }
+ }
+ return nodes[randomNodes[0]], nil
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
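A standalone sketch of the new replica-selection logic: walk a random
permutation, return the first node that is not failing, and fall back to the
first permuted node if every node is failing (node and Failing are simplified
stand-ins; the input is assumed non-empty, as the surrounding code guarantees):

    package main

    import "math/rand"

    type node struct{ failing bool }

    func (n *node) Failing() bool { return n.failing }

    // pick mirrors the patched behavior: prefer a random healthy node,
    // but still return something when every candidate is failing.
    func pick(nodes []*node) *node {
        if len(nodes) == 1 {
            return nodes[0]
        }
        perm := rand.Perm(len(nodes))
        for _, idx := range perm {
            if n := nodes[idx]; !n.Failing() {
                return n
            }
        }
        return nodes[perm[0]]
    }

    func main() {
        _ = pick([]*node{{failing: true}, {failing: false}})
    }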
@@ -628,14 +645,14 @@
return state, nil
}
-func (c *clusterStateHolder) LazyReload(ctx context.Context) {
+func (c *clusterStateHolder) LazyReload() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- _, err := c.Reload(ctx)
+ _, err := c.Reload(context.Background())
if err != nil {
return
}
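Dropping the ctx parameter is deliberate: the reload runs in a detached
goroutine that can outlive the request that triggered it, so reusing the
caller's context would cancel the reload as soon as that request finished.
A minimal sketch of the pattern (holder and reload are stand-ins):

    package main

    import (
        "context"
        "sync/atomic"
    )

    type holder struct{ reloading uint32 }

    func (h *holder) reload(ctx context.Context) error { return nil } // stand-in

    // lazyReload mirrors the patched signature: no caller context, and a
    // CAS flag so only one reload is ever in flight at a time.
    func (h *holder) lazyReload() {
        if !atomic.CompareAndSwapUint32(&h.reloading, 0, 1) {
            return // a reload is already running
        }
        go func() {
            defer atomic.StoreUint32(&h.reloading, 0)
            _ = h.reload(context.Background()) // detached from any request
        }()
    }

    func main() { (&holder{}).lazyReload() }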
@@ -645,14 +662,15 @@
func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
v := c.state.Load()
- if v != nil {
- state := v.(*clusterState)
- if time.Since(state.createdAt) > 10*time.Second {
- c.LazyReload(ctx)
- }
- return state, nil
+ if v == nil {
+ return c.Reload(ctx)
}
- return c.Reload(ctx)
+
+ state := v.(*clusterState)
+ if time.Since(state.createdAt) > 10*time.Second {
+ c.LazyReload()
+ }
+ return state, nil
}
func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
@@ -728,7 +746,7 @@
// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
- c.state.LazyReload(ctx)
+ c.state.LazyReload()
}
// Close closes the cluster client, releasing any open resources.
@@ -789,7 +807,7 @@
}
if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
if isReadOnly {
- c.state.LazyReload(ctx)
+ c.state.LazyReload()
}
node = nil
continue
@@ -806,8 +824,10 @@
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
+ c.state.LazyReload()
+
var err error
- node, err = c.nodes.Get(addr)
+ node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -1004,7 +1024,7 @@
for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx]
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
@@ -1218,13 +1238,13 @@
return false
}
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return false
}
if moved {
- c.state.LazyReload(ctx)
+ c.state.LazyReload()
failedCmds.Add(node, cmd)
return true
}
@@ -1252,10 +1272,13 @@
}
func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
- return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline)
+ return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}
func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
+ // Trim the MULTI and EXEC wrappers added by the pipeline hooks.
+ cmds = cmds[1 : len(cmds)-1]
+
state, err := c.state.Get(ctx)
if err != nil {
setCmdsErr(cmds, err)
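Why the trim is needed: by the time _processTxPipeline runs, the hooks have
wrapped the user's commands in MULTI/EXEC, so cmds[0] and cmds[len(cmds)-1]
are the wrappers, not user commands. A toy illustration with strings standing
in for Cmder values:

    package main

    import "fmt"

    func main() {
        // Hypothetical command list as _processTxPipeline receives it.
        cmds := []string{"MULTI", "SET k v", "INCR n", "EXEC"}

        // The patched trim keeps only the user's commands, so slot mapping
        // and MOVED/ASK handling never see MULTI/EXEC themselves.
        cmds = cmds[1 : len(cmds)-1]
        fmt.Println(cmds) // [SET k v INCR n]
    }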
@@ -1291,6 +1314,7 @@
if err == nil {
return
}
+
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
@@ -1400,13 +1424,13 @@
addr string,
failedCmds *cmdsMap,
) error {
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
if moved {
- c.state.LazyReload(ctx)
+ c.state.LazyReload()
for _, cmd := range cmds {
failedCmds.Add(node, cmd)
}
@@ -1455,7 +1479,7 @@
moved, ask, addr := isMovedError(err)
if moved || ask {
- node, err = c.nodes.Get(addr)
+ node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -1464,7 +1488,7 @@
if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
if isReadOnly {
- c.state.LazyReload(ctx)
+ c.state.LazyReload()
}
node, err = c.slotMasterNode(ctx, slot)
if err != nil {
@@ -1567,7 +1591,7 @@
for _, idx := range perm {
addr := addrs[idx]
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
@@ -1631,7 +1655,7 @@
return nil, err
}
- if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly {
+ if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
return c.slotReadOnlyNode(state, slot)
}
return state.slotMasterNode(slot)
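Previously only RouteByLatency or RouteRandomly routed read-only commands to
replicas; with this change, setting ReadOnly alone is enough. A sketch of the
client-side configuration that now benefits (the address is a placeholder):

    package main

    import (
        "context"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        rdb := redis.NewClusterClient(&redis.ClusterOptions{
            Addrs:    []string{":7000"}, // placeholder
            ReadOnly: true,              // now sufficient for replica reads
        })
        defer rdb.Close()

        // GET is flagged read-only in the command table, so with the patched
        // condition it may be served by a replica rather than the master.
        _ = rdb.Get(context.Background(), "key").Err()
    }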
@@ -1655,6 +1679,35 @@
return state.slotMasterNode(slot)
}
+// SlaveForKey returns a client for a replica node serving the given key,
+// so that any command can be run on it. This is especially useful for
+// running a Lua script that contains only read-only commands on a replica:
+// ordinary Redis commands carry a flag marking them as read-only and are
+// routed to replicas automatically when ClusterOptions.ReadOnly is true,
+// but scripts are not.
+func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
+ state, err := c.state.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ slot := hashtag.Slot(key)
+ node, err := c.slotReadOnlyNode(state, slot)
+ if err != nil {
+ return nil, err
+ }
+ return node.Client, err
+}
+
+// MasterForKey returns a client for the master node serving a particular key.
+func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
+ slot := hashtag.Slot(key)
+ node, err := c.slotMasterNode(ctx, slot)
+ if err != nil {
+ return nil, err
+ }
+ return node.Client, err
+}
+
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {
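A hedged usage sketch for the two new helpers, e.g. pinning a read-only Lua
script to the replica that owns a key (the key, script, and address are
hypothetical):

    package main

    import (
        "context"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        ctx := context.Background()
        rdb := redis.NewClusterClient(&redis.ClusterOptions{
            Addrs: []string{":7000"}, // placeholder
        })
        defer rdb.Close()

        // EVAL carries no read-only flag, so it normally goes to the master;
        // SlaveForKey lets us run a read-only script on the right replica.
        if replica, err := rdb.SlaveForKey(ctx, "user:42"); err == nil {
            _ = replica.Eval(ctx, "return redis.call('GET', KEYS[1])",
                []string{"user:42"}).Err()
        }

        // MasterForKey addresses the master for the same key directly.
        if master, err := rdb.MasterForKey(ctx, "user:42"); err == nil {
            _ = master.Get(ctx, "user:42").Err()
        }
    }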