blob: a3c315f2d76ebf2ccce920be46861ec8a17fb756 [file] [log] [blame]
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
Abhay Kumara2ae5992025-11-10 14:02:24 +000026 "net/url"
27 "slices"
khenaidooac637102019-01-14 15:44:34 -050028 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
Andrea Campanella3614a922021-02-25 12:40:42 +010034 "google.golang.org/grpc/balancer/base"
Abhay Kumara2ae5992025-11-10 14:02:24 +000035 "google.golang.org/grpc/balancer/pickfirst"
khenaidooac637102019-01-14 15:44:34 -050036 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/connectivity"
Abhay Kumara2ae5992025-11-10 14:02:24 +000038 "google.golang.org/grpc/internal"
khenaidooac637102019-01-14 15:44:34 -050039 "google.golang.org/grpc/internal/channelz"
khenaidooac637102019-01-14 15:44:34 -050040 "google.golang.org/grpc/internal/grpcsync"
Abhay Kumara2ae5992025-11-10 14:02:24 +000041 "google.golang.org/grpc/internal/idle"
42 iresolver "google.golang.org/grpc/internal/resolver"
43 "google.golang.org/grpc/internal/stats"
khenaidooac637102019-01-14 15:44:34 -050044 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/keepalive"
khenaidooac637102019-01-14 15:44:34 -050046 "google.golang.org/grpc/resolver"
Scott Baker8461e152019-10-01 14:44:30 -070047 "google.golang.org/grpc/serviceconfig"
khenaidooac637102019-01-14 15:44:34 -050048 "google.golang.org/grpc/status"
Andrea Campanella3614a922021-02-25 12:40:42 +010049
50 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
Andrea Campanella3614a922021-02-25 12:40:42 +010051 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
Abhay Kumara2ae5992025-11-10 14:02:24 +000052 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
53 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
khenaidooac637102019-01-14 15:44:34 -050054)
55
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
)
60
61var (
62 // ErrClientConnClosing indicates that the operation is illegal because
63 // the ClientConn is closing.
64 //
65 // Deprecated: this error should not be relied upon by users; use the status
66 // code of Canceled instead.
67 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69 errConnDrain = errors.New("grpc: the connection is drained")
70 // errConnClosing indicates that the connection is closing.
71 errConnClosing = errors.New("grpc: the connection is closing")
Abhay Kumara2ae5992025-11-10 14:02:24 +000072 // errConnIdling indicates the connection is being closed as the channel
73 // is moving to an idle mode due to inactivity.
74 errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
Scott Baker8461e152019-10-01 14:44:30 -070075 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
76 // service config.
77 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
Abhay Kumara2ae5992025-11-10 14:02:24 +000078 // PickFirstBalancerName is the name of the pick_first balancer.
79 PickFirstBalancerName = pickfirst.Name
khenaidooac637102019-01-14 15:44:34 -050080)
81
// The following errors are returned from NewClient, Dial and DialContext
// when the configured credentials fail validation.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// pass WithTransportCredentials(insecure.NewCredentials()) to disable
	// security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errNoTransportCredsInBundle indicates that the configured creds bundle
	// returned a transport credentials which was nil.
	errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., OAuth2 token) which requires secure
	// connection on an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
99
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the default buffer size (32 KiB) for
	// sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the same size (32 KiB); going by its name it is
	// the read-side counterpart of defaultWriteBufSize.
	defaultReadBufSize = 32 * 1024
)
107
Abhay Kumara2ae5992025-11-10 14:02:24 +0000108type defaultConfigSelector struct {
109 sc *ServiceConfig
khenaidooac637102019-01-14 15:44:34 -0500110}
111
Abhay Kumara2ae5992025-11-10 14:02:24 +0000112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113 return &iresolver.RPCConfig{
114 Context: rpcInfo.Context,
115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116 }, nil
117}
118
119// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
120// is performed. Use of the ClientConn for RPCs will automatically cause it to
121// connect. The Connect method may be called to manually create a connection,
122// but for most users this should be unnecessary.
khenaidooac637102019-01-14 15:44:34 -0500123//
124// The target name syntax is defined in
Abhay Kumara2ae5992025-11-10 14:02:24 +0000125// https://github.com/grpc/grpc/blob/master/doc/naming.md. E.g. to use the dns
126// name resolver, a "dns:///" prefix may be applied to the target. The default
127// name resolver will be used if no scheme is detected, or if the parsed scheme
128// is not a registered name resolver. The default resolver is "dns" but can be
129// overridden using the resolver package's SetDefaultScheme.
130//
131// Examples:
132//
133// - "foo.googleapis.com:8080"
134// - "dns:///foo.googleapis.com:8080"
135// - "dns:///foo.googleapis.com"
136// - "dns:///10.0.0.213:8080"
137// - "dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443"
138// - "dns://8.8.8.8/foo.googleapis.com:8080"
139// - "dns://8.8.8.8/foo.googleapis.com"
140// - "zookeeper://zk.example.com:9900/example_service"
141//
142// The DialOptions returned by WithBlock, WithTimeout,
143// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
144// function.
145func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
khenaidooac637102019-01-14 15:44:34 -0500146 cc := &ClientConn{
Abhay Kumara2ae5992025-11-10 14:02:24 +0000147 target: target,
148 conns: make(map[*addrConn]struct{}),
149 dopts: defaultDialOptions(),
khenaidooac637102019-01-14 15:44:34 -0500150 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000151
khenaidooac637102019-01-14 15:44:34 -0500152 cc.retryThrottler.Store((*retryThrottler)(nil))
Abhay Kumara2ae5992025-11-10 14:02:24 +0000153 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
khenaidooac637102019-01-14 15:44:34 -0500154 cc.ctx, cc.cancel = context.WithCancel(context.Background())
155
Abhay Kumara2ae5992025-11-10 14:02:24 +0000156 // Apply dial options.
157 disableGlobalOpts := false
158 for _, opt := range opts {
159 if _, ok := opt.(*disableGlobalDialOptions); ok {
160 disableGlobalOpts = true
161 break
162 }
163 }
164
165 if !disableGlobalOpts {
166 for _, opt := range globalDialOptions {
167 opt.apply(&cc.dopts)
168 }
169 }
170
khenaidooac637102019-01-14 15:44:34 -0500171 for _, opt := range opts {
172 opt.apply(&cc.dopts)
173 }
174
Abhay Kumara2ae5992025-11-10 14:02:24 +0000175 // Determine the resolver to use.
176 if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
177 return nil, err
178 }
179
180 for _, opt := range globalPerTargetDialOptions {
181 opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
182 }
183
Scott Baker8461e152019-10-01 14:44:30 -0700184 chainUnaryClientInterceptors(cc)
185 chainStreamClientInterceptors(cc)
186
Abhay Kumara2ae5992025-11-10 14:02:24 +0000187 if err := cc.validateTransportCredentials(); err != nil {
188 return nil, err
189 }
190
191 if cc.dopts.defaultServiceConfigRawJSON != nil {
192 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
193 if scpr.Err != nil {
194 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
195 }
196 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
197 }
198 cc.keepaliveParams = cc.dopts.copts.KeepaliveParams
199
200 if err = cc.initAuthority(); err != nil {
201 return nil, err
202 }
203
204 // Register ClientConn with channelz. Note that this is only done after
205 // channel creation cannot fail.
206 cc.channelzRegistration(target)
207 channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
208 channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
209
210 cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
211 cc.pickerWrapper = newPickerWrapper()
212
213 cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
214
215 cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
216 cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
217
218 return cc, nil
219}
220
221// Dial calls DialContext(context.Background(), target, opts...).
222//
223// Deprecated: use NewClient instead. Will be supported throughout 1.x.
224func Dial(target string, opts ...DialOption) (*ClientConn, error) {
225 return DialContext(context.Background(), target, opts...)
226}
227
228// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
229// used, it calls Connect and WaitForStateChange until either the context
230// expires or the state of the ClientConn is Ready.
231//
232// One subtle difference between NewClient and Dial and DialContext is that the
233// former uses "dns" as the default name resolver, while the latter use
234// "passthrough" for backward compatibility. This distinction should not matter
235// to most users, but could matter to legacy users that specify a custom dialer
236// and expect it to receive the target string directly.
237//
238// Deprecated: use NewClient instead. Will be supported throughout 1.x.
239func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
240 // At the end of this method, we kick the channel out of idle, rather than
241 // waiting for the first rpc.
242 //
243 // WithLocalDNSResolution dial option in `grpc.Dial` ensures that it
244 // preserves behavior: when default scheme passthrough is used, skip
245 // hostname resolution, when "dns" is used for resolution, perform
246 // resolution on the client.
247 opts = append([]DialOption{withDefaultScheme("passthrough"), WithLocalDNSResolution()}, opts...)
248 cc, err := NewClient(target, opts...)
249 if err != nil {
250 return nil, err
251 }
252
253 // We start the channel off in idle mode, but kick it out of idle now,
254 // instead of waiting for the first RPC. This is the legacy behavior of
255 // Dial.
Scott Baker8461e152019-10-01 14:44:30 -0700256 defer func() {
257 if err != nil {
258 cc.Close()
259 }
260 }()
261
Abhay Kumara2ae5992025-11-10 14:02:24 +0000262 // This creates the name resolver, load balancer, etc.
263 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
264 return nil, err
khenaidooac637102019-01-14 15:44:34 -0500265 }
266
Abhay Kumara2ae5992025-11-10 14:02:24 +0000267 // Return now for non-blocking dials.
268 if !cc.dopts.block {
269 return cc, nil
khenaidooac637102019-01-14 15:44:34 -0500270 }
271
272 if cc.dopts.timeout > 0 {
273 var cancel context.CancelFunc
274 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
275 defer cancel()
276 }
khenaidooac637102019-01-14 15:44:34 -0500277 defer func() {
278 select {
279 case <-ctx.Done():
Abhay Kumara2ae5992025-11-10 14:02:24 +0000280 switch {
281 case ctx.Err() == err:
282 conn = nil
283 case err == nil || !cc.dopts.returnLastError:
284 conn, err = nil, ctx.Err()
285 default:
286 conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
287 }
khenaidooac637102019-01-14 15:44:34 -0500288 default:
289 }
khenaidooac637102019-01-14 15:44:34 -0500290 }()
291
Abhay Kumara2ae5992025-11-10 14:02:24 +0000292 // A blocking dial blocks until the clientConn is ready.
293 for {
294 s := cc.GetState()
295 if s == connectivity.Idle {
296 cc.Connect()
khenaidooac637102019-01-14 15:44:34 -0500297 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000298 if s == connectivity.Ready {
299 return cc, nil
300 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
301 if err = cc.connectionError(); err != nil {
302 terr, ok := err.(interface {
303 Temporary() bool
304 })
305 if ok && !terr.Temporary() {
306 return nil, err
307 }
khenaidooac637102019-01-14 15:44:34 -0500308 }
khenaidooac637102019-01-14 15:44:34 -0500309 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000310 if !cc.WaitForStateChange(ctx, s) {
311 // ctx got timeout or canceled.
312 if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
313 return nil, err
khenaidooac637102019-01-14 15:44:34 -0500314 }
khenaidooac637102019-01-14 15:44:34 -0500315 return nil, ctx.Err()
316 }
317 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000318}
khenaidooac637102019-01-14 15:44:34 -0500319
Abhay Kumara2ae5992025-11-10 14:02:24 +0000320// addTraceEvent is a helper method to add a trace event on the channel. If the
321// channel is a nested one, the same event is also added on the parent channel.
322func (cc *ClientConn) addTraceEvent(msg string) {
323 ted := &channelz.TraceEvent{
324 Desc: fmt.Sprintf("Channel %s", msg),
325 Severity: channelz.CtInfo,
khenaidooac637102019-01-14 15:44:34 -0500326 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000327 if cc.dopts.channelzParent != nil {
328 ted.Parent = &channelz.TraceEvent{
329 Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
330 Severity: channelz.CtInfo,
331 }
khenaidooac637102019-01-14 15:44:34 -0500332 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000333 channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
334}
khenaidooac637102019-01-14 15:44:34 -0500335
Abhay Kumara2ae5992025-11-10 14:02:24 +0000336type idler ClientConn
khenaidooac637102019-01-14 15:44:34 -0500337
Abhay Kumara2ae5992025-11-10 14:02:24 +0000338func (i *idler) EnterIdleMode() {
339 (*ClientConn)(i).enterIdleMode()
340}
341
342func (i *idler) ExitIdleMode() error {
343 return (*ClientConn)(i).exitIdleMode()
344}
345
346// exitIdleMode moves the channel out of idle mode by recreating the name
347// resolver and load balancer. This should never be called directly; use
348// cc.idlenessMgr.ExitIdleMode instead.
349func (cc *ClientConn) exitIdleMode() (err error) {
khenaidooac637102019-01-14 15:44:34 -0500350 cc.mu.Lock()
Abhay Kumara2ae5992025-11-10 14:02:24 +0000351 if cc.conns == nil {
352 cc.mu.Unlock()
353 return errConnClosing
354 }
khenaidooac637102019-01-14 15:44:34 -0500355 cc.mu.Unlock()
Abhay Kumara2ae5992025-11-10 14:02:24 +0000356
357 // This needs to be called without cc.mu because this builds a new resolver
358 // which might update state or report error inline, which would then need to
359 // acquire cc.mu.
360 if err := cc.resolverWrapper.start(); err != nil {
361 return err
362 }
363
364 cc.addTraceEvent("exiting idle mode")
365 return nil
366}
367
368// initIdleStateLocked initializes common state to how it should be while idle.
369func (cc *ClientConn) initIdleStateLocked() {
370 cc.resolverWrapper = newCCResolverWrapper(cc)
371 cc.balancerWrapper = newCCBalancerWrapper(cc)
372 cc.firstResolveEvent = grpcsync.NewEvent()
373 // cc.conns == nil is a proxy for the ClientConn being closed. So, instead
374 // of setting it to nil here, we recreate the map. This also means that we
375 // don't have to do this when exiting idle mode.
376 cc.conns = make(map[*addrConn]struct{})
377}
378
379// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
380// name resolver, load balancer, and any subchannels. This should never be
381// called directly; use cc.idlenessMgr.EnterIdleMode instead.
382func (cc *ClientConn) enterIdleMode() {
383 cc.mu.Lock()
384
385 if cc.conns == nil {
386 cc.mu.Unlock()
387 return
388 }
389
390 conns := cc.conns
391
392 rWrapper := cc.resolverWrapper
393 rWrapper.close()
394 cc.pickerWrapper.reset()
395 bWrapper := cc.balancerWrapper
396 bWrapper.close()
397 cc.csMgr.updateState(connectivity.Idle)
398 cc.addTraceEvent("entering idle mode")
399
400 cc.initIdleStateLocked()
401
402 cc.mu.Unlock()
403
404 // Block until the name resolver and LB policy are closed.
405 <-rWrapper.serializer.Done()
406 <-bWrapper.serializer.Done()
407
408 // Close all subchannels after the LB policy is closed.
409 for ac := range conns {
410 ac.tearDown(errConnIdling)
411 }
412}
413
414// validateTransportCredentials performs a series of checks on the configured
415// transport credentials. It returns a non-nil error if any of these conditions
416// are met:
417// - no transport creds and no creds bundle is configured
418// - both transport creds and creds bundle are configured
419// - creds bundle is configured, but it lacks a transport credentials
420// - insecure transport creds configured alongside call creds that require
421// transport level security
422//
423// If none of the above conditions are met, the configured credentials are
424// deemed valid and a nil error is returned.
425func (cc *ClientConn) validateTransportCredentials() error {
426 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
427 return errNoTransportSecurity
428 }
429 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
430 return errTransportCredsAndBundle
431 }
432 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
433 return errNoTransportCredsInBundle
434 }
435 transportCreds := cc.dopts.copts.TransportCredentials
436 if transportCreds == nil {
437 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
438 }
439 if transportCreds.Info().SecurityProtocol == "insecure" {
440 for _, cd := range cc.dopts.copts.PerRPCCredentials {
441 if cd.RequireTransportSecurity() {
442 return errTransportCredentialsMissing
khenaidooac637102019-01-14 15:44:34 -0500443 }
444 }
445 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000446 return nil
447}
khenaidooac637102019-01-14 15:44:34 -0500448
Abhay Kumara2ae5992025-11-10 14:02:24 +0000449// channelzRegistration registers the newly created ClientConn with channelz and
450// stores the returned identifier in `cc.channelz`. A channelz trace event is
451// emitted for ClientConn creation. If the newly created ClientConn is a nested
452// one, i.e a valid parent ClientConn ID is specified via a dial option, the
453// trace event is also added to the parent.
454//
455// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
456func (cc *ClientConn) channelzRegistration(target string) {
457 parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
458 cc.channelz = channelz.RegisterChannel(parentChannel, target)
459 cc.addTraceEvent(fmt.Sprintf("created for target %q", target))
khenaidooac637102019-01-14 15:44:34 -0500460}
461
Scott Baker8461e152019-10-01 14:44:30 -0700462// chainUnaryClientInterceptors chains all unary client interceptors into one.
463func chainUnaryClientInterceptors(cc *ClientConn) {
464 interceptors := cc.dopts.chainUnaryInts
465 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
466 // be executed before any other chained interceptors.
467 if cc.dopts.unaryInt != nil {
468 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
469 }
470 var chainedInt UnaryClientInterceptor
471 if len(interceptors) == 0 {
472 chainedInt = nil
473 } else if len(interceptors) == 1 {
474 chainedInt = interceptors[0]
475 } else {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000476 chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
Scott Baker8461e152019-10-01 14:44:30 -0700477 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
478 }
479 }
480 cc.dopts.unaryInt = chainedInt
481}
482
483// getChainUnaryInvoker recursively generate the chained unary invoker.
484func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
485 if curr == len(interceptors)-1 {
486 return finalInvoker
487 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000488 return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
Scott Baker8461e152019-10-01 14:44:30 -0700489 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
490 }
491}
492
493// chainStreamClientInterceptors chains all stream client interceptors into one.
494func chainStreamClientInterceptors(cc *ClientConn) {
495 interceptors := cc.dopts.chainStreamInts
496 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
497 // be executed before any other chained interceptors.
498 if cc.dopts.streamInt != nil {
499 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
500 }
501 var chainedInt StreamClientInterceptor
502 if len(interceptors) == 0 {
503 chainedInt = nil
504 } else if len(interceptors) == 1 {
505 chainedInt = interceptors[0]
506 } else {
507 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
508 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
509 }
510 }
511 cc.dopts.streamInt = chainedInt
512}
513
514// getChainStreamer recursively generate the chained client stream constructor.
515func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
516 if curr == len(interceptors)-1 {
517 return finalStreamer
518 }
519 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
520 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
521 }
522}
523
Abhay Kumara2ae5992025-11-10 14:02:24 +0000524// newConnectivityStateManager creates an connectivityStateManager with
525// the specified channel.
526func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
527 return &connectivityStateManager{
528 channelz: channel,
529 pubSub: grpcsync.NewPubSub(ctx),
530 }
531}
532
khenaidooac637102019-01-14 15:44:34 -0500533// connectivityStateManager keeps the connectivity.State of ClientConn.
534// This struct will eventually be exported so the balancers can access it.
Abhay Kumara2ae5992025-11-10 14:02:24 +0000535//
536// TODO: If possible, get rid of the `connectivityStateManager` type, and
537// provide this functionality using the `PubSub`, to avoid keeping track of
538// the connectivity state at two places.
khenaidooac637102019-01-14 15:44:34 -0500539type connectivityStateManager struct {
540 mu sync.Mutex
541 state connectivity.State
542 notifyChan chan struct{}
Abhay Kumara2ae5992025-11-10 14:02:24 +0000543 channelz *channelz.Channel
544 pubSub *grpcsync.PubSub
khenaidooac637102019-01-14 15:44:34 -0500545}
546
547// updateState updates the connectivity.State of ClientConn.
548// If there's a change it notifies goroutines waiting on state change to
549// happen.
550func (csm *connectivityStateManager) updateState(state connectivity.State) {
551 csm.mu.Lock()
552 defer csm.mu.Unlock()
553 if csm.state == connectivity.Shutdown {
554 return
555 }
556 if csm.state == state {
557 return
558 }
559 csm.state = state
Abhay Kumara2ae5992025-11-10 14:02:24 +0000560 csm.channelz.ChannelMetrics.State.Store(&state)
561 csm.pubSub.Publish(state)
562
563 channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
khenaidooac637102019-01-14 15:44:34 -0500564 if csm.notifyChan != nil {
565 // There are other goroutines waiting on this channel.
566 close(csm.notifyChan)
567 csm.notifyChan = nil
568 }
569}
570
571func (csm *connectivityStateManager) getState() connectivity.State {
572 csm.mu.Lock()
573 defer csm.mu.Unlock()
574 return csm.state
575}
576
577func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
578 csm.mu.Lock()
579 defer csm.mu.Unlock()
580 if csm.notifyChan == nil {
581 csm.notifyChan = make(chan struct{})
582 }
583 return csm.notifyChan
584}
585
Abhay Kumara2ae5992025-11-10 14:02:24 +0000586// ClientConnInterface defines the functions clients need to perform unary and
587// streaming RPCs. It is implemented by *ClientConn, and is only intended to
588// be referenced by generated code.
589type ClientConnInterface interface {
590 // Invoke performs a unary RPC and returns after the response is received
591 // into reply.
592 Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
593 // NewStream begins a streaming RPC.
594 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
595}
596
597// Assert *ClientConn implements ClientConnInterface.
598var _ ClientConnInterface = (*ClientConn)(nil)
599
Andrea Campanella3614a922021-02-25 12:40:42 +0100600// ClientConn represents a virtual connection to a conceptual endpoint, to
601// perform RPCs.
602//
603// A ClientConn is free to have zero or more actual connections to the endpoint
604// based on configuration, load, etc. It is also free to determine which actual
605// endpoints to use and may change it every RPC, permitting client-side load
606// balancing.
607//
608// A ClientConn encapsulates a range of functionality including name
609// resolution, TCP connection establishment (with retries and backoff) and TLS
610// handshakes. It also handles errors on established connections by
611// re-resolving the name and reconnecting.
khenaidooac637102019-01-14 15:44:34 -0500612type ClientConn struct {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000613 ctx context.Context // Initialized using the background context at dial time.
614 cancel context.CancelFunc // Cancelled on close.
khenaidooac637102019-01-14 15:44:34 -0500615
Abhay Kumara2ae5992025-11-10 14:02:24 +0000616 // The following are initialized at dial time, and are read-only after that.
617 target string // User's dial target.
618 parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
619 authority string // See initAuthority().
620 dopts dialOptions // Default and user specified dial options.
621 channelz *channelz.Channel // Channelz object.
622 resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
623 idlenessMgr *idle.Manager
624 metricsRecorderList *stats.MetricsRecorderList
khenaidooac637102019-01-14 15:44:34 -0500625
Abhay Kumara2ae5992025-11-10 14:02:24 +0000626 // The following provide their own synchronization, and therefore don't
627 // require cc.mu to be held to access them.
628 csMgr *connectivityStateManager
629 pickerWrapper *pickerWrapper
630 safeConfigSelector iresolver.SafeConfigSelector
631 retryThrottler atomic.Value // Updated from service config.
khenaidooac637102019-01-14 15:44:34 -0500632
Abhay Kumara2ae5992025-11-10 14:02:24 +0000633 // mu protects the following fields.
634 // TODO: split mu so the same mutex isn't used for everything.
khenaidooac637102019-01-14 15:44:34 -0500635 mu sync.RWMutex
Abhay Kumara2ae5992025-11-10 14:02:24 +0000636 resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
637 balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
638 sc *ServiceConfig // Latest service config received from the resolver.
639 conns map[*addrConn]struct{} // Set to nil on close.
640 keepaliveParams keepalive.ClientParameters // May be updated upon receipt of a GoAway.
641 // firstResolveEvent is used to track whether the name resolver sent us at
642 // least one update. RPCs block on this event. May be accessed without mu
643 // if we know we cannot be asked to enter idle mode while accessing it (e.g.
644 // when the idle manager has already been closed, or if we are already
645 // entering idle mode).
khenaidooac637102019-01-14 15:44:34 -0500646 firstResolveEvent *grpcsync.Event
647
Abhay Kumara2ae5992025-11-10 14:02:24 +0000648 lceMu sync.Mutex // protects lastConnectionError
649 lastConnectionError error
khenaidooac637102019-01-14 15:44:34 -0500650}
651
652// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
653// ctx expires. A true value is returned in former case and false in latter.
khenaidooac637102019-01-14 15:44:34 -0500654func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
655 ch := cc.csMgr.getNotifyChan()
656 if cc.csMgr.getState() != sourceState {
657 return true
658 }
659 select {
660 case <-ctx.Done():
661 return false
662 case <-ch:
663 return true
664 }
665}
666
667// GetState returns the connectivity.State of ClientConn.
khenaidooac637102019-01-14 15:44:34 -0500668func (cc *ClientConn) GetState() connectivity.State {
669 return cc.csMgr.getState()
670}
671
Abhay Kumara2ae5992025-11-10 14:02:24 +0000672// Connect causes all subchannels in the ClientConn to attempt to connect if
673// the channel is idle. Does not wait for the connection attempts to begin
674// before returning.
675//
676// # Experimental
677//
678// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
679// release.
680func (cc *ClientConn) Connect() {
681 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
682 cc.addTraceEvent(err.Error())
683 return
khenaidooac637102019-01-14 15:44:34 -0500684 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000685 // If the ClientConn was not in idle mode, we need to call ExitIdle on the
686 // LB policy so that connections can be created.
687 cc.mu.Lock()
688 cc.balancerWrapper.exitIdle()
689 cc.mu.Unlock()
khenaidooac637102019-01-14 15:44:34 -0500690}
691
Abhay Kumara2ae5992025-11-10 14:02:24 +0000692// waitForResolvedAddrs blocks until the resolver provides addresses or the
693// context expires, whichever happens first.
694//
695// Error is nil unless the context expires first; otherwise returns a status
696// error based on the context.
697//
698// The returned boolean indicates whether it did block or not. If the
699// resolution has already happened once before, it returns false without
700// blocking. Otherwise, it wait for the resolution and return true if
701// resolution has succeeded or return false along with error if resolution has
702// failed.
703func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) (bool, error) {
khenaidooac637102019-01-14 15:44:34 -0500704 // This is on the RPC path, so we use a fast path to avoid the
705 // more-expensive "select" below after the resolver has returned once.
706 if cc.firstResolveEvent.HasFired() {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000707 return false, nil
khenaidooac637102019-01-14 15:44:34 -0500708 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000709 internal.NewStreamWaitingForResolver()
khenaidooac637102019-01-14 15:44:34 -0500710 select {
711 case <-cc.firstResolveEvent.Done():
Abhay Kumara2ae5992025-11-10 14:02:24 +0000712 return true, nil
khenaidooac637102019-01-14 15:44:34 -0500713 case <-ctx.Done():
Abhay Kumara2ae5992025-11-10 14:02:24 +0000714 return false, status.FromContextError(ctx.Err()).Err()
khenaidooac637102019-01-14 15:44:34 -0500715 case <-cc.ctx.Done():
Abhay Kumara2ae5992025-11-10 14:02:24 +0000716 return false, ErrClientConnClosing
khenaidooac637102019-01-14 15:44:34 -0500717 }
718}
719
Andrea Campanella3614a922021-02-25 12:40:42 +0100720var emptyServiceConfig *ServiceConfig
721
722func init() {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000723 cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
Andrea Campanella3614a922021-02-25 12:40:42 +0100724 if cfg.Err != nil {
725 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
726 }
727 emptyServiceConfig = cfg.Config.(*ServiceConfig)
Abhay Kumara2ae5992025-11-10 14:02:24 +0000728
729 internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
730 return cc.csMgr.pubSub.Subscribe(s)
731 }
732 internal.EnterIdleModeForTesting = func(cc *ClientConn) {
733 cc.idlenessMgr.EnterIdleModeForTesting()
734 }
735 internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
736 return cc.idlenessMgr.ExitIdleMode()
737 }
Andrea Campanella3614a922021-02-25 12:40:42 +0100738}
739
Abhay Kumara2ae5992025-11-10 14:02:24 +0000740func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
Andrea Campanella3614a922021-02-25 12:40:42 +0100741 if cc.sc != nil {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000742 cc.applyServiceConfigAndBalancer(cc.sc, nil)
Andrea Campanella3614a922021-02-25 12:40:42 +0100743 return
744 }
745 if cc.dopts.defaultServiceConfig != nil {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000746 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
Andrea Campanella3614a922021-02-25 12:40:42 +0100747 } else {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000748 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
Andrea Campanella3614a922021-02-25 12:40:42 +0100749 }
750}
751
// updateResolverStateAndUnlock processes an update from the name resolver
// (either a new state s, or an error err) and forwards the outcome to the
// balancer wrapper.
//
// Every return path releases cc.mu and none acquires it, so the caller must
// hold cc.mu on entry. cc.firstResolveEvent is fired when the function
// returns, whatever the outcome.
//
// Return value:
//   - nil if the ClientConn is already closed (cc.conns == nil);
//   - balancer.ErrBadResolverState if err is non-nil, or if the service config
//     carried by s has a parse error or an unexpected type;
//   - otherwise the error returned by the balancer wrapper's
//     updateClientConnState.
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig()

		cc.balancerWrapper.resolverError(err)

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	// ret stays nil unless the resolver's service config turns out to be
	// unusable; in that case it takes precedence over the balancer's error
	// below.
	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig()
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig()
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			// A config selector supplied by the resolver wins over the method
			// configs in the service config; without one, a default selector
			// wrapping sc is used.
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				// NOTE(review): this re-asserts the same value already bound
				// to sc above.
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.sc == nil {
				// Apply the failing LB only if we haven't received valid service config
				// from the name resolver in the past.
				cc.applyFailingLBLocked(s.ServiceConfig)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// Capture what the balancer update needs while still holding cc.mu, then
	// release the lock before calling into the balancer wrapper.
	balCfg := cc.sc.lbConfig
	bw := cc.balancerWrapper
	cc.mu.Unlock()

	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
817
Abhay Kumara2ae5992025-11-10 14:02:24 +0000818// applyFailingLBLocked is akin to configuring an LB policy on the channel which
819// always fails RPCs. Here, an actual LB policy is not configured, but an always
820// erroring picker is configured, which returns errors with information about
821// what was invalid in the received service config. A config selector with no
822// service config is configured, and the connectivity state of the channel is
823// set to TransientFailure.
824func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
825 var err error
826 if sc.Err != nil {
827 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
828 } else {
829 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
khenaidooac637102019-01-14 15:44:34 -0500830 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000831 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
832 cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
833 cc.csMgr.updateState(connectivity.TransientFailure)
khenaidooac637102019-01-14 15:44:34 -0500834}
835
Abhay Kumara2ae5992025-11-10 14:02:24 +0000836// Makes a copy of the input addresses slice. Addresses are passed during
837// subconn creation and address update operations.
838func copyAddresses(in []resolver.Address) []resolver.Address {
839 out := make([]resolver.Address, len(in))
840 copy(out, in)
841 return out
khenaidooac637102019-01-14 15:44:34 -0500842}
843
Abhay Kumara2ae5992025-11-10 14:02:24 +0000844// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
khenaidooac637102019-01-14 15:44:34 -0500845//
846// Caller needs to make sure len(addrs) > 0.
Abhay Kumara2ae5992025-11-10 14:02:24 +0000847func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
848 if cc.conns == nil {
849 return nil, ErrClientConnClosing
850 }
851
khenaidooac637102019-01-14 15:44:34 -0500852 ac := &addrConn{
Abhay Kumara2ae5992025-11-10 14:02:24 +0000853 state: connectivity.Idle,
Stephane Barbarie260a5632019-02-26 16:12:49 -0500854 cc: cc,
Abhay Kumara2ae5992025-11-10 14:02:24 +0000855 addrs: copyAddresses(addrs),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500856 scopts: opts,
857 dopts: cc.dopts,
Abhay Kumara2ae5992025-11-10 14:02:24 +0000858 channelz: channelz.RegisterSubChannel(cc.channelz, ""),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500859 resetBackoff: make(chan struct{}),
khenaidooac637102019-01-14 15:44:34 -0500860 }
861 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
Abhay Kumara2ae5992025-11-10 14:02:24 +0000862 // Start with our address set to the first address; this may be updated if
863 // we connect to different addresses.
864 ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
865
866 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
867 Desc: "Subchannel created",
868 Severity: channelz.CtInfo,
869 Parent: &channelz.TraceEvent{
870 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
871 Severity: channelz.CtInfo,
872 },
873 })
874
khenaidooac637102019-01-14 15:44:34 -0500875 // Track ac in cc. This needs to be done before any getTransport(...) is called.
khenaidooac637102019-01-14 15:44:34 -0500876 cc.conns[ac] = struct{}{}
khenaidooac637102019-01-14 15:44:34 -0500877 return ac, nil
878}
879
880// removeAddrConn removes the addrConn in the subConn from clientConn.
881// It also tears down the ac with the given error.
882func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
883 cc.mu.Lock()
884 if cc.conns == nil {
885 cc.mu.Unlock()
886 return
887 }
888 delete(cc.conns, ac)
889 cc.mu.Unlock()
890 ac.tearDown(err)
891}
892
khenaidooac637102019-01-14 15:44:34 -0500893// Target returns the target string of the ClientConn.
khenaidooac637102019-01-14 15:44:34 -0500894func (cc *ClientConn) Target() string {
895 return cc.target
896}
897
Abhay Kumara2ae5992025-11-10 14:02:24 +0000898// CanonicalTarget returns the canonical target string used when creating cc.
899//
900// This always has the form "<scheme>://[authority]/<endpoint>". For example:
901//
902// - "dns:///example.com:42"
903// - "dns://8.8.8.8/example.com:42"
904// - "unix:///path/to/socket"
905func (cc *ClientConn) CanonicalTarget() string {
906 return cc.parsedTarget.String()
907}
908
khenaidooac637102019-01-14 15:44:34 -0500909func (cc *ClientConn) incrCallsStarted() {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000910 cc.channelz.ChannelMetrics.CallsStarted.Add(1)
911 cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
khenaidooac637102019-01-14 15:44:34 -0500912}
913
914func (cc *ClientConn) incrCallsSucceeded() {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000915 cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
khenaidooac637102019-01-14 15:44:34 -0500916}
917
918func (cc *ClientConn) incrCallsFailed() {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000919 cc.channelz.ChannelMetrics.CallsFailed.Add(1)
khenaidooac637102019-01-14 15:44:34 -0500920}
921
922// connect starts creating a transport.
923// It does nothing if the ac is not IDLE.
924// TODO(bar) Move this to the addrConn section.
925func (ac *addrConn) connect() error {
926 ac.mu.Lock()
927 if ac.state == connectivity.Shutdown {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000928 if logger.V(2) {
929 logger.Infof("connect called on shutdown addrConn; ignoring.")
930 }
khenaidooac637102019-01-14 15:44:34 -0500931 ac.mu.Unlock()
932 return errConnClosing
933 }
934 if ac.state != connectivity.Idle {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000935 if logger.V(2) {
936 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
937 }
khenaidooac637102019-01-14 15:44:34 -0500938 ac.mu.Unlock()
939 return nil
940 }
khenaidooac637102019-01-14 15:44:34 -0500941
Abhay Kumara2ae5992025-11-10 14:02:24 +0000942 ac.resetTransportAndUnlock()
khenaidooac637102019-01-14 15:44:34 -0500943 return nil
944}
945
Abhay Kumara2ae5992025-11-10 14:02:24 +0000946// equalAddressIgnoringBalAttributes returns true is a and b are considered equal.
947// This is different from the Equal method on the resolver.Address type which
948// considers all fields to determine equality. Here, we only consider fields
949// that are meaningful to the subConn.
950func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
951 return a.Addr == b.Addr && a.ServerName == b.ServerName &&
952 a.Attributes.Equal(b.Attributes) &&
953 a.Metadata == b.Metadata
954}
955
956func equalAddressesIgnoringBalAttributes(a, b []resolver.Address) bool {
957 return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddressIgnoringBalAttributes(&a, &b) })
958}
959
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
//
// The input slice is copied first. Then, under ac.mu:
//   - if the new list equals the current one (ignoring balancer attributes),
//     nothing changes;
//   - if the addrConn is Shutdown, TransientFailure or Idle, only ac.addrs is
//     replaced;
//   - if it is Ready and the currently connected address is still in the new
//     list, only ac.addrs is replaced;
//   - otherwise the current connection or attempt is cancelled, the transport
//     (if any) is gracefully closed, and a new connection attempt is started
//     in a separate goroutine.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
	addrs = copyAddresses(addrs)
	// Log at most the first five addresses.
	limit := len(addrs)
	if limit > 5 {
		limit = 5
	}
	channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])

	ac.mu.Lock()
	if equalAddressesIgnoringBalAttributes(ac.addrs, addrs) {
		ac.mu.Unlock()
		return
	}

	ac.addrs = addrs

	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		// We were not connecting, so do nothing but update the addresses.
		ac.mu.Unlock()
		return
	}

	if ac.state == connectivity.Ready {
		// Try to find the connected address.
		for _, a := range addrs {
			// a is a per-iteration copy, so setting ServerName here does not
			// modify the entries stored in ac.addrs.
			a.ServerName = ac.cc.getServerName(a)
			if equalAddressIgnoringBalAttributes(&a, &ac.curAddr) {
				// We are connected to a valid address, so do nothing but
				// update the addresses.
				ac.mu.Unlock()
				return
			}
		}
	}

	// We are either connected to the wrong address or currently connecting.
	// Stop the current iteration and restart.

	ac.cancel()
	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

	// We have to defer here because GracefulClose => onClose, which requires
	// locking ac.mu.
	if ac.transport != nil {
		defer ac.transport.GracefulClose()
		ac.transport = nil
	}

	if len(addrs) == 0 {
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// Since we were connecting/connected, we should start a new connection
	// attempt.
	//
	// ac.mu is still held at this point and is not released in this function:
	// the new goroutine takes over the lock and resetTransportAndUnlock
	// releases it.
	go ac.resetTransportAndUnlock()
}
1020
1021// getServerName determines the serverName to be used in the connection
1022// handshake. The default value for the serverName is the authority on the
1023// ClientConn, which either comes from the user's dial target or through an
1024// authority override specified using the WithAuthority dial option. Name
1025// resolvers can specify a per-address override for the serverName through the
1026// resolver.Address.ServerName field which is used only if the WithAuthority
1027// dial option was not used. The rationale is that per-address authority
1028// overrides specified by the name resolver can represent a security risk, while
1029// an override specified by the user is more dependable since they probably know
1030// what they are doing.
1031func (cc *ClientConn) getServerName(addr resolver.Address) string {
1032 if cc.dopts.authority != "" {
1033 return cc.dopts.authority
1034 }
1035 if addr.ServerName != "" {
1036 return addr.ServerName
1037 }
1038 return cc.authority
1039}
1040
1041func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
1042 if sc == nil {
1043 return MethodConfig{}
1044 }
1045 if m, ok := sc.Methods[method]; ok {
1046 return m
1047 }
1048 i := strings.LastIndex(method, "/")
1049 if m, ok := sc.Methods[method[:i+1]]; ok {
1050 return m
1051 }
1052 return sc.Methods[""]
khenaidooac637102019-01-14 15:44:34 -05001053}
1054
1055// GetMethodConfig gets the method config of the input method.
1056// If there's an exact match for input method (i.e. /service/method), we return
1057// the corresponding MethodConfig.
Abhay Kumara2ae5992025-11-10 14:02:24 +00001058// If there isn't an exact match for the input method, we look for the service's default
1059// config under the service (i.e /service/) and then for the default for all services (empty string).
1060//
1061// If there is a default MethodConfig for the service, we return it.
khenaidooac637102019-01-14 15:44:34 -05001062// Otherwise, we return an empty MethodConfig.
1063func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
1064 // TODO: Avoid the locking here.
1065 cc.mu.RLock()
1066 defer cc.mu.RUnlock()
Abhay Kumara2ae5992025-11-10 14:02:24 +00001067 return getMethodConfig(cc.sc, method)
khenaidooac637102019-01-14 15:44:34 -05001068}
1069
1070func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
1071 cc.mu.RLock()
1072 defer cc.mu.RUnlock()
Scott Baker8461e152019-10-01 14:44:30 -07001073 if cc.sc == nil {
1074 return nil
1075 }
khenaidooac637102019-01-14 15:44:34 -05001076 return cc.sc.healthCheckConfig
1077}
1078
Abhay Kumara2ae5992025-11-10 14:02:24 +00001079func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
Scott Baker8461e152019-10-01 14:44:30 -07001080 if sc == nil {
1081 // should never reach here.
Andrea Campanella3614a922021-02-25 12:40:42 +01001082 return
khenaidooac637102019-01-14 15:44:34 -05001083 }
khenaidooac637102019-01-14 15:44:34 -05001084 cc.sc = sc
Abhay Kumara2ae5992025-11-10 14:02:24 +00001085 if configSelector != nil {
1086 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
1087 }
khenaidooac637102019-01-14 15:44:34 -05001088
Scott Baker8461e152019-10-01 14:44:30 -07001089 if cc.sc.retryThrottling != nil {
khenaidooac637102019-01-14 15:44:34 -05001090 newThrottler := &retryThrottler{
Scott Baker8461e152019-10-01 14:44:30 -07001091 tokens: cc.sc.retryThrottling.MaxTokens,
1092 max: cc.sc.retryThrottling.MaxTokens,
1093 thresh: cc.sc.retryThrottling.MaxTokens / 2,
1094 ratio: cc.sc.retryThrottling.TokenRatio,
khenaidooac637102019-01-14 15:44:34 -05001095 }
1096 cc.retryThrottler.Store(newThrottler)
1097 } else {
1098 cc.retryThrottler.Store((*retryThrottler)(nil))
1099 }
khenaidooac637102019-01-14 15:44:34 -05001100}
1101
Abhay Kumara2ae5992025-11-10 14:02:24 +00001102func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
khenaidooac637102019-01-14 15:44:34 -05001103 cc.mu.RLock()
Abhay Kumara2ae5992025-11-10 14:02:24 +00001104 cc.resolverWrapper.resolveNow(o)
khenaidooac637102019-01-14 15:44:34 -05001105 cc.mu.RUnlock()
Abhay Kumara2ae5992025-11-10 14:02:24 +00001106}
1107
1108func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
1109 cc.resolverWrapper.resolveNow(o)
khenaidooac637102019-01-14 15:44:34 -05001110}
1111
1112// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1113// them to attempt another connection immediately. It also resets the backoff
1114// times used for subsequent attempts regardless of the current state.
1115//
1116// In general, this function should not be used. Typical service or network
1117// outages result in a reasonable client reconnection strategy by default.
1118// However, if a previously unavailable network becomes available, this may be
1119// used to trigger an immediate reconnect.
1120//
Abhay Kumara2ae5992025-11-10 14:02:24 +00001121// # Experimental
1122//
1123// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1124// later release.
khenaidooac637102019-01-14 15:44:34 -05001125func (cc *ClientConn) ResetConnectBackoff() {
1126 cc.mu.Lock()
Andrea Campanella3614a922021-02-25 12:40:42 +01001127 conns := cc.conns
1128 cc.mu.Unlock()
1129 for ac := range conns {
khenaidooac637102019-01-14 15:44:34 -05001130 ac.resetConnectBackoff()
1131 }
1132}
1133
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn was already closed
// (cc.conns is nil), and nil otherwise. In both cases the deferred function
// cancels the ClientConn's context and waits for the connectivity state
// pub/sub to finish before Close returns.
func (cc *ClientConn) Close() error {
	defer func() {
		cc.cancel()
		<-cc.csMgr.pubSub.Done()
	}()

	// Prevent calls to enter/exit idle immediately, and ensure we are not
	// currently entering/exiting idle mode.
	cc.idlenessMgr.Close()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}

	// Take ownership of the connection set and mark the ClientConn as closed
	// by setting cc.conns to nil.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// We can safely unlock and continue to access all fields now as
	// cc.conns==nil, preventing any further operations on cc.
	cc.mu.Unlock()

	cc.resolverWrapper.close()
	// The order of closing matters here since the balancer wrapper assumes the
	// picker is closed before it is closed.
	cc.pickerWrapper.close()
	cc.balancerWrapper.close()

	// Wait for the resolver and balancer serializers to finish before tearing
	// down the connections.
	<-cc.resolverWrapper.serializer.Done()
	<-cc.balancerWrapper.serializer.Done()
	// Tear down all addrConns concurrently and wait for every teardown to
	// complete.
	var wg sync.WaitGroup
	for ac := range conns {
		wg.Add(1)
		go func(ac *addrConn) {
			defer wg.Done()
			ac.tearDown(ErrClientConnClosing)
		}(ac)
	}
	wg.Wait()
	cc.addTraceEvent("deleted")
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from being
	// deleted right away.
	channelz.RemoveEntry(cc.channelz.ID)

	return nil
}
1184
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx and cancel are derived from the owning ClientConn's context when the
	// addrConn is created, and are replaced by updateAddrs when it restarts a
	// connection attempt.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn this addrConn belongs to.
	cc *ClientConn
	// dopts is initialised from cc.dopts when the addrConn is created.
	dopts dialOptions
	// acbw is notified of every connectivity state change via
	// updateConnectivityState.
	acbw *acBalancerWrapper
	// scopts holds the options supplied when the addrConn was created.
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// This mutex is used on the RPC path, so its usage should be minimized as
	// much as possible.
	// TODO: Find a lock-free way to retrieve the transport and state from the
	// addrConn.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// channelz is the subchannel entry registered for this addrConn; it
	// receives trace events and metrics (target, state).
	channelz *channelz.SubChannel
}
1217
1218// Note: this requires a lock on ac.mu.
Abhay Kumara2ae5992025-11-10 14:02:24 +00001219func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
Stephane Barbarie260a5632019-02-26 16:12:49 -05001220 if ac.state == s {
1221 return
1222 }
khenaidooac637102019-01-14 15:44:34 -05001223 ac.state = s
Abhay Kumara2ae5992025-11-10 14:02:24 +00001224 ac.channelz.ChannelMetrics.State.Store(&s)
1225 if lastErr == nil {
1226 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
1227 } else {
1228 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
khenaidooac637102019-01-14 15:44:34 -05001229 }
Abhay Kumara2ae5992025-11-10 14:02:24 +00001230 ac.acbw.updateState(s, ac.curAddr, lastErr)
khenaidooac637102019-01-14 15:44:34 -05001231}
1232
1233// adjustParams updates parameters used to create transports upon
1234// receiving a GoAway.
1235func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
Abhay Kumara2ae5992025-11-10 14:02:24 +00001236 if r == transport.GoAwayTooManyPings {
khenaidooac637102019-01-14 15:44:34 -05001237 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1238 ac.cc.mu.Lock()
Abhay Kumara2ae5992025-11-10 14:02:24 +00001239 if v > ac.cc.keepaliveParams.Time {
1240 ac.cc.keepaliveParams.Time = v
khenaidooac637102019-01-14 15:44:34 -05001241 }
1242 ac.cc.mu.Unlock()
1243 }
1244}
1245
// resetTransportAndUnlock unconditionally connects the addrConn.
//
// It moves the addrConn to Connecting and tries every address in ac.addrs
// once. On success the backoff index is reset. On failure it requests
// re-resolution, moves to TransientFailure, waits out the backoff (or a
// backoff reset, or cancellation of the addrConn's context), and finally moves
// to Idle if the context is still live.
//
// ac.mu must be held by the caller, and this function will guarantee it is released.
func (ac *addrConn) resetTransportAndUnlock() {
	// Capture the context current at entry; ac.ctx may be replaced later, and
	// all cancellation checks below are made against this captured value.
	acCtx := ac.ctx
	if acCtx.Err() != nil {
		ac.mu.Unlock()
		return
	}

	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	// From here on the lock is only taken for short critical sections.
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
		// to ensure one resolution request per pass instead of per subconn failure.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		ac.mu.Lock()
		if acCtx.Err() != nil {
			// addrConn was torn down.
			ac.mu.Unlock()
			return
		}
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		b := ac.resetBackoff
		ac.mu.Unlock()

		// Wait for the backoff to elapse, for the backoff-reset channel
		// captured above to fire, or for the addrConn context to be cancelled.
		// Only a fully elapsed backoff advances backoffIdx.
		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			timer.Stop()
		case <-acCtx.Done():
			timer.Stop()
			return
		}

		// Go back to Idle unless the context was cancelled in the meantime.
		ac.mu.Lock()
		if acCtx.Err() == nil {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1322
// tryAllAddrs tries to create a connection to the addresses, in order, and
// stops at the first successful one.
//
// It returns nil as soon as createTransport succeeds for an address. If ctx is
// already done before an attempt, it returns errConnClosing. If every address
// fails, it returns the error from the first failed attempt; each individual
// failure is also reported to the ClientConn via updateConnectionError. An
// empty addrs yields nil.
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
	var firstConnErr error
	for _, addr := range addrs {
		// Publish the address about to be attempted as the channelz target.
		ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
		if ctx.Err() != nil {
			return errConnClosing
		}
		ac.mu.Lock()

		// Refresh the keepalive parameters from the ClientConn before each
		// attempt (adjustParams may have changed them).
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.keepaliveParams
		ac.cc.mu.RUnlock()

		// copts is a copy, so the credentials override below does not modify
		// ac.dopts.
		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)

		err := ac.createTransport(ctx, addr, copts, connectDeadline)
		if err == nil {
			return nil
		}
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return firstConnErr
}
1360
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
//
// ctx is the context of the current connection attempt; when it is done the
// result of the dial is discarded. connectDeadline bounds only the dial itself
// (the call to transport.NewHTTP2Client). copts is passed by value, so the
// ChannelzParent assignment below does not affect the caller's copy.
//
// A nil return does not always mean that ac.transport was set: nil is also
// returned when ctx was canceled while dialing, and when the transport was
// closed (onClose ran) before it could be installed.
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	addr.ServerName = ac.cc.getServerName(addr)
	// hctx is handed to the health check below. It is canceled by onClose and
	// on dial failure, and, being derived from ctx, whenever ctx is canceled.
	hctx, hcancel := context.WithCancel(ctx)

	// onClose is handed to the transport as its close callback. It acquires
	// ac.mu itself, so it must never be invoked while ac.mu is held.
	onClose := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		// adjust params based on GoAwayReason
		ac.adjustParams(r)
		if ctx.Err() != nil {
			// Already shut down or connection attempt canceled.  tearDown() or
			// updateAddrs() already cleared the transport and canceled hctx
			// via ac.ctx, and we expected this connection to be closed, so do
			// nothing here.
			return
		}
		hcancel()
		if ac.transport == nil {
			// We're still connecting to this address, which could error.  Do
			// not update the connectivity state or resolve; these will happen
			// at the end of the tryAllAddrs connection loop in the event of an
			// error.
			return
		}
		ac.transport = nil
		// Refresh the name resolver on any connection loss.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// Always go idle and wait for the LB policy to initiate a new
		// connection attempt.
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// connectCtx limits only the dial; it is released when this function
	// returns.
	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
	defer cancel()
	copts.ChannelzParent = ac.channelz

	newTr, err := transport.NewHTTP2Client(connectCtx, ac.cc.ctx, addr, copts, onClose)
	if err != nil {
		if logger.V(2) {
			logger.Infof("Creating new client transport to %q: %v", addr, err)
		}
		// newTr is either nil, or closed.
		hcancel()
		channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
		return err
	}

	ac.mu.Lock()
	defer ac.mu.Unlock()
	if ctx.Err() != nil {
		// This can happen if the subConn was removed while in `Connecting`
		// state. tearDown() would have set the state to `Shutdown`, but
		// would not have closed the transport since ac.transport would not
		// have been set at that point.
		//
		// We run this in a goroutine because newTr.Close() calls onClose()
		// inline, which requires locking ac.mu.
		//
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		//
		// This can also happen when updateAddrs is called during a connection
		// attempt.
		go newTr.Close(transport.ErrConnClosing)
		return nil
	}
	if hctx.Err() != nil {
		// onClose was already called for this connection, but the connection
		// was successfully established first. Consider it a success and set
		// the new state to Idle.
		ac.updateConnectivityState(connectivity.Idle, nil)
		return nil
	}
	// The dial succeeded and the transport is still alive: publish it on ac.
	ac.curAddr = addr
	ac.transport = newTr
	ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
	return nil
}
1443
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// ctx is passed to the health check function and to every stream it opens
// through newStream.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// healthcheckManagingState stays false on every early return below, in
	// which case the deferred function moves ac straight to READY.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := internal.HealthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	//
	// currentTr pins the transport this health check belongs to. Both helpers
	// compare it against ac.transport under ac.mu and refuse to act once the
	// two differ.
	currentTr := ac.transport
	newStream := func(method string) (any, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		// Release the lock before creating the stream.
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	//
	// The goroutine's only follow-up is to log the error, if any, returned by
	// the health check function.
	go func() {
		err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
			}
		}
	}()
}
1516
khenaidooac637102019-01-14 15:44:34 -05001517func (ac *addrConn) resetConnectBackoff() {
1518 ac.mu.Lock()
1519 close(ac.resetBackoff)
1520 ac.backoffIdx = 0
1521 ac.resetBackoff = make(chan struct{})
1522 ac.mu.Unlock()
1523}
1524
Abhay Kumara2ae5992025-11-10 14:02:24 +00001525// getReadyTransport returns the transport if ac's state is READY or nil if not.
1526func (ac *addrConn) getReadyTransport() transport.ClientTransport {
khenaidooac637102019-01-14 15:44:34 -05001527 ac.mu.Lock()
Abhay Kumara2ae5992025-11-10 14:02:24 +00001528 defer ac.mu.Unlock()
1529 if ac.state == connectivity.Ready {
1530 return ac.transport
khenaidooac637102019-01-14 15:44:34 -05001531 }
Abhay Kumara2ae5992025-11-10 14:02:24 +00001532 return nil
khenaidooac637102019-01-14 15:44:34 -05001533}
1534
// tearDown starts to tear down the addrConn.
//
// It is a no-op if ac is already in the Shutdown state. Otherwise it moves ac
// to Shutdown, cancels ac's context, clears the current transport and address,
// removes ac's channelz entry and finally closes the transport that was
// current on entry, if there was one. err selects how that transport is
// closed: errConnDrain requests a graceful close, any other value a hard close
// with err.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	// Detach the transport while holding the lock; it is closed further down,
	// after the lock has been released.
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}

	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
		Desc:     "Subchannel deleted",
		Severity: channelz.CtInfo,
		Parent: &channelz.TraceEvent{
			Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
			Severity: channelz.CtInfo,
		},
	})
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from
	// being deleted right away.
	channelz.RemoveEntry(ac.channelz.ID)
	ac.mu.Unlock()

	// We have to release the lock before the call to GracefulClose/Close here
	// because both of them call onClose(), which requires locking ac.mu.
	if curTr != nil {
		if err == errConnDrain {
			// Close the transport gracefully when the subConn is being shutdown.
			//
			// GracefulClose() may be executed multiple times if:
			// - multiple GoAway frames are received from the server
			// - there are concurrent name resolver or balancer triggered
			//   address removal and GoAway
			curTr.GracefulClose()
		} else {
			// Hard close the transport when the channel is entering idle or is
			// being shutdown. In the case where the channel is being shutdown,
			// closing of transports is also taken care of by cancellation of cc.ctx.
			// But in the case where the channel is entering idle, we need to
			// explicitly close the transports here. Instead of distinguishing
			// between these two cases, it is simpler to close the transport
			// unconditionally here.
			curTr.Close(err)
		}
	}
}
1590
// retryThrottler is a token bucket used to decide whether retries should be
// throttled. tokens is kept within [0, max] by the methods below; ratio is the
// amount credited per successful RPC and thresh is the level at or below
// which retries are throttled.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
//
// The pool never drops below zero. A nil receiver never throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}

	rt.mu.Lock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	throttled := remaining <= rt.thresh
	rt.mu.Unlock()

	return throttled
}

// successfulRPC credits the pool with rt.ratio tokens, never letting it exceed
// rt.max. It is a no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}

	rt.mu.Lock()
	replenished := rt.tokens + rt.ratio
	if replenished > rt.max {
		replenished = rt.max
	}
	rt.tokens = replenished
	rt.mu.Unlock()
}
1627
// incrCallsStarted adds one to the CallsStarted counter in ac's channelz
// metrics and stores the current time, in Unix nanoseconds, as
// LastCallStartedTimestamp.
func (ac *addrConn) incrCallsStarted() {
	ac.channelz.ChannelMetrics.CallsStarted.Add(1)
	ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
1632
// incrCallsSucceeded adds one to the CallsSucceeded counter in ac's channelz
// metrics.
func (ac *addrConn) incrCallsSucceeded() {
	ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
1636
// incrCallsFailed adds one to the CallsFailed counter in ac's channelz
// metrics.
func (ac *addrConn) incrCallsFailed() {
	ac.channelz.ChannelMetrics.CallsFailed.Add(1)
}
1640
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
Abhay Kumara2ae5992025-11-10 14:02:24 +00001647
1648// getResolver finds the scheme in the cc's resolvers or the global registry.
1649// scheme should always be lowercase (typically by virtue of url.Parse()
1650// performing proper RFC3986 behavior).
1651func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1652 for _, rb := range cc.dopts.resolvers {
1653 if scheme == rb.Scheme() {
1654 return rb
1655 }
1656 }
1657 return resolver.Get(scheme)
1658}
1659
1660func (cc *ClientConn) updateConnectionError(err error) {
1661 cc.lceMu.Lock()
1662 cc.lastConnectionError = err
1663 cc.lceMu.Unlock()
1664}
1665
1666func (cc *ClientConn) connectionError() error {
1667 cc.lceMu.Lock()
1668 defer cc.lceMu.Unlock()
1669 return cc.lastConnectionError
1670}
1671
1672// initParsedTargetAndResolverBuilder parses the user's dial target and stores
1673// the parsed target in `cc.parsedTarget`.
1674//
1675// The resolver to use is determined based on the scheme in the parsed target
1676// and the same is stored in `cc.resolverBuilder`.
1677//
1678// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1679func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
1680 logger.Infof("original dial target is: %q", cc.target)
1681
1682 var rb resolver.Builder
1683 parsedTarget, err := parseTarget(cc.target)
1684 if err == nil {
1685 rb = cc.getResolver(parsedTarget.URL.Scheme)
1686 if rb != nil {
1687 cc.parsedTarget = parsedTarget
1688 cc.resolverBuilder = rb
1689 return nil
1690 }
1691 }
1692
1693 // We are here because the user's dial target did not contain a scheme or
1694 // specified an unregistered scheme. We should fallback to the default
1695 // scheme, except when a custom dialer is specified in which case, we should
1696 // always use passthrough scheme. For either case, we need to respect any overridden
1697 // global defaults set by the user.
1698 defScheme := cc.dopts.defaultScheme
1699 if internal.UserSetDefaultScheme {
1700 defScheme = resolver.GetDefaultScheme()
1701 }
1702
1703 canonicalTarget := defScheme + ":///" + cc.target
1704
1705 parsedTarget, err = parseTarget(canonicalTarget)
1706 if err != nil {
1707 return err
1708 }
1709 rb = cc.getResolver(parsedTarget.URL.Scheme)
1710 if rb == nil {
1711 return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
1712 }
1713 cc.parsedTarget = parsedTarget
1714 cc.resolverBuilder = rb
1715 return nil
1716}
1717
1718// parseTarget uses RFC 3986 semantics to parse the given target into a
1719// resolver.Target struct containing url. Query params are stripped from the
1720// endpoint.
1721func parseTarget(target string) (resolver.Target, error) {
1722 u, err := url.Parse(target)
1723 if err != nil {
1724 return resolver.Target{}, err
1725 }
1726
1727 return resolver.Target{URL: *u}, nil
1728}
1729
// encodeAuthority percent-encodes every byte of authority that is not allowed
// to appear literally in a URI authority component, as defined in
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
//
// ASCII letters and digits, the unreserved marks, the sub-delimiters and the
// authority delimiters pass through untouched; every other byte becomes "%XX"
// with uppercase hex digits. The input string itself is returned when nothing
// needs escaping.
func encodeAuthority(authority string) string {
	const (
		upperhex = "0123456789ABCDEF"
		// Non-alphanumeric bytes that may appear unescaped: unreserved
		// characters, sub-delimiters, and authority related delimiters.
		literalPunct = "-_.~" + "!$&'()*+,;=" + ":[]@"
	)

	// mustEscape reports whether b has to be percent-encoded.
	mustEscape := func(b byte) bool {
		switch {
		case b >= 'a' && b <= 'z', b >= 'A' && b <= 'Z', b >= '0' && b <= '9':
			// Alphanumerics are always allowed.
			return false
		}
		return strings.IndexByte(literalPunct, b) < 0
	}

	// First pass: count the escapes so the output can be sized exactly, and so
	// the common nothing-to-do case can bail out without allocating.
	escapes := 0
	for i := 0; i < len(authority); i++ {
		if mustEscape(authority[i]) {
			escapes++
		}
	}
	if escapes == 0 {
		return authority
	}

	// Second pass: each escaped byte expands from one byte to three.
	var out strings.Builder
	out.Grow(len(authority) + 2*escapes)
	for i := 0; i < len(authority); i++ {
		b := authority[i]
		if !mustEscape(b) {
			out.WriteByte(b)
			continue
		}
		out.WriteByte('%')
		out.WriteByte(upperhex[b>>4])
		out.WriteByte(upperhex[b&0x0F])
	}
	return out.String()
}
1786
1787// Determine channel authority. The order of precedence is as follows:
1788// - user specified authority override using `WithAuthority` dial option
1789// - creds' notion of server name for the authentication handshake
1790// - endpoint from dial target of the form "scheme://[authority]/endpoint"
1791//
1792// Stores the determined authority in `cc.authority`.
1793//
1794// Returns a non-nil error if the authority returned by the transport
1795// credentials do not match the authority configured through the dial option.
1796//
1797// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1798func (cc *ClientConn) initAuthority() error {
1799 dopts := cc.dopts
1800 // Historically, we had two options for users to specify the serverName or
1801 // authority for a channel. One was through the transport credentials
1802 // (either in its constructor, or through the OverrideServerName() method).
1803 // The other option (for cases where WithInsecure() dial option was used)
1804 // was to use the WithAuthority() dial option.
1805 //
1806 // A few things have changed since:
1807 // - `insecure` package with an implementation of the `TransportCredentials`
1808 // interface for the insecure case
1809 // - WithAuthority() dial option support for secure credentials
1810 authorityFromCreds := ""
1811 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1812 authorityFromCreds = creds.Info().ServerName
1813 }
1814 authorityFromDialOption := dopts.authority
1815 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1816 return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1817 }
1818
1819 endpoint := cc.parsedTarget.Endpoint()
1820 if authorityFromDialOption != "" {
1821 cc.authority = authorityFromDialOption
1822 } else if authorityFromCreds != "" {
1823 cc.authority = authorityFromCreds
1824 } else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
1825 cc.authority = auth.OverrideAuthority(cc.parsedTarget)
1826 } else if strings.HasPrefix(endpoint, ":") {
1827 cc.authority = "localhost" + encodeAuthority(endpoint)
1828 } else {
1829 cc.authority = encodeAuthority(endpoint)
1830 }
1831 return nil
1832}