// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "strings"
22 "sync"
23 "time"
24
25 "github.com/coreos/go-semver/semver"
26 "go.uber.org/zap"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 grpccredentials "google.golang.org/grpc/credentials"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/keepalive"
32 "google.golang.org/grpc/status"
33
34 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
35 "go.etcd.io/etcd/api/v3/version"
36 "go.etcd.io/etcd/client/pkg/v3/logutil"
37 "go.etcd.io/etcd/client/pkg/v3/verify"
38 "go.etcd.io/etcd/client/v3/credentials"
39 "go.etcd.io/etcd/client/v3/internal/endpoint"
40 "go.etcd.io/etcd/client/v3/internal/resolver"
41)
42
// Sentinel errors returned by the client.
var (
	// ErrNoAvailableEndpoints is returned by New when the configuration
	// lists no endpoints.
	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
	// ErrOldCluster is reported by the version check when an endpoint runs a
	// version older than the minimum supported one.
	ErrOldCluster = errors.New("etcdclient: old cluster version")
)
47
// Client provides and manages an etcd v3 client session.
type Client struct {
	// Service interfaces embedded into the client; newClient fills them in
	// once the connection has been dialed.
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// conn is the gRPC connection; NewCtxClient leaves it nil.
	conn *grpc.ClientConn

	cfg Config
	// creds holds the transport credentials built from cfg.TLS, if any.
	creds    grpccredentials.TransportCredentials
	resolver *resolver.EtcdManualResolver

	// epMu guards endpoints in Endpoints and SetEndpoints.
	epMu      *sync.RWMutex
	endpoints []string

	// ctx is canceled by Close through cancel.
	ctx    context.Context
	cancel context.CancelFunc

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password        string
	authTokenBundle credentials.PerRPCCredentialsBundle

	callOpts []grpc.CallOption

	// lgMu guards lg in WithLogger and GetLogger.
	lgMu *sync.RWMutex
	lg   *zap.Logger
}
80
81// New creates a new etcdv3 client from a given configuration.
82func New(cfg Config) (*Client, error) {
83 if len(cfg.Endpoints) == 0 {
84 return nil, ErrNoAvailableEndpoints
85 }
86
87 return newClient(&cfg)
88}
89
90// NewCtxClient creates a client with a context but no underlying grpc
91// connection. This is useful for embedded cases that override the
92// service interface implementations and do not need connection management.
93func NewCtxClient(ctx context.Context, opts ...Option) *Client {
94 cctx, cancel := context.WithCancel(ctx)
95 c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), epMu: new(sync.RWMutex)}
96 for _, opt := range opts {
97 opt(c)
98 }
99 if c.lg == nil {
100 c.lg = zap.NewNop()
101 }
102 return c
103}
104
// Option is a function that configures a Client. Options are passed to
// NewCtxClient, which applies each one to the client it builds.
type Option func(*Client)
107
108// NewFromURL creates a new etcdv3 client from a URL.
109func NewFromURL(url string) (*Client, error) {
110 return New(Config{Endpoints: []string{url}})
111}
112
113// NewFromURLs creates a new etcdv3 client from URLs.
114func NewFromURLs(urls []string) (*Client, error) {
115 return New(Config{Endpoints: urls})
116}
117
118// WithZapLogger is a NewCtxClient option that overrides the logger
119func WithZapLogger(lg *zap.Logger) Option {
120 return func(c *Client) {
121 c.lg = lg
122 }
123}
124
125// WithLogger overrides the logger.
126//
127// Deprecated: Please use WithZapLogger or Logger field in clientv3.Config
128//
129// Does not changes grpcLogger, that can be explicitly configured
130// using grpc_zap.ReplaceGrpcLoggerV2(..) method.
131func (c *Client) WithLogger(lg *zap.Logger) *Client {
132 c.lgMu.Lock()
133 c.lg = lg
134 c.lgMu.Unlock()
135 return c
136}
137
138// GetLogger gets the logger.
139// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
140func (c *Client) GetLogger() *zap.Logger {
141 c.lgMu.RLock()
142 l := c.lg
143 c.lgMu.RUnlock()
144 return l
145}
146
147// Close shuts down the client's etcd connections.
148func (c *Client) Close() error {
149 c.cancel()
150 if c.Watcher != nil {
151 c.Watcher.Close()
152 }
153 if c.Lease != nil {
154 c.Lease.Close()
155 }
156 if c.conn != nil {
157 return ContextError(c.ctx, c.conn.Close())
158 }
159 return c.ctx.Err()
160}
161
162// Ctx is a context for "out of band" messages (e.g., for sending
163// "clean up" message when another context is canceled). It is
164// canceled on client Close().
165func (c *Client) Ctx() context.Context { return c.ctx }
166
167// Endpoints lists the registered endpoints for the client.
168func (c *Client) Endpoints() []string {
169 // copy the slice; protect original endpoints from being changed
170 c.epMu.RLock()
171 defer c.epMu.RUnlock()
172 eps := make([]string, len(c.endpoints))
173 copy(eps, c.endpoints)
174 return eps
175}
176
177// SetEndpoints updates client's endpoints.
178func (c *Client) SetEndpoints(eps ...string) {
179 c.epMu.Lock()
180 defer c.epMu.Unlock()
181 c.endpoints = eps
182
183 c.resolver.SetEndpoints(eps)
184}
185
186// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
187func (c *Client) Sync(ctx context.Context) error {
188 mresp, err := c.MemberList(ctx)
189 if err != nil {
190 return err
191 }
192 var eps []string
193 for _, m := range mresp.Members {
194 if len(m.Name) != 0 && !m.IsLearner {
195 eps = append(eps, m.ClientURLs...)
196 }
197 }
198 // The linearizable `MemberList` returned successfully, so the
199 // endpoints shouldn't be empty.
200 verify.Verify(func() {
201 if len(eps) == 0 {
202 panic("empty endpoints returned from etcd cluster")
203 }
204 })
205 c.SetEndpoints(eps...)
206 c.lg.Debug("set etcd endpoints by autoSync", zap.Strings("endpoints", eps))
207 return nil
208}
209
210func (c *Client) autoSync() {
211 if c.cfg.AutoSyncInterval == time.Duration(0) {
212 return
213 }
214
215 for {
216 select {
217 case <-c.ctx.Done():
218 return
219 case <-time.After(c.cfg.AutoSyncInterval):
220 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
221 err := c.Sync(ctx)
222 cancel()
223 if err != nil && !errors.Is(err, c.ctx.Err()) {
224 c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
225 }
226 }
227 }
228}
229
230// dialSetupOpts gives the dial opts prior to any authentication.
231func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) []grpc.DialOption {
232 var opts []grpc.DialOption
233
234 if c.cfg.DialKeepAliveTime > 0 {
235 params := keepalive.ClientParameters{
236 Time: c.cfg.DialKeepAliveTime,
237 Timeout: c.cfg.DialKeepAliveTimeout,
238 PermitWithoutStream: c.cfg.PermitWithoutStream,
239 }
240 opts = append(opts, grpc.WithKeepaliveParams(params))
241 }
242 opts = append(opts, dopts...)
243
244 if creds != nil {
245 opts = append(opts, grpc.WithTransportCredentials(creds))
246 } else {
247 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
248 }
249
250 unaryMaxRetries := defaultUnaryMaxRetries
251 if c.cfg.MaxUnaryRetries > 0 {
252 unaryMaxRetries = c.cfg.MaxUnaryRetries
253 }
254
255 backoffWaitBetween := defaultBackoffWaitBetween
256 if c.cfg.BackoffWaitBetween > 0 {
257 backoffWaitBetween = c.cfg.BackoffWaitBetween
258 }
259
260 backoffJitterFraction := defaultBackoffJitterFraction
261 if c.cfg.BackoffJitterFraction > 0 {
262 backoffJitterFraction = c.cfg.BackoffJitterFraction
263 }
264
265 // Interceptor retry and backoff.
266 // TODO: Replace all of clientv3/retry.go with RetryPolicy:
267 // https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
268 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(backoffWaitBetween, backoffJitterFraction))
269 opts = append(opts,
270 // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
271 // Streams that are safe to retry are enabled individually.
272 grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
273 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(unaryMaxRetries), rrBackoff)),
274 )
275
276 return opts
277}
278
279// Dial connects to a single endpoint using the client's config.
280func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
281 creds := c.credentialsForEndpoint(ep)
282
283 // Using ad-hoc created resolver, to guarantee only explicitly given
284 // endpoint is used.
285 return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
286}
287
288func (c *Client) getToken(ctx context.Context) error {
289 var err error // return last error in a case of fail
290
291 if c.Username == "" || c.Password == "" {
292 return nil
293 }
294
295 resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password)
296 if err != nil {
297 if errors.Is(err, rpctypes.ErrAuthNotEnabled) {
298 c.authTokenBundle.UpdateAuthToken("")
299 return nil
300 }
301 return err
302 }
303 c.authTokenBundle.UpdateAuthToken(resp.Token)
304 return nil
305}
306
307// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
308// of the provided endpoint determines the scheme used for all endpoints of the client connection.
309func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
310 creds := c.credentialsForEndpoint(c.Endpoints()[0])
311 opts := append(dopts, grpc.WithResolvers(c.resolver))
312 return c.dial(creds, opts...)
313}
314
315// dial configures and dials any grpc balancer target.
316func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
317 opts := c.dialSetupOpts(creds, dopts...)
318
319 if c.authTokenBundle != nil {
320 opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
321 }
322
323 opts = append(opts, c.cfg.DialOptions...)
324
325 dctx := c.ctx
326 if c.cfg.DialTimeout > 0 {
327 var cancel context.CancelFunc
328 dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
329 defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
330 }
331 target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
332 conn, err := grpc.DialContext(dctx, target, opts...)
333 if err != nil {
334 return nil, err
335 }
336 return conn, nil
337}
338
// authority strips the scheme from endpoint. Everything after the first
// "://" is returned; without "://", a leading "unix:" or "unixs:" is removed;
// any other endpoint is returned unchanged.
func authority(endpoint string) string {
	const sep = "://"
	if at := strings.Index(endpoint, sep); at >= 0 {
		return endpoint[at+len(sep):]
	}
	for _, scheme := range []string{"unix:", "unixs:"} {
		if strings.HasPrefix(endpoint, scheme) {
			return strings.TrimPrefix(endpoint, scheme)
		}
	}
	return endpoint
}
352
353func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
354 r := endpoint.RequiresCredentials(ep)
355 switch r {
356 case endpoint.CredsDrop:
357 return nil
358 case endpoint.CredsOptional:
359 return c.creds
360 case endpoint.CredsRequire:
361 if c.creds != nil {
362 return c.creds
363 }
364 return credentials.NewTransportCredential(nil)
365 default:
366 panic(fmt.Errorf("unsupported CredsRequirement: %v", r))
367 }
368}
369
370func newClient(cfg *Config) (*Client, error) {
371 if cfg == nil {
372 cfg = &Config{}
373 }
374 var creds grpccredentials.TransportCredentials
375 if cfg.TLS != nil {
376 creds = credentials.NewTransportCredential(cfg.TLS)
377 }
378
379 // use a temporary skeleton client to bootstrap first connection
380 baseCtx := context.TODO()
381 if cfg.Context != nil {
382 baseCtx = cfg.Context
383 }
384
385 ctx, cancel := context.WithCancel(baseCtx)
386 client := &Client{
387 conn: nil,
388 cfg: *cfg,
389 creds: creds,
390 ctx: ctx,
391 cancel: cancel,
392 epMu: new(sync.RWMutex),
393 callOpts: defaultCallOpts,
394 lgMu: new(sync.RWMutex),
395 }
396
397 var err error
398 if cfg.Logger != nil {
399 client.lg = cfg.Logger
400 } else if cfg.LogConfig != nil {
401 client.lg, err = cfg.LogConfig.Build()
402 } else {
403 client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
404 if client.lg != nil {
405 client.lg = client.lg.Named("etcd-client")
406 }
407 }
408 if err != nil {
409 return nil, err
410 }
411
412 if cfg.Username != "" && cfg.Password != "" {
413 client.Username = cfg.Username
414 client.Password = cfg.Password
415 client.authTokenBundle = credentials.NewPerRPCCredentialBundle()
416 }
417 if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
418 if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
419 return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
420 }
421 callOpts := []grpc.CallOption{
422 defaultWaitForReady,
423 defaultMaxCallSendMsgSize,
424 defaultMaxCallRecvMsgSize,
425 }
426 if cfg.MaxCallSendMsgSize > 0 {
427 callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
428 }
429 if cfg.MaxCallRecvMsgSize > 0 {
430 callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
431 }
432 client.callOpts = callOpts
433 }
434
435 client.resolver = resolver.New(cfg.Endpoints...)
436
437 if len(cfg.Endpoints) < 1 {
438 client.cancel()
439 return nil, errors.New("at least one Endpoint is required in client config")
440 }
441 client.SetEndpoints(cfg.Endpoints...)
442
443 // Use a provided endpoint target so that for https:// without any tls config given, then
444 // grpc will assume the certificate server name is the endpoint host.
445 conn, err := client.dialWithBalancer()
446 if err != nil {
447 client.cancel()
448 client.resolver.Close()
449 // TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
450 return nil, err
451 }
452 client.conn = conn
453
454 client.Cluster = NewCluster(client)
455 client.KV = NewKV(client)
456 client.Lease = NewLease(client)
457 client.Watcher = NewWatcher(client)
458 client.Auth = NewAuth(client)
459 client.Maintenance = NewMaintenance(client)
460
461 // get token with established connection
462 ctx, cancel = client.ctx, func() {}
463 if client.cfg.DialTimeout > 0 {
464 ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout)
465 }
466 err = client.getToken(ctx)
467 if err != nil {
468 client.Close()
469 cancel()
470 // TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
471 return nil, err
472 }
473 cancel()
474
475 if cfg.RejectOldCluster {
476 if err := client.checkVersion(); err != nil {
477 client.Close()
478 return nil, err
479 }
480 }
481
482 go client.autoSync()
483 return client, nil
484}
485
486// roundRobinQuorumBackoff retries against quorum between each backoff.
487// This is intended for use with a round robin load balancer.
488func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
489 return func(attempt uint) time.Duration {
490 // after each round robin across quorum, backoff for our wait between duration
491 n := uint(len(c.Endpoints()))
492 quorum := (n/2 + 1)
493 if attempt%quorum == 0 {
494 c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
495 return jitterUp(waitBetween, jitterFraction)
496 }
497 c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
498 return 0
499 }
500}
501
502// minSupportedVersion returns the minimum version supported, which is the previous minor release.
503func minSupportedVersion() *semver.Version {
504 ver := semver.Must(semver.NewVersion(version.Version))
505 // consider only major and minor version
506 ver = &semver.Version{Major: ver.Major, Minor: ver.Minor}
507 for i := range version.AllVersions {
508 if version.AllVersions[i].Equal(*ver) {
509 if i == 0 {
510 return ver
511 }
512 return &version.AllVersions[i-1]
513 }
514 }
515 panic("current version is not in the version list")
516}
517
518func (c *Client) checkVersion() (err error) {
519 var wg sync.WaitGroup
520
521 eps := c.Endpoints()
522 errc := make(chan error, len(eps))
523 ctx, cancel := context.WithCancel(c.ctx)
524 if c.cfg.DialTimeout > 0 {
525 cancel()
526 ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
527 }
528
529 wg.Add(len(eps))
530 for _, ep := range eps {
531 // if cluster is current, any endpoint gives a recent version
532 go func(e string) {
533 defer wg.Done()
534 resp, rerr := c.Status(ctx, e)
535 if rerr != nil {
536 errc <- rerr
537 return
538 }
539 vs, serr := semver.NewVersion(resp.Version)
540 if serr != nil {
541 errc <- serr
542 return
543 }
544
545 if vs.LessThan(*minSupportedVersion()) {
546 rerr = ErrOldCluster
547 }
548 errc <- rerr
549 }(ep)
550 }
551 // wait for success
552 for range eps {
553 if err = <-errc; err != nil {
554 break
555 }
556 }
557 cancel()
558 wg.Wait()
559 return err
560}
561
562// ActiveConnection returns the current in-use connection
563func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
564
565// isHaltErr returns true if the given error and context indicate no forward
566// progress can be made, even after reconnecting.
567func isHaltErr(ctx context.Context, err error) bool {
568 if ctx != nil && ctx.Err() != nil {
569 return true
570 }
571 if err == nil {
572 return false
573 }
574 ev, _ := status.FromError(err)
575 // Unavailable codes mean the system will be right back.
576 // (e.g., can't connect, lost leader)
577 // Treat Internal codes as if something failed, leaving the
578 // system in an inconsistent state, but retrying could make progress.
579 // (e.g., failed in middle of send, corrupted frame)
580 // TODO: are permanent Internal errors possible from grpc?
581 return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
582}
583
584// isUnavailableErr returns true if the given error is an unavailable error
585func isUnavailableErr(ctx context.Context, err error) bool {
586 if ctx != nil && ctx.Err() != nil {
587 return false
588 }
589 if err == nil {
590 return false
591 }
592 ev, ok := status.FromError(err)
593 if ok {
594 // Unavailable codes mean the system will be right back.
595 // (e.g., can't connect, lost leader)
596 return ev.Code() == codes.Unavailable
597 }
598 return false
599}
600
601// ContextError converts the error into an EtcdError if the error message matches one of
602// the defined messages; otherwise, it tries to retrieve the context error.
603func ContextError(ctx context.Context, err error) error {
604 if err == nil {
605 return nil
606 }
607 err = rpctypes.Error(err)
608 var serverErr rpctypes.EtcdError
609 if errors.As(err, &serverErr) {
610 return err
611 }
612 if ev, ok := status.FromError(err); ok {
613 code := ev.Code()
614 switch code {
615 case codes.DeadlineExceeded:
616 fallthrough
617 case codes.Canceled:
618 if ctx.Err() != nil {
619 err = ctx.Err()
620 }
621 }
622 }
623 return err
624}
625
// canceledByCaller reports whether err is a cancellation or deadline error
// observed after stopCtx has itself finished. It is false while stopCtx is
// still live or when err is nil.
func canceledByCaller(stopCtx context.Context, err error) bool {
	if stopCtx.Err() == nil {
		return false
	}
	if err == nil {
		return false
	}
	for _, target := range []error{context.Canceled, context.DeadlineExceeded} {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}
633
634// IsConnCanceled returns true, if error is from a closed gRPC connection.
635// ref. https://github.com/grpc/grpc-go/pull/1854
636func IsConnCanceled(err error) bool {
637 if err == nil {
638 return false
639 }
640
641 // >= gRPC v1.23.x
642 s, ok := status.FromError(err)
643 if ok {
644 // connection is canceled or server has already closed the connection
645 return s.Code() == codes.Canceled || s.Message() == "transport is closing"
646 }
647
648 // >= gRPC v1.10.x
649 if errors.Is(err, context.Canceled) {
650 return true
651 }
652
653 // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
654 return strings.Contains(err.Error(), "grpc: the client connection is closing")
655}