blob: ec6221dc83d4279ed92644209ca9fbfc92a77ef0 [file] [log] [blame]
Joey Armstrong5f51f2e2023-01-17 17:06:26 -05001package redis
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "net"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/go-redis/redis/v8/internal"
13 "github.com/go-redis/redis/v8/internal/pool"
14 "github.com/go-redis/redis/v8/internal/rand"
15)
16
17//------------------------------------------------------------------------------
18
// FailoverOptions are used to configure a failover client and should
// be passed to NewFailoverClient (or NewFailoverClusterClient).
type FailoverOptions struct {
	// The master name, as known to the sentinels.
	MasterName string
	// A seed list of host:port addresses of sentinel nodes. The constructors
	// work on a copy of this slice; sentinels discovered at runtime are added
	// to that copy, not to this slice.
	SentinelAddrs []string

	// If specified with SentinelPassword, enables ACL-based authentication (via
	// AUTH <user> <pass>).
	SentinelUsername string
	// Sentinel password from "requirepass <password>" (if enabled) in Sentinel
	// configuration, or, if SentinelUsername is also supplied, used for ACL-based
	// authentication.
	SentinelPassword string

	// Allows routing read-only commands to the closest master or slave node.
	// This option only works with NewFailoverClusterClient; NewFailoverClient
	// panics when it is set.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// This option only works with NewFailoverClusterClient; NewFailoverClient
	// panics when it is set.
	RouteRandomly bool

	// Route all commands to slave read-only nodes.
	SlaveOnly bool

	// Use slaves that are disconnected from the master when no connected
	// slaves can be found. Currently this option is only honored by
	// RandomSlaveAddr.
	UseDisconnectedSlaves bool

	// The following options are copied from the Options struct. They apply to
	// the data-node connections and also to the sentinel connections, except
	// that sentinel connections always use DB 0 together with
	// SentinelUsername/SentinelPassword instead of DB/Username/Password.

	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string
	DB       int

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
	PoolFIFO bool

	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
78
79func (opt *FailoverOptions) clientOptions() *Options {
80 return &Options{
81 Addr: "FailoverClient",
82
83 Dialer: opt.Dialer,
84 OnConnect: opt.OnConnect,
85
86 DB: opt.DB,
87 Username: opt.Username,
88 Password: opt.Password,
89
90 MaxRetries: opt.MaxRetries,
91 MinRetryBackoff: opt.MinRetryBackoff,
92 MaxRetryBackoff: opt.MaxRetryBackoff,
93
94 DialTimeout: opt.DialTimeout,
95 ReadTimeout: opt.ReadTimeout,
96 WriteTimeout: opt.WriteTimeout,
97
Abhay Kumara2ae5992025-11-10 14:02:24 +000098 PoolFIFO: opt.PoolFIFO,
Joey Armstrong5f51f2e2023-01-17 17:06:26 -050099 PoolSize: opt.PoolSize,
100 PoolTimeout: opt.PoolTimeout,
101 IdleTimeout: opt.IdleTimeout,
102 IdleCheckFrequency: opt.IdleCheckFrequency,
103 MinIdleConns: opt.MinIdleConns,
104 MaxConnAge: opt.MaxConnAge,
105
106 TLSConfig: opt.TLSConfig,
107 }
108}
109
110func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
111 return &Options{
112 Addr: addr,
113
114 Dialer: opt.Dialer,
115 OnConnect: opt.OnConnect,
116
117 DB: 0,
Abhay Kumara2ae5992025-11-10 14:02:24 +0000118 Username: opt.SentinelUsername,
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500119 Password: opt.SentinelPassword,
120
121 MaxRetries: opt.MaxRetries,
122 MinRetryBackoff: opt.MinRetryBackoff,
123 MaxRetryBackoff: opt.MaxRetryBackoff,
124
125 DialTimeout: opt.DialTimeout,
126 ReadTimeout: opt.ReadTimeout,
127 WriteTimeout: opt.WriteTimeout,
128
Abhay Kumara2ae5992025-11-10 14:02:24 +0000129 PoolFIFO: opt.PoolFIFO,
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500130 PoolSize: opt.PoolSize,
131 PoolTimeout: opt.PoolTimeout,
132 IdleTimeout: opt.IdleTimeout,
133 IdleCheckFrequency: opt.IdleCheckFrequency,
134 MinIdleConns: opt.MinIdleConns,
135 MaxConnAge: opt.MaxConnAge,
136
137 TLSConfig: opt.TLSConfig,
138 }
139}
140
141func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
142 return &ClusterOptions{
143 Dialer: opt.Dialer,
144 OnConnect: opt.OnConnect,
145
146 Username: opt.Username,
147 Password: opt.Password,
148
149 MaxRedirects: opt.MaxRetries,
150
151 RouteByLatency: opt.RouteByLatency,
152 RouteRandomly: opt.RouteRandomly,
153
154 MinRetryBackoff: opt.MinRetryBackoff,
155 MaxRetryBackoff: opt.MaxRetryBackoff,
156
157 DialTimeout: opt.DialTimeout,
158 ReadTimeout: opt.ReadTimeout,
159 WriteTimeout: opt.WriteTimeout,
160
Abhay Kumara2ae5992025-11-10 14:02:24 +0000161 PoolFIFO: opt.PoolFIFO,
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500162 PoolSize: opt.PoolSize,
163 PoolTimeout: opt.PoolTimeout,
164 IdleTimeout: opt.IdleTimeout,
165 IdleCheckFrequency: opt.IdleCheckFrequency,
166 MinIdleConns: opt.MinIdleConns,
167 MaxConnAge: opt.MaxConnAge,
168
169 TLSConfig: opt.TLSConfig,
170 }
171}
172
173// NewFailoverClient returns a Redis client that uses Redis Sentinel
174// for automatic failover. It's safe for concurrent use by multiple
175// goroutines.
176func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
177 if failoverOpt.RouteByLatency {
178 panic("to route commands by latency, use NewFailoverClusterClient")
179 }
180 if failoverOpt.RouteRandomly {
181 panic("to route commands randomly, use NewFailoverClusterClient")
182 }
183
184 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
185 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
186
Abhay Kumara2ae5992025-11-10 14:02:24 +0000187 rand.Shuffle(len(sentinelAddrs), func(i, j int) {
188 sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]
189 })
190
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500191 failover := &sentinelFailover{
192 opt: failoverOpt,
193 sentinelAddrs: sentinelAddrs,
194 }
195
196 opt := failoverOpt.clientOptions()
197 opt.Dialer = masterSlaveDialer(failover)
198 opt.init()
199
200 connPool := newConnPool(opt)
Abhay Kumara2ae5992025-11-10 14:02:24 +0000201
202 failover.mu.Lock()
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500203 failover.onFailover = func(ctx context.Context, addr string) {
204 _ = connPool.Filter(func(cn *pool.Conn) bool {
205 return cn.RemoteAddr().String() != addr
206 })
207 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000208 failover.mu.Unlock()
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500209
210 c := Client{
211 baseClient: newBaseClient(opt, connPool),
212 ctx: context.Background(),
213 }
214 c.cmdable = c.Process
215 c.onClose = failover.Close
216
217 return &c
218}
219
220func masterSlaveDialer(
221 failover *sentinelFailover,
222) func(ctx context.Context, network, addr string) (net.Conn, error) {
223 return func(ctx context.Context, network, _ string) (net.Conn, error) {
224 var addr string
225 var err error
226
227 if failover.opt.SlaveOnly {
228 addr, err = failover.RandomSlaveAddr(ctx)
229 } else {
230 addr, err = failover.MasterAddr(ctx)
231 if err == nil {
232 failover.trySwitchMaster(ctx, addr)
233 }
234 }
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500235 if err != nil {
236 return nil, err
237 }
238 if failover.opt.Dialer != nil {
239 return failover.opt.Dialer(ctx, network, addr)
240 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000241
242 netDialer := &net.Dialer{
243 Timeout: failover.opt.DialTimeout,
244 KeepAlive: 5 * time.Minute,
245 }
246 if failover.opt.TLSConfig == nil {
247 return netDialer.DialContext(ctx, network, addr)
248 }
249 return tls.DialWithDialer(netDialer, network, addr, failover.opt.TLSConfig)
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500250 }
251}
252
253//------------------------------------------------------------------------------
254
// SentinelClient is a client for a Redis Sentinel.
//
// It embeds *baseClient, which holds the options and the connection pool, and
// hooks, through which Process runs every command. ctx is the value returned
// by Context and replaced by WithContext.
type SentinelClient struct {
	*baseClient
	hooks
	ctx context.Context
}
261
262func NewSentinelClient(opt *Options) *SentinelClient {
263 opt.init()
264 c := &SentinelClient{
265 baseClient: &baseClient{
266 opt: opt,
267 connPool: newConnPool(opt),
268 },
269 ctx: context.Background(),
270 }
271 return c
272}
273
274func (c *SentinelClient) Context() context.Context {
275 return c.ctx
276}
277
278func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
279 if ctx == nil {
280 panic("nil context")
281 }
282 clone := *c
283 clone.ctx = ctx
284 return &clone
285}
286
287func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
288 return c.hooks.process(ctx, cmd, c.baseClient.process)
289}
290
291func (c *SentinelClient) pubSub() *PubSub {
292 pubsub := &PubSub{
293 opt: c.opt,
294
295 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
296 return c.newConn(ctx)
297 },
298 closeConn: c.connPool.CloseConn,
299 }
300 pubsub.init()
301 return pubsub
302}
303
304// Ping is used to test if a connection is still alive, or to
305// measure latency.
306func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
307 cmd := NewStringCmd(ctx, "ping")
308 _ = c.Process(ctx, cmd)
309 return cmd
310}
311
312// Subscribe subscribes the client to the specified channels.
313// Channels can be omitted to create empty subscription.
314func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
315 pubsub := c.pubSub()
316 if len(channels) > 0 {
317 _ = pubsub.Subscribe(ctx, channels...)
318 }
319 return pubsub
320}
321
322// PSubscribe subscribes the client to the given patterns.
323// Patterns can be omitted to create empty subscription.
324func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
325 pubsub := c.pubSub()
326 if len(channels) > 0 {
327 _ = pubsub.PSubscribe(ctx, channels...)
328 }
329 return pubsub
330}
331
332func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
333 cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
334 _ = c.Process(ctx, cmd)
335 return cmd
336}
337
338func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
339 cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
340 _ = c.Process(ctx, cmd)
341 return cmd
342}
343
344// Failover forces a failover as if the master was not reachable, and without
345// asking for agreement to other Sentinels.
346func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
347 cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
348 _ = c.Process(ctx, cmd)
349 return cmd
350}
351
352// Reset resets all the masters with matching name. The pattern argument is a
353// glob-style pattern. The reset process clears any previous state in a master
354// (including a failover in progress), and removes every slave and sentinel
355// already discovered and associated with the master.
356func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
357 cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
358 _ = c.Process(ctx, cmd)
359 return cmd
360}
361
362// FlushConfig forces Sentinel to rewrite its configuration on disk, including
363// the current Sentinel state.
364func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
365 cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
366 _ = c.Process(ctx, cmd)
367 return cmd
368}
369
370// Master shows the state and info of the specified master.
371func (c *SentinelClient) Master(ctx context.Context, name string) *StringStringMapCmd {
372 cmd := NewStringStringMapCmd(ctx, "sentinel", "master", name)
373 _ = c.Process(ctx, cmd)
374 return cmd
375}
376
377// Masters shows a list of monitored masters and their state.
378func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
379 cmd := NewSliceCmd(ctx, "sentinel", "masters")
380 _ = c.Process(ctx, cmd)
381 return cmd
382}
383
384// Slaves shows a list of slaves for the specified master and their state.
385func (c *SentinelClient) Slaves(ctx context.Context, name string) *SliceCmd {
386 cmd := NewSliceCmd(ctx, "sentinel", "slaves", name)
387 _ = c.Process(ctx, cmd)
388 return cmd
389}
390
391// CkQuorum checks if the current Sentinel configuration is able to reach the
392// quorum needed to failover a master, and the majority needed to authorize the
393// failover. This command should be used in monitoring systems to check if a
394// Sentinel deployment is ok.
395func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
396 cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
397 _ = c.Process(ctx, cmd)
398 return cmd
399}
400
401// Monitor tells the Sentinel to start monitoring a new master with the specified
402// name, ip, port, and quorum.
403func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
404 cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
405 _ = c.Process(ctx, cmd)
406 return cmd
407}
408
409// Set is used in order to change configuration parameters of a specific master.
410func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
411 cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
412 _ = c.Process(ctx, cmd)
413 return cmd
414}
415
416// Remove is used in order to remove the specified master: the master will no
417// longer be monitored, and will totally be removed from the internal state of
418// the Sentinel.
419func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
420 cmd := NewStringCmd(ctx, "sentinel", "remove", name)
421 _ = c.Process(ctx, cmd)
422 return cmd
423}
424
425//------------------------------------------------------------------------------
426
// sentinelFailover tracks the sentinels for one master, a cached sentinel
// connection with its pub/sub subscription, and the last known master address.
type sentinelFailover struct {
	opt *FailoverOptions

	// sentinelAddrs is the constructor's copy of FailoverOptions.SentinelAddrs.
	// While mu is held for writing, the sentinel that answered is swapped to
	// index 0 and newly discovered sentinels are appended.
	sentinelAddrs []string

	// onFailover is called by trySwitchMaster (with mu held) when the master
	// address changes. onUpdate is called by listen once after subscribing and
	// after each processed pub/sub message. Both are assigned by the
	// constructors while holding mu.
	onFailover func(ctx context.Context, addr string)
	onUpdate   func(ctx context.Context)

	// mu guards the fields below.
	mu          sync.RWMutex
	_masterAddr string
	sentinel    *SentinelClient
	pubsub      *PubSub
}
440
441func (c *sentinelFailover) Close() error {
442 c.mu.Lock()
443 defer c.mu.Unlock()
444 if c.sentinel != nil {
445 return c.closeSentinel()
446 }
447 return nil
448}
449
450func (c *sentinelFailover) closeSentinel() error {
451 firstErr := c.pubsub.Close()
452 c.pubsub = nil
453
454 err := c.sentinel.Close()
455 if err != nil && firstErr == nil {
456 firstErr = err
457 }
458 c.sentinel = nil
459
460 return firstErr
461}
462
463func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000464 if c.opt == nil {
465 return "", errors.New("opt is nil")
466 }
467
468 addresses, err := c.slaveAddrs(ctx, false)
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500469 if err != nil {
470 return "", err
471 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000472
473 if len(addresses) == 0 && c.opt.UseDisconnectedSlaves {
474 addresses, err = c.slaveAddrs(ctx, true)
475 if err != nil {
476 return "", err
477 }
478 }
479
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500480 if len(addresses) == 0 {
481 return c.MasterAddr(ctx)
482 }
483 return addresses[rand.Intn(len(addresses))], nil
484}
485
// MasterAddr returns the address ("host:port") of the current master as
// reported by a sentinel.
//
// Lookup order:
//  1. the cached sentinel, fetched under the read lock;
//  2. under the write lock, the cached sentinel again (c.sentinel may have
//     changed after the read lock was released); if it still fails it is
//     closed;
//  3. each configured sentinel in turn. The first one that answers is swapped
//     to the front of c.sentinelAddrs and cached via setSentinel.
//
// An error is returned only when none of the sentinels answered.
func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
	c.mu.RLock()
	sentinel := c.sentinel
	c.mu.RUnlock()

	if sentinel != nil {
		addr := c.getMasterAddr(ctx, sentinel)
		if addr != "" {
			return addr, nil
		}
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Re-check under the write lock before discarding the cached sentinel.
	if c.sentinel != nil {
		addr := c.getMasterAddr(ctx, c.sentinel)
		if addr != "" {
			return addr, nil
		}
		_ = c.closeSentinel()
	}

	for i, sentinelAddr := range c.sentinelAddrs {
		sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))

		masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
		if err != nil {
			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
				c.opt.MasterName, err)
			// This client is not cached, so release it before trying the next.
			_ = sentinel.Close()
			continue
		}

		// Push working sentinel to the top.
		c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
		c.setSentinel(ctx, sentinel)

		// NOTE(review): assumes the reply holds at least [ip, port]; a shorter
		// reply would panic here.
		addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
		return addr, nil
	}

	return "", errors.New("redis: all sentinels specified in configuration are unreachable")
}
530
Abhay Kumara2ae5992025-11-10 14:02:24 +0000531func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500532 c.mu.RLock()
533 sentinel := c.sentinel
534 c.mu.RUnlock()
535
536 if sentinel != nil {
537 addrs := c.getSlaveAddrs(ctx, sentinel)
538 if len(addrs) > 0 {
539 return addrs, nil
540 }
541 }
542
543 c.mu.Lock()
544 defer c.mu.Unlock()
545
546 if c.sentinel != nil {
547 addrs := c.getSlaveAddrs(ctx, c.sentinel)
548 if len(addrs) > 0 {
549 return addrs, nil
550 }
551 _ = c.closeSentinel()
552 }
553
Abhay Kumara2ae5992025-11-10 14:02:24 +0000554 var sentinelReachable bool
555
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500556 for i, sentinelAddr := range c.sentinelAddrs {
557 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
558
559 slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
560 if err != nil {
561 internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
562 c.opt.MasterName, err)
563 _ = sentinel.Close()
564 continue
565 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000566 sentinelReachable = true
567 addrs := parseSlaveAddrs(slaves, useDisconnected)
568 if len(addrs) == 0 {
569 continue
570 }
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500571 // Push working sentinel to the top.
572 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
573 c.setSentinel(ctx, sentinel)
574
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500575 return addrs, nil
576 }
577
Abhay Kumara2ae5992025-11-10 14:02:24 +0000578 if sentinelReachable {
579 return []string{}, nil
580 }
581 return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500582}
583
584func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
585 addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
586 if err != nil {
587 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
588 c.opt.MasterName, err)
589 return ""
590 }
591 return net.JoinHostPort(addr[0], addr[1])
592}
593
594func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
595 addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
596 if err != nil {
597 internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
598 c.opt.MasterName, err)
599 return []string{}
600 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000601 return parseSlaveAddrs(addrs, false)
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500602}
603
Abhay Kumara2ae5992025-11-10 14:02:24 +0000604func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500605 nodes := make([]string, 0, len(addrs))
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500606 for _, node := range addrs {
607 ip := ""
608 port := ""
609 flags := []string{}
610 lastkey := ""
611 isDown := false
612
613 for _, key := range node.([]interface{}) {
614 switch lastkey {
615 case "ip":
616 ip = key.(string)
617 case "port":
618 port = key.(string)
619 case "flags":
620 flags = strings.Split(key.(string), ",")
621 }
622 lastkey = key.(string)
623 }
624
625 for _, flag := range flags {
626 switch flag {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000627 case "s_down", "o_down":
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500628 isDown = true
Abhay Kumara2ae5992025-11-10 14:02:24 +0000629 case "disconnected":
630 if !keepDisconnected {
631 isDown = true
632 }
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500633 }
634 }
635
636 if !isDown {
637 nodes = append(nodes, net.JoinHostPort(ip, port))
638 }
639 }
640
641 return nodes
642}
643
644func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
645 c.mu.RLock()
Abhay Kumara2ae5992025-11-10 14:02:24 +0000646 currentAddr := c._masterAddr //nolint:ifshort
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500647 c.mu.RUnlock()
648
649 if addr == currentAddr {
650 return
651 }
652
653 c.mu.Lock()
654 defer c.mu.Unlock()
655
656 if addr == c._masterAddr {
657 return
658 }
659 c._masterAddr = addr
660
661 internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
662 c.opt.MasterName, addr)
663 if c.onFailover != nil {
664 c.onFailover(ctx, addr)
665 }
666}
667
668func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
669 if c.sentinel != nil {
670 panic("not reached")
671 }
672 c.sentinel = sentinel
673 c.discoverSentinels(ctx)
674
675 c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
676 go c.listen(c.pubsub)
677}
678
679func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
680 sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
681 if err != nil {
682 internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
683 return
684 }
685 for _, sentinel := range sentinels {
686 vals := sentinel.([]interface{})
Abhay Kumara2ae5992025-11-10 14:02:24 +0000687 var ip, port string
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500688 for i := 0; i < len(vals); i += 2 {
689 key := vals[i].(string)
Abhay Kumara2ae5992025-11-10 14:02:24 +0000690 switch key {
691 case "ip":
692 ip = vals[i+1].(string)
693 case "port":
694 port = vals[i+1].(string)
695 }
696 }
697 if ip != "" && port != "" {
698 sentinelAddr := net.JoinHostPort(ip, port)
699 if !contains(c.sentinelAddrs, sentinelAddr) {
700 internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
701 sentinelAddr, c.opt.MasterName)
702 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500703 }
704 }
705 }
706}
707
708func (c *sentinelFailover) listen(pubsub *PubSub) {
709 ctx := context.TODO()
Abhay Kumara2ae5992025-11-10 14:02:24 +0000710
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500711 if c.onUpdate != nil {
712 c.onUpdate(ctx)
713 }
714
715 ch := pubsub.Channel()
716 for msg := range ch {
717 if msg.Channel == "+switch-master" {
718 parts := strings.Split(msg.Payload, " ")
719 if parts[0] != c.opt.MasterName {
720 internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
721 continue
722 }
723 addr := net.JoinHostPort(parts[3], parts[4])
724 c.trySwitchMaster(pubsub.getContext(), addr)
725 }
726
727 if c.onUpdate != nil {
728 c.onUpdate(ctx)
729 }
730 }
731}
732
// contains reports whether str is an element of slice.
func contains(slice []string, str string) bool {
	for i := range slice {
		if slice[i] == str {
			return true
		}
	}
	return false
}
741
742//------------------------------------------------------------------------------
743
744// NewFailoverClusterClient returns a client that supports routing read-only commands
745// to a slave node.
746func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
747 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
748 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
749
750 failover := &sentinelFailover{
751 opt: failoverOpt,
752 sentinelAddrs: sentinelAddrs,
753 }
754
755 opt := failoverOpt.clusterOptions()
756 opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
757 masterAddr, err := failover.MasterAddr(ctx)
758 if err != nil {
759 return nil, err
760 }
761
762 nodes := []ClusterNode{{
763 Addr: masterAddr,
764 }}
765
Abhay Kumara2ae5992025-11-10 14:02:24 +0000766 slaveAddrs, err := failover.slaveAddrs(ctx, false)
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500767 if err != nil {
768 return nil, err
769 }
770
771 for _, slaveAddr := range slaveAddrs {
772 nodes = append(nodes, ClusterNode{
773 Addr: slaveAddr,
774 })
775 }
776
777 slots := []ClusterSlot{
778 {
779 Start: 0,
780 End: 16383,
781 Nodes: nodes,
782 },
783 }
784 return slots, nil
785 }
786
787 c := NewClusterClient(opt)
Abhay Kumara2ae5992025-11-10 14:02:24 +0000788
789 failover.mu.Lock()
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500790 failover.onUpdate = func(ctx context.Context) {
791 c.ReloadState(ctx)
792 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000793 failover.mu.Unlock()
Joey Armstrong5f51f2e2023-01-17 17:06:26 -0500794
795 return c
796}