blob: c0c2c9a76abfae9c42ffce1c724d8395392eb514 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
Abhay Kumara61c5222025-11-10 07:32:50 +000026 "net/url"
27 "slices"
William Kurkianea869482019-04-09 15:16:11 -040028 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
Devmalya Pauldd23a992019-11-14 07:06:31 +000034 "google.golang.org/grpc/balancer/base"
Abhay Kumara61c5222025-11-10 07:32:50 +000035 "google.golang.org/grpc/balancer/pickfirst"
William Kurkianea869482019-04-09 15:16:11 -040036 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/connectivity"
Abhay Kumara61c5222025-11-10 07:32:50 +000038 "google.golang.org/grpc/internal"
William Kurkianea869482019-04-09 15:16:11 -040039 "google.golang.org/grpc/internal/channelz"
William Kurkianea869482019-04-09 15:16:11 -040040 "google.golang.org/grpc/internal/grpcsync"
Abhay Kumara61c5222025-11-10 07:32:50 +000041 "google.golang.org/grpc/internal/idle"
42 iresolver "google.golang.org/grpc/internal/resolver"
bseeniva0b9cbcb2026-02-12 19:11:11 +053043 istats "google.golang.org/grpc/internal/stats"
William Kurkianea869482019-04-09 15:16:11 -040044 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/keepalive"
William Kurkianea869482019-04-09 15:16:11 -040046 "google.golang.org/grpc/resolver"
Abhilash S.L3b494632019-07-16 15:51:09 +053047 "google.golang.org/grpc/serviceconfig"
bseeniva0b9cbcb2026-02-12 19:11:11 +053048 "google.golang.org/grpc/stats"
William Kurkianea869482019-04-09 15:16:11 -040049 "google.golang.org/grpc/status"
Devmalya Pauldd23a992019-11-14 07:06:31 +000050
51 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
Devmalya Pauldd23a992019-11-14 07:06:31 +000052 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
Abhay Kumara61c5222025-11-10 07:32:50 +000053 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
54 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
William Kurkianea869482019-04-09 15:16:11 -040055)
56
// minConnectTimeout is the minimum amount of time a single connection attempt
// is given to complete.
const minConnectTimeout = 20 * time.Second
61
62var (
63 // ErrClientConnClosing indicates that the operation is illegal because
64 // the ClientConn is closing.
65 //
66 // Deprecated: this error should not be relied upon by users; use the status
67 // code of Canceled instead.
68 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
69 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
70 errConnDrain = errors.New("grpc: the connection is drained")
71 // errConnClosing indicates that the connection is closing.
72 errConnClosing = errors.New("grpc: the connection is closing")
Abhay Kumara61c5222025-11-10 07:32:50 +000073 // errConnIdling indicates the connection is being closed as the channel
74 // is moving to an idle mode due to inactivity.
75 errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
Abhilash S.L3b494632019-07-16 15:51:09 +053076 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
77 // service config.
78 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
Abhay Kumara61c5222025-11-10 07:32:50 +000079 // PickFirstBalancerName is the name of the pick_first balancer.
80 PickFirstBalancerName = pickfirst.Name
William Kurkianea869482019-04-09 15:16:11 -040081)
82
// The following errors describe an invalid credentials configuration. They are
// returned by NewClient (and therefore also by Dial and DialContext) through
// validateTransportCredentials.
var (
	// errNoTransportSecurity indicates that no transport security has been
	// configured for the ClientConn. Users should either set transport
	// credentials (or a credentials bundle), or explicitly opt out of security
	// with grpc.WithTransportCredentials(insecure.NewCredentials()).
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a credentials bundle is used
	// together with individual transport credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errNoTransportCredsInBundle indicates that the configured credentials
	// bundle returned nil transport credentials.
	errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
	// errTransportCredentialsMissing indicates that per-RPC credentials which
	// require a secure connection (e.g., an OAuth2 token) were configured
	// together with insecure transport credentials.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
100
const (
	// defaultClientMaxReceiveMessageSize is the default limit on the size of a
	// message the client will receive (1024 * 1024 * 4, i.e. 4 MiB).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default limit on the size of a
	// message the client will send; it is set to math.MaxInt32.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the read-side counterpart of defaultWriteBufSize
	// (presumably the buffer size for receiving frames; confirm in transport).
	defaultReadBufSize = 32 * 1024
)
108
Abhay Kumara61c5222025-11-10 07:32:50 +0000109type defaultConfigSelector struct {
110 sc *ServiceConfig
William Kurkianea869482019-04-09 15:16:11 -0400111}
112
Abhay Kumara61c5222025-11-10 07:32:50 +0000113func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
114 return &iresolver.RPCConfig{
115 Context: rpcInfo.Context,
116 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
117 }, nil
118}
119
120// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
121// is performed. Use of the ClientConn for RPCs will automatically cause it to
122// connect. The Connect method may be called to manually create a connection,
123// but for most users this should be unnecessary.
William Kurkianea869482019-04-09 15:16:11 -0400124//
125// The target name syntax is defined in
Abhay Kumara61c5222025-11-10 07:32:50 +0000126// https://github.com/grpc/grpc/blob/master/doc/naming.md. E.g. to use the dns
127// name resolver, a "dns:///" prefix may be applied to the target. The default
128// name resolver will be used if no scheme is detected, or if the parsed scheme
129// is not a registered name resolver. The default resolver is "dns" but can be
130// overridden using the resolver package's SetDefaultScheme.
131//
132// Examples:
133//
134// - "foo.googleapis.com:8080"
135// - "dns:///foo.googleapis.com:8080"
136// - "dns:///foo.googleapis.com"
137// - "dns:///10.0.0.213:8080"
138// - "dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443"
139// - "dns://8.8.8.8/foo.googleapis.com:8080"
140// - "dns://8.8.8.8/foo.googleapis.com"
141// - "zookeeper://zk.example.com:9900/example_service"
142//
143// The DialOptions returned by WithBlock, WithTimeout,
144// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
145// function.
146func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
William Kurkianea869482019-04-09 15:16:11 -0400147 cc := &ClientConn{
Abhay Kumara61c5222025-11-10 07:32:50 +0000148 target: target,
149 conns: make(map[*addrConn]struct{}),
150 dopts: defaultDialOptions(),
William Kurkianea869482019-04-09 15:16:11 -0400151 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000152
William Kurkianea869482019-04-09 15:16:11 -0400153 cc.retryThrottler.Store((*retryThrottler)(nil))
Abhay Kumara61c5222025-11-10 07:32:50 +0000154 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
William Kurkianea869482019-04-09 15:16:11 -0400155 cc.ctx, cc.cancel = context.WithCancel(context.Background())
156
Abhay Kumara61c5222025-11-10 07:32:50 +0000157 // Apply dial options.
158 disableGlobalOpts := false
159 for _, opt := range opts {
160 if _, ok := opt.(*disableGlobalDialOptions); ok {
161 disableGlobalOpts = true
162 break
163 }
164 }
165
166 if !disableGlobalOpts {
167 for _, opt := range globalDialOptions {
168 opt.apply(&cc.dopts)
169 }
170 }
171
William Kurkianea869482019-04-09 15:16:11 -0400172 for _, opt := range opts {
173 opt.apply(&cc.dopts)
174 }
175
Abhay Kumara61c5222025-11-10 07:32:50 +0000176 // Determine the resolver to use.
177 if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
178 return nil, err
179 }
180
181 for _, opt := range globalPerTargetDialOptions {
182 opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
183 }
184
Abhilash S.L3b494632019-07-16 15:51:09 +0530185 chainUnaryClientInterceptors(cc)
186 chainStreamClientInterceptors(cc)
187
Abhay Kumara61c5222025-11-10 07:32:50 +0000188 if err := cc.validateTransportCredentials(); err != nil {
189 return nil, err
190 }
191
192 if cc.dopts.defaultServiceConfigRawJSON != nil {
193 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
194 if scpr.Err != nil {
195 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
196 }
197 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
198 }
199 cc.keepaliveParams = cc.dopts.copts.KeepaliveParams
200
201 if err = cc.initAuthority(); err != nil {
202 return nil, err
203 }
204
205 // Register ClientConn with channelz. Note that this is only done after
206 // channel creation cannot fail.
207 cc.channelzRegistration(target)
208 channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
209 channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
210
211 cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
212 cc.pickerWrapper = newPickerWrapper()
213
bseeniva0b9cbcb2026-02-12 19:11:11 +0530214 cc.metricsRecorderList = istats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
215 cc.statsHandler = istats.NewCombinedHandler(cc.dopts.copts.StatsHandlers...)
Abhay Kumara61c5222025-11-10 07:32:50 +0000216
217 cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
218 cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
219
220 return cc, nil
221}
222
223// Dial calls DialContext(context.Background(), target, opts...).
224//
225// Deprecated: use NewClient instead. Will be supported throughout 1.x.
226func Dial(target string, opts ...DialOption) (*ClientConn, error) {
227 return DialContext(context.Background(), target, opts...)
228}
229
230// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
231// used, it calls Connect and WaitForStateChange until either the context
232// expires or the state of the ClientConn is Ready.
233//
234// One subtle difference between NewClient and Dial and DialContext is that the
235// former uses "dns" as the default name resolver, while the latter use
236// "passthrough" for backward compatibility. This distinction should not matter
237// to most users, but could matter to legacy users that specify a custom dialer
238// and expect it to receive the target string directly.
239//
240// Deprecated: use NewClient instead. Will be supported throughout 1.x.
241func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
242 // At the end of this method, we kick the channel out of idle, rather than
243 // waiting for the first rpc.
244 //
245 // WithLocalDNSResolution dial option in `grpc.Dial` ensures that it
246 // preserves behavior: when default scheme passthrough is used, skip
247 // hostname resolution, when "dns" is used for resolution, perform
248 // resolution on the client.
249 opts = append([]DialOption{withDefaultScheme("passthrough"), WithLocalDNSResolution()}, opts...)
250 cc, err := NewClient(target, opts...)
251 if err != nil {
252 return nil, err
253 }
254
255 // We start the channel off in idle mode, but kick it out of idle now,
256 // instead of waiting for the first RPC. This is the legacy behavior of
257 // Dial.
Abhilash S.L3b494632019-07-16 15:51:09 +0530258 defer func() {
259 if err != nil {
260 cc.Close()
261 }
262 }()
263
Abhay Kumara61c5222025-11-10 07:32:50 +0000264 // This creates the name resolver, load balancer, etc.
265 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
266 return nil, err
William Kurkianea869482019-04-09 15:16:11 -0400267 }
268
Abhay Kumara61c5222025-11-10 07:32:50 +0000269 // Return now for non-blocking dials.
270 if !cc.dopts.block {
271 return cc, nil
William Kurkianea869482019-04-09 15:16:11 -0400272 }
273
274 if cc.dopts.timeout > 0 {
275 var cancel context.CancelFunc
276 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
277 defer cancel()
278 }
William Kurkianea869482019-04-09 15:16:11 -0400279 defer func() {
280 select {
281 case <-ctx.Done():
Abhay Kumara61c5222025-11-10 07:32:50 +0000282 switch {
283 case ctx.Err() == err:
284 conn = nil
285 case err == nil || !cc.dopts.returnLastError:
286 conn, err = nil, ctx.Err()
287 default:
288 conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
289 }
William Kurkianea869482019-04-09 15:16:11 -0400290 default:
291 }
William Kurkianea869482019-04-09 15:16:11 -0400292 }()
293
Abhay Kumara61c5222025-11-10 07:32:50 +0000294 // A blocking dial blocks until the clientConn is ready.
295 for {
296 s := cc.GetState()
297 if s == connectivity.Idle {
298 cc.Connect()
William Kurkianea869482019-04-09 15:16:11 -0400299 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000300 if s == connectivity.Ready {
301 return cc, nil
302 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
303 if err = cc.connectionError(); err != nil {
304 terr, ok := err.(interface {
305 Temporary() bool
306 })
307 if ok && !terr.Temporary() {
308 return nil, err
309 }
William Kurkianea869482019-04-09 15:16:11 -0400310 }
William Kurkianea869482019-04-09 15:16:11 -0400311 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000312 if !cc.WaitForStateChange(ctx, s) {
313 // ctx got timeout or canceled.
314 if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
315 return nil, err
William Kurkianea869482019-04-09 15:16:11 -0400316 }
William Kurkianea869482019-04-09 15:16:11 -0400317 return nil, ctx.Err()
318 }
319 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000320}
William Kurkianea869482019-04-09 15:16:11 -0400321
Abhay Kumara61c5222025-11-10 07:32:50 +0000322// addTraceEvent is a helper method to add a trace event on the channel. If the
323// channel is a nested one, the same event is also added on the parent channel.
324func (cc *ClientConn) addTraceEvent(msg string) {
325 ted := &channelz.TraceEvent{
326 Desc: fmt.Sprintf("Channel %s", msg),
327 Severity: channelz.CtInfo,
William Kurkianea869482019-04-09 15:16:11 -0400328 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000329 if cc.dopts.channelzParent != nil {
330 ted.Parent = &channelz.TraceEvent{
331 Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
332 Severity: channelz.CtInfo,
333 }
William Kurkianea869482019-04-09 15:16:11 -0400334 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000335 channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
336}
William Kurkianea869482019-04-09 15:16:11 -0400337
Abhay Kumara61c5222025-11-10 07:32:50 +0000338type idler ClientConn
William Kurkianea869482019-04-09 15:16:11 -0400339
Abhay Kumara61c5222025-11-10 07:32:50 +0000340func (i *idler) EnterIdleMode() {
341 (*ClientConn)(i).enterIdleMode()
342}
343
344func (i *idler) ExitIdleMode() error {
345 return (*ClientConn)(i).exitIdleMode()
346}
347
348// exitIdleMode moves the channel out of idle mode by recreating the name
349// resolver and load balancer. This should never be called directly; use
350// cc.idlenessMgr.ExitIdleMode instead.
351func (cc *ClientConn) exitIdleMode() (err error) {
William Kurkianea869482019-04-09 15:16:11 -0400352 cc.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000353 if cc.conns == nil {
354 cc.mu.Unlock()
355 return errConnClosing
356 }
William Kurkianea869482019-04-09 15:16:11 -0400357 cc.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000358
359 // This needs to be called without cc.mu because this builds a new resolver
360 // which might update state or report error inline, which would then need to
361 // acquire cc.mu.
362 if err := cc.resolverWrapper.start(); err != nil {
363 return err
364 }
365
366 cc.addTraceEvent("exiting idle mode")
367 return nil
368}
369
370// initIdleStateLocked initializes common state to how it should be while idle.
371func (cc *ClientConn) initIdleStateLocked() {
372 cc.resolverWrapper = newCCResolverWrapper(cc)
373 cc.balancerWrapper = newCCBalancerWrapper(cc)
374 cc.firstResolveEvent = grpcsync.NewEvent()
375 // cc.conns == nil is a proxy for the ClientConn being closed. So, instead
376 // of setting it to nil here, we recreate the map. This also means that we
377 // don't have to do this when exiting idle mode.
378 cc.conns = make(map[*addrConn]struct{})
379}
380
381// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
382// name resolver, load balancer, and any subchannels. This should never be
383// called directly; use cc.idlenessMgr.EnterIdleMode instead.
384func (cc *ClientConn) enterIdleMode() {
385 cc.mu.Lock()
386
387 if cc.conns == nil {
388 cc.mu.Unlock()
389 return
390 }
391
392 conns := cc.conns
393
394 rWrapper := cc.resolverWrapper
395 rWrapper.close()
396 cc.pickerWrapper.reset()
397 bWrapper := cc.balancerWrapper
398 bWrapper.close()
399 cc.csMgr.updateState(connectivity.Idle)
400 cc.addTraceEvent("entering idle mode")
401
402 cc.initIdleStateLocked()
403
404 cc.mu.Unlock()
405
406 // Block until the name resolver and LB policy are closed.
407 <-rWrapper.serializer.Done()
408 <-bWrapper.serializer.Done()
409
410 // Close all subchannels after the LB policy is closed.
411 for ac := range conns {
412 ac.tearDown(errConnIdling)
413 }
414}
415
416// validateTransportCredentials performs a series of checks on the configured
417// transport credentials. It returns a non-nil error if any of these conditions
418// are met:
419// - no transport creds and no creds bundle is configured
420// - both transport creds and creds bundle are configured
421// - creds bundle is configured, but it lacks a transport credentials
422// - insecure transport creds configured alongside call creds that require
423// transport level security
424//
425// If none of the above conditions are met, the configured credentials are
426// deemed valid and a nil error is returned.
427func (cc *ClientConn) validateTransportCredentials() error {
428 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
429 return errNoTransportSecurity
430 }
431 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
432 return errTransportCredsAndBundle
433 }
434 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
435 return errNoTransportCredsInBundle
436 }
437 transportCreds := cc.dopts.copts.TransportCredentials
438 if transportCreds == nil {
439 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
440 }
441 if transportCreds.Info().SecurityProtocol == "insecure" {
442 for _, cd := range cc.dopts.copts.PerRPCCredentials {
443 if cd.RequireTransportSecurity() {
444 return errTransportCredentialsMissing
William Kurkianea869482019-04-09 15:16:11 -0400445 }
446 }
447 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000448 return nil
449}
William Kurkianea869482019-04-09 15:16:11 -0400450
Abhay Kumara61c5222025-11-10 07:32:50 +0000451// channelzRegistration registers the newly created ClientConn with channelz and
452// stores the returned identifier in `cc.channelz`. A channelz trace event is
453// emitted for ClientConn creation. If the newly created ClientConn is a nested
454// one, i.e a valid parent ClientConn ID is specified via a dial option, the
455// trace event is also added to the parent.
456//
457// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
458func (cc *ClientConn) channelzRegistration(target string) {
459 parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
460 cc.channelz = channelz.RegisterChannel(parentChannel, target)
461 cc.addTraceEvent(fmt.Sprintf("created for target %q", target))
William Kurkianea869482019-04-09 15:16:11 -0400462}
463
Abhilash S.L3b494632019-07-16 15:51:09 +0530464// chainUnaryClientInterceptors chains all unary client interceptors into one.
465func chainUnaryClientInterceptors(cc *ClientConn) {
466 interceptors := cc.dopts.chainUnaryInts
467 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
468 // be executed before any other chained interceptors.
469 if cc.dopts.unaryInt != nil {
470 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
471 }
472 var chainedInt UnaryClientInterceptor
473 if len(interceptors) == 0 {
474 chainedInt = nil
475 } else if len(interceptors) == 1 {
476 chainedInt = interceptors[0]
477 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +0000478 chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
Abhilash S.L3b494632019-07-16 15:51:09 +0530479 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
480 }
481 }
482 cc.dopts.unaryInt = chainedInt
483}
484
485// getChainUnaryInvoker recursively generate the chained unary invoker.
486func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
487 if curr == len(interceptors)-1 {
488 return finalInvoker
489 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000490 return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
Abhilash S.L3b494632019-07-16 15:51:09 +0530491 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
492 }
493}
494
495// chainStreamClientInterceptors chains all stream client interceptors into one.
496func chainStreamClientInterceptors(cc *ClientConn) {
497 interceptors := cc.dopts.chainStreamInts
498 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
499 // be executed before any other chained interceptors.
500 if cc.dopts.streamInt != nil {
501 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
502 }
503 var chainedInt StreamClientInterceptor
504 if len(interceptors) == 0 {
505 chainedInt = nil
506 } else if len(interceptors) == 1 {
507 chainedInt = interceptors[0]
508 } else {
509 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
510 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
511 }
512 }
513 cc.dopts.streamInt = chainedInt
514}
515
516// getChainStreamer recursively generate the chained client stream constructor.
517func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
518 if curr == len(interceptors)-1 {
519 return finalStreamer
520 }
521 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
522 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
523 }
524}
525
Abhay Kumara61c5222025-11-10 07:32:50 +0000526// newConnectivityStateManager creates an connectivityStateManager with
527// the specified channel.
528func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
529 return &connectivityStateManager{
530 channelz: channel,
531 pubSub: grpcsync.NewPubSub(ctx),
532 }
533}
534
William Kurkianea869482019-04-09 15:16:11 -0400535// connectivityStateManager keeps the connectivity.State of ClientConn.
536// This struct will eventually be exported so the balancers can access it.
Abhay Kumara61c5222025-11-10 07:32:50 +0000537//
538// TODO: If possible, get rid of the `connectivityStateManager` type, and
539// provide this functionality using the `PubSub`, to avoid keeping track of
540// the connectivity state at two places.
William Kurkianea869482019-04-09 15:16:11 -0400541type connectivityStateManager struct {
542 mu sync.Mutex
543 state connectivity.State
544 notifyChan chan struct{}
Abhay Kumara61c5222025-11-10 07:32:50 +0000545 channelz *channelz.Channel
546 pubSub *grpcsync.PubSub
William Kurkianea869482019-04-09 15:16:11 -0400547}
548
549// updateState updates the connectivity.State of ClientConn.
550// If there's a change it notifies goroutines waiting on state change to
551// happen.
552func (csm *connectivityStateManager) updateState(state connectivity.State) {
553 csm.mu.Lock()
554 defer csm.mu.Unlock()
555 if csm.state == connectivity.Shutdown {
556 return
557 }
558 if csm.state == state {
559 return
560 }
561 csm.state = state
Abhay Kumara61c5222025-11-10 07:32:50 +0000562 csm.channelz.ChannelMetrics.State.Store(&state)
563 csm.pubSub.Publish(state)
564
565 channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
William Kurkianea869482019-04-09 15:16:11 -0400566 if csm.notifyChan != nil {
567 // There are other goroutines waiting on this channel.
568 close(csm.notifyChan)
569 csm.notifyChan = nil
570 }
571}
572
573func (csm *connectivityStateManager) getState() connectivity.State {
574 csm.mu.Lock()
575 defer csm.mu.Unlock()
576 return csm.state
577}
578
579func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
580 csm.mu.Lock()
581 defer csm.mu.Unlock()
582 if csm.notifyChan == nil {
583 csm.notifyChan = make(chan struct{})
584 }
585 return csm.notifyChan
586}
587
Abhay Kumara61c5222025-11-10 07:32:50 +0000588// ClientConnInterface defines the functions clients need to perform unary and
589// streaming RPCs. It is implemented by *ClientConn, and is only intended to
590// be referenced by generated code.
591type ClientConnInterface interface {
592 // Invoke performs a unary RPC and returns after the response is received
593 // into reply.
594 Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
595 // NewStream begins a streaming RPC.
596 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
597}
598
599// Assert *ClientConn implements ClientConnInterface.
600var _ ClientConnInterface = (*ClientConn)(nil)
601
Devmalya Pauldd23a992019-11-14 07:06:31 +0000602// ClientConn represents a virtual connection to a conceptual endpoint, to
603// perform RPCs.
604//
605// A ClientConn is free to have zero or more actual connections to the endpoint
606// based on configuration, load, etc. It is also free to determine which actual
607// endpoints to use and may change it every RPC, permitting client-side load
608// balancing.
609//
610// A ClientConn encapsulates a range of functionality including name
611// resolution, TCP connection establishment (with retries and backoff) and TLS
612// handshakes. It also handles errors on established connections by
613// re-resolving the name and reconnecting.
William Kurkianea869482019-04-09 15:16:11 -0400614type ClientConn struct {
Abhay Kumara61c5222025-11-10 07:32:50 +0000615 ctx context.Context // Initialized using the background context at dial time.
616 cancel context.CancelFunc // Cancelled on close.
William Kurkianea869482019-04-09 15:16:11 -0400617
Abhay Kumara61c5222025-11-10 07:32:50 +0000618 // The following are initialized at dial time, and are read-only after that.
619 target string // User's dial target.
620 parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
621 authority string // See initAuthority().
622 dopts dialOptions // Default and user specified dial options.
623 channelz *channelz.Channel // Channelz object.
624 resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
625 idlenessMgr *idle.Manager
bseeniva0b9cbcb2026-02-12 19:11:11 +0530626 metricsRecorderList *istats.MetricsRecorderList
627 statsHandler stats.Handler
William Kurkianea869482019-04-09 15:16:11 -0400628
Abhay Kumara61c5222025-11-10 07:32:50 +0000629 // The following provide their own synchronization, and therefore don't
630 // require cc.mu to be held to access them.
631 csMgr *connectivityStateManager
632 pickerWrapper *pickerWrapper
633 safeConfigSelector iresolver.SafeConfigSelector
634 retryThrottler atomic.Value // Updated from service config.
William Kurkianea869482019-04-09 15:16:11 -0400635
Abhay Kumara61c5222025-11-10 07:32:50 +0000636 // mu protects the following fields.
637 // TODO: split mu so the same mutex isn't used for everything.
William Kurkianea869482019-04-09 15:16:11 -0400638 mu sync.RWMutex
Abhay Kumara61c5222025-11-10 07:32:50 +0000639 resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
640 balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
641 sc *ServiceConfig // Latest service config received from the resolver.
642 conns map[*addrConn]struct{} // Set to nil on close.
643 keepaliveParams keepalive.ClientParameters // May be updated upon receipt of a GoAway.
644 // firstResolveEvent is used to track whether the name resolver sent us at
645 // least one update. RPCs block on this event. May be accessed without mu
646 // if we know we cannot be asked to enter idle mode while accessing it (e.g.
647 // when the idle manager has already been closed, or if we are already
648 // entering idle mode).
William Kurkianea869482019-04-09 15:16:11 -0400649 firstResolveEvent *grpcsync.Event
650
Abhay Kumara61c5222025-11-10 07:32:50 +0000651 lceMu sync.Mutex // protects lastConnectionError
652 lastConnectionError error
William Kurkianea869482019-04-09 15:16:11 -0400653}
654
655// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
656// ctx expires. A true value is returned in former case and false in latter.
William Kurkianea869482019-04-09 15:16:11 -0400657func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
658 ch := cc.csMgr.getNotifyChan()
659 if cc.csMgr.getState() != sourceState {
660 return true
661 }
662 select {
663 case <-ctx.Done():
664 return false
665 case <-ch:
666 return true
667 }
668}
669
670// GetState returns the connectivity.State of ClientConn.
William Kurkianea869482019-04-09 15:16:11 -0400671func (cc *ClientConn) GetState() connectivity.State {
672 return cc.csMgr.getState()
673}
674
Abhay Kumara61c5222025-11-10 07:32:50 +0000675// Connect causes all subchannels in the ClientConn to attempt to connect if
676// the channel is idle. Does not wait for the connection attempts to begin
677// before returning.
678//
679// # Experimental
680//
681// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
682// release.
683func (cc *ClientConn) Connect() {
684 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
685 cc.addTraceEvent(err.Error())
686 return
William Kurkianea869482019-04-09 15:16:11 -0400687 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000688 // If the ClientConn was not in idle mode, we need to call ExitIdle on the
689 // LB policy so that connections can be created.
690 cc.mu.Lock()
691 cc.balancerWrapper.exitIdle()
692 cc.mu.Unlock()
William Kurkianea869482019-04-09 15:16:11 -0400693}
694
Abhay Kumara61c5222025-11-10 07:32:50 +0000695// waitForResolvedAddrs blocks until the resolver provides addresses or the
696// context expires, whichever happens first.
697//
698// Error is nil unless the context expires first; otherwise returns a status
699// error based on the context.
700//
701// The returned boolean indicates whether it did block or not. If the
702// resolution has already happened once before, it returns false without
703// blocking. Otherwise, it wait for the resolution and return true if
704// resolution has succeeded or return false along with error if resolution has
705// failed.
706func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) (bool, error) {
William Kurkianea869482019-04-09 15:16:11 -0400707 // This is on the RPC path, so we use a fast path to avoid the
708 // more-expensive "select" below after the resolver has returned once.
709 if cc.firstResolveEvent.HasFired() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000710 return false, nil
William Kurkianea869482019-04-09 15:16:11 -0400711 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000712 internal.NewStreamWaitingForResolver()
William Kurkianea869482019-04-09 15:16:11 -0400713 select {
714 case <-cc.firstResolveEvent.Done():
Abhay Kumara61c5222025-11-10 07:32:50 +0000715 return true, nil
William Kurkianea869482019-04-09 15:16:11 -0400716 case <-ctx.Done():
Abhay Kumara61c5222025-11-10 07:32:50 +0000717 return false, status.FromContextError(ctx.Err()).Err()
William Kurkianea869482019-04-09 15:16:11 -0400718 case <-cc.ctx.Done():
Abhay Kumara61c5222025-11-10 07:32:50 +0000719 return false, ErrClientConnClosing
William Kurkianea869482019-04-09 15:16:11 -0400720 }
721}
722
Devmalya Pauldd23a992019-11-14 07:06:31 +0000723var emptyServiceConfig *ServiceConfig
724
725func init() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000726 cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000727 if cfg.Err != nil {
728 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
729 }
730 emptyServiceConfig = cfg.Config.(*ServiceConfig)
Abhay Kumara61c5222025-11-10 07:32:50 +0000731
732 internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
733 return cc.csMgr.pubSub.Subscribe(s)
734 }
735 internal.EnterIdleModeForTesting = func(cc *ClientConn) {
736 cc.idlenessMgr.EnterIdleModeForTesting()
737 }
738 internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
739 return cc.idlenessMgr.ExitIdleMode()
740 }
Devmalya Pauldd23a992019-11-14 07:06:31 +0000741}
742
Abhay Kumara61c5222025-11-10 07:32:50 +0000743func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000744 if cc.sc != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000745 cc.applyServiceConfigAndBalancer(cc.sc, nil)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000746 return
747 }
748 if cc.dopts.defaultServiceConfig != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000749 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000750 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +0000751 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000752 }
753}
754
Abhay Kumara61c5222025-11-10 07:32:50 +0000755func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000756 defer cc.firstResolveEvent.Fire()
Abhilash S.L3b494632019-07-16 15:51:09 +0530757 // Check if the ClientConn is already closed. Some fields (e.g.
758 // balancerWrapper) are set to nil when closing the ClientConn, and could
759 // cause nil pointer panic if we don't have this check.
William Kurkianea869482019-04-09 15:16:11 -0400760 if cc.conns == nil {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000761 cc.mu.Unlock()
Abhilash S.L3b494632019-07-16 15:51:09 +0530762 return nil
William Kurkianea869482019-04-09 15:16:11 -0400763 }
764
Devmalya Pauldd23a992019-11-14 07:06:31 +0000765 if err != nil {
766 // May need to apply the initial service config in case the resolver
767 // doesn't support service configs, or doesn't provide a service config
768 // with the new addresses.
Abhay Kumara61c5222025-11-10 07:32:50 +0000769 cc.maybeApplyDefaultServiceConfig()
Devmalya Pauldd23a992019-11-14 07:06:31 +0000770
Abhay Kumara61c5222025-11-10 07:32:50 +0000771 cc.balancerWrapper.resolverError(err)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000772
773 // No addresses are valid with err set; return early.
774 cc.mu.Unlock()
775 return balancer.ErrBadResolverState
776 }
777
778 var ret error
Abhay Kumara61c5222025-11-10 07:32:50 +0000779 if cc.dopts.disableServiceConfig {
780 channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
781 cc.maybeApplyDefaultServiceConfig()
782 } else if s.ServiceConfig == nil {
783 cc.maybeApplyDefaultServiceConfig()
Devmalya Pauldd23a992019-11-14 07:06:31 +0000784 // TODO: do we need to apply a failing LB policy if there is no
785 // default, per the error handling design?
786 } else {
787 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
Abhay Kumara61c5222025-11-10 07:32:50 +0000788 configSelector := iresolver.GetConfigSelector(s)
789 if configSelector != nil {
790 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
791 channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
792 }
793 } else {
794 configSelector = &defaultConfigSelector{sc}
795 }
796 cc.applyServiceConfigAndBalancer(sc, configSelector)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000797 } else {
798 ret = balancer.ErrBadResolverState
Abhay Kumara61c5222025-11-10 07:32:50 +0000799 if cc.sc == nil {
800 // Apply the failing LB only if we haven't received valid service config
801 // from the name resolver in the past.
802 cc.applyFailingLBLocked(s.ServiceConfig)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000803 cc.mu.Unlock()
804 return ret
805 }
806 }
William Kurkianea869482019-04-09 15:16:11 -0400807 }
808
Abhay Kumara61c5222025-11-10 07:32:50 +0000809 balCfg := cc.sc.lbConfig
Devmalya Pauldd23a992019-11-14 07:06:31 +0000810 bw := cc.balancerWrapper
811 cc.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000812
Devmalya Pauldd23a992019-11-14 07:06:31 +0000813 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
814 if ret == nil {
815 ret = uccsErr // prefer ErrBadResolver state since any other error is
816 // currently meaningless to the caller.
817 }
818 return ret
William Kurkianea869482019-04-09 15:16:11 -0400819}
820
Abhay Kumara61c5222025-11-10 07:32:50 +0000821// applyFailingLBLocked is akin to configuring an LB policy on the channel which
822// always fails RPCs. Here, an actual LB policy is not configured, but an always
823// erroring picker is configured, which returns errors with information about
824// what was invalid in the received service config. A config selector with no
825// service config is configured, and the connectivity state of the channel is
826// set to TransientFailure.
827func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
828 var err error
829 if sc.Err != nil {
830 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
831 } else {
832 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
William Kurkianea869482019-04-09 15:16:11 -0400833 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000834 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
835 cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
836 cc.csMgr.updateState(connectivity.TransientFailure)
William Kurkianea869482019-04-09 15:16:11 -0400837}
838
Abhay Kumara61c5222025-11-10 07:32:50 +0000839// Makes a copy of the input addresses slice. Addresses are passed during
840// subconn creation and address update operations.
841func copyAddresses(in []resolver.Address) []resolver.Address {
842 out := make([]resolver.Address, len(in))
843 copy(out, in)
844 return out
William Kurkianea869482019-04-09 15:16:11 -0400845}
846
Abhay Kumara61c5222025-11-10 07:32:50 +0000847// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
William Kurkianea869482019-04-09 15:16:11 -0400848//
849// Caller needs to make sure len(addrs) > 0.
Abhay Kumara61c5222025-11-10 07:32:50 +0000850func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
851 if cc.conns == nil {
852 return nil, ErrClientConnClosing
853 }
854
William Kurkianea869482019-04-09 15:16:11 -0400855 ac := &addrConn{
Abhay Kumara61c5222025-11-10 07:32:50 +0000856 state: connectivity.Idle,
William Kurkianea869482019-04-09 15:16:11 -0400857 cc: cc,
Abhay Kumara61c5222025-11-10 07:32:50 +0000858 addrs: copyAddresses(addrs),
William Kurkianea869482019-04-09 15:16:11 -0400859 scopts: opts,
860 dopts: cc.dopts,
Abhay Kumara61c5222025-11-10 07:32:50 +0000861 channelz: channelz.RegisterSubChannel(cc.channelz, ""),
William Kurkianea869482019-04-09 15:16:11 -0400862 resetBackoff: make(chan struct{}),
863 }
864 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
Abhay Kumara61c5222025-11-10 07:32:50 +0000865 // Start with our address set to the first address; this may be updated if
866 // we connect to different addresses.
867 ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
868
869 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
870 Desc: "Subchannel created",
871 Severity: channelz.CtInfo,
872 Parent: &channelz.TraceEvent{
873 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
874 Severity: channelz.CtInfo,
875 },
876 })
877
William Kurkianea869482019-04-09 15:16:11 -0400878 // Track ac in cc. This needs to be done before any getTransport(...) is called.
William Kurkianea869482019-04-09 15:16:11 -0400879 cc.conns[ac] = struct{}{}
William Kurkianea869482019-04-09 15:16:11 -0400880 return ac, nil
881}
882
883// removeAddrConn removes the addrConn in the subConn from clientConn.
884// It also tears down the ac with the given error.
885func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
886 cc.mu.Lock()
887 if cc.conns == nil {
888 cc.mu.Unlock()
889 return
890 }
891 delete(cc.conns, ac)
892 cc.mu.Unlock()
893 ac.tearDown(err)
894}
895
William Kurkianea869482019-04-09 15:16:11 -0400896// Target returns the target string of the ClientConn.
William Kurkianea869482019-04-09 15:16:11 -0400897func (cc *ClientConn) Target() string {
898 return cc.target
899}
900
Abhay Kumara61c5222025-11-10 07:32:50 +0000901// CanonicalTarget returns the canonical target string used when creating cc.
902//
903// This always has the form "<scheme>://[authority]/<endpoint>". For example:
904//
905// - "dns:///example.com:42"
906// - "dns://8.8.8.8/example.com:42"
907// - "unix:///path/to/socket"
908func (cc *ClientConn) CanonicalTarget() string {
909 return cc.parsedTarget.String()
910}
911
William Kurkianea869482019-04-09 15:16:11 -0400912func (cc *ClientConn) incrCallsStarted() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000913 cc.channelz.ChannelMetrics.CallsStarted.Add(1)
914 cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
William Kurkianea869482019-04-09 15:16:11 -0400915}
916
917func (cc *ClientConn) incrCallsSucceeded() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000918 cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
William Kurkianea869482019-04-09 15:16:11 -0400919}
920
921func (cc *ClientConn) incrCallsFailed() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000922 cc.channelz.ChannelMetrics.CallsFailed.Add(1)
William Kurkianea869482019-04-09 15:16:11 -0400923}
924
925// connect starts creating a transport.
926// It does nothing if the ac is not IDLE.
927// TODO(bar) Move this to the addrConn section.
928func (ac *addrConn) connect() error {
929 ac.mu.Lock()
930 if ac.state == connectivity.Shutdown {
Abhay Kumara61c5222025-11-10 07:32:50 +0000931 if logger.V(2) {
932 logger.Infof("connect called on shutdown addrConn; ignoring.")
933 }
William Kurkianea869482019-04-09 15:16:11 -0400934 ac.mu.Unlock()
935 return errConnClosing
936 }
937 if ac.state != connectivity.Idle {
Abhay Kumara61c5222025-11-10 07:32:50 +0000938 if logger.V(2) {
939 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
940 }
William Kurkianea869482019-04-09 15:16:11 -0400941 ac.mu.Unlock()
942 return nil
943 }
William Kurkianea869482019-04-09 15:16:11 -0400944
Abhay Kumara61c5222025-11-10 07:32:50 +0000945 ac.resetTransportAndUnlock()
William Kurkianea869482019-04-09 15:16:11 -0400946 return nil
947}
948
Abhay Kumara61c5222025-11-10 07:32:50 +0000949// equalAddressIgnoringBalAttributes returns true is a and b are considered equal.
950// This is different from the Equal method on the resolver.Address type which
951// considers all fields to determine equality. Here, we only consider fields
952// that are meaningful to the subConn.
953func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
954 return a.Addr == b.Addr && a.ServerName == b.ServerName &&
955 a.Attributes.Equal(b.Attributes) &&
956 a.Metadata == b.Metadata
957}
958
959func equalAddressesIgnoringBalAttributes(a, b []resolver.Address) bool {
960 return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddressIgnoringBalAttributes(&a, &b) })
961}
962
963// updateAddrs updates ac.addrs with the new addresses list and handles active
964// connections or connection attempts.
965func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
966 addrs = copyAddresses(addrs)
967 limit := len(addrs)
968 if limit > 5 {
969 limit = 5
970 }
971 channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])
972
William Kurkianea869482019-04-09 15:16:11 -0400973 ac.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000974 if equalAddressesIgnoringBalAttributes(ac.addrs, addrs) {
975 ac.mu.Unlock()
976 return
977 }
978
979 ac.addrs = addrs
980
Abhilash S.L3b494632019-07-16 15:51:09 +0530981 if ac.state == connectivity.Shutdown ||
982 ac.state == connectivity.TransientFailure ||
983 ac.state == connectivity.Idle {
Abhay Kumara61c5222025-11-10 07:32:50 +0000984 // We were not connecting, so do nothing but update the addresses.
985 ac.mu.Unlock()
986 return
William Kurkianea869482019-04-09 15:16:11 -0400987 }
988
Abhay Kumara61c5222025-11-10 07:32:50 +0000989 if ac.state == connectivity.Ready {
990 // Try to find the connected address.
991 for _, a := range addrs {
992 a.ServerName = ac.cc.getServerName(a)
993 if equalAddressIgnoringBalAttributes(&a, &ac.curAddr) {
994 // We are connected to a valid address, so do nothing but
995 // update the addresses.
996 ac.mu.Unlock()
997 return
998 }
William Kurkianea869482019-04-09 15:16:11 -0400999 }
1000 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001001
1002 // We are either connected to the wrong address or currently connecting.
1003 // Stop the current iteration and restart.
1004
1005 ac.cancel()
1006 ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
1007
1008 // We have to defer here because GracefulClose => onClose, which requires
1009 // locking ac.mu.
1010 if ac.transport != nil {
1011 defer ac.transport.GracefulClose()
1012 ac.transport = nil
William Kurkianea869482019-04-09 15:16:11 -04001013 }
1014
Abhay Kumara61c5222025-11-10 07:32:50 +00001015 if len(addrs) == 0 {
1016 ac.updateConnectivityState(connectivity.Idle, nil)
1017 }
1018
1019 // Since we were connecting/connected, we should start a new connection
1020 // attempt.
1021 go ac.resetTransportAndUnlock()
1022}
1023
1024// getServerName determines the serverName to be used in the connection
1025// handshake. The default value for the serverName is the authority on the
1026// ClientConn, which either comes from the user's dial target or through an
1027// authority override specified using the WithAuthority dial option. Name
1028// resolvers can specify a per-address override for the serverName through the
1029// resolver.Address.ServerName field which is used only if the WithAuthority
1030// dial option was not used. The rationale is that per-address authority
1031// overrides specified by the name resolver can represent a security risk, while
1032// an override specified by the user is more dependable since they probably know
1033// what they are doing.
1034func (cc *ClientConn) getServerName(addr resolver.Address) string {
1035 if cc.dopts.authority != "" {
1036 return cc.dopts.authority
1037 }
1038 if addr.ServerName != "" {
1039 return addr.ServerName
1040 }
1041 return cc.authority
1042}
1043
1044func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
1045 if sc == nil {
1046 return MethodConfig{}
1047 }
1048 if m, ok := sc.Methods[method]; ok {
1049 return m
1050 }
1051 i := strings.LastIndex(method, "/")
1052 if m, ok := sc.Methods[method[:i+1]]; ok {
1053 return m
1054 }
1055 return sc.Methods[""]
William Kurkianea869482019-04-09 15:16:11 -04001056}
1057
1058// GetMethodConfig gets the method config of the input method.
1059// If there's an exact match for input method (i.e. /service/method), we return
1060// the corresponding MethodConfig.
Abhay Kumara61c5222025-11-10 07:32:50 +00001061// If there isn't an exact match for the input method, we look for the service's default
1062// config under the service (i.e /service/) and then for the default for all services (empty string).
1063//
1064// If there is a default MethodConfig for the service, we return it.
William Kurkianea869482019-04-09 15:16:11 -04001065// Otherwise, we return an empty MethodConfig.
1066func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
1067 // TODO: Avoid the locking here.
1068 cc.mu.RLock()
1069 defer cc.mu.RUnlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001070 return getMethodConfig(cc.sc, method)
William Kurkianea869482019-04-09 15:16:11 -04001071}
1072
1073func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
1074 cc.mu.RLock()
1075 defer cc.mu.RUnlock()
Abhilash S.L3b494632019-07-16 15:51:09 +05301076 if cc.sc == nil {
1077 return nil
1078 }
William Kurkianea869482019-04-09 15:16:11 -04001079 return cc.sc.healthCheckConfig
1080}
1081
Abhay Kumara61c5222025-11-10 07:32:50 +00001082func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301083 if sc == nil {
1084 // should never reach here.
Devmalya Pauldd23a992019-11-14 07:06:31 +00001085 return
William Kurkianea869482019-04-09 15:16:11 -04001086 }
William Kurkianea869482019-04-09 15:16:11 -04001087 cc.sc = sc
Abhay Kumara61c5222025-11-10 07:32:50 +00001088 if configSelector != nil {
1089 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
1090 }
William Kurkianea869482019-04-09 15:16:11 -04001091
Abhilash S.L3b494632019-07-16 15:51:09 +05301092 if cc.sc.retryThrottling != nil {
William Kurkianea869482019-04-09 15:16:11 -04001093 newThrottler := &retryThrottler{
Abhilash S.L3b494632019-07-16 15:51:09 +05301094 tokens: cc.sc.retryThrottling.MaxTokens,
1095 max: cc.sc.retryThrottling.MaxTokens,
1096 thresh: cc.sc.retryThrottling.MaxTokens / 2,
1097 ratio: cc.sc.retryThrottling.TokenRatio,
William Kurkianea869482019-04-09 15:16:11 -04001098 }
1099 cc.retryThrottler.Store(newThrottler)
1100 } else {
1101 cc.retryThrottler.Store((*retryThrottler)(nil))
1102 }
William Kurkianea869482019-04-09 15:16:11 -04001103}
1104
Abhay Kumara61c5222025-11-10 07:32:50 +00001105func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
William Kurkianea869482019-04-09 15:16:11 -04001106 cc.mu.RLock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001107 cc.resolverWrapper.resolveNow(o)
William Kurkianea869482019-04-09 15:16:11 -04001108 cc.mu.RUnlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001109}
1110
1111func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
1112 cc.resolverWrapper.resolveNow(o)
William Kurkianea869482019-04-09 15:16:11 -04001113}
1114
1115// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1116// them to attempt another connection immediately. It also resets the backoff
1117// times used for subsequent attempts regardless of the current state.
1118//
1119// In general, this function should not be used. Typical service or network
1120// outages result in a reasonable client reconnection strategy by default.
1121// However, if a previously unavailable network becomes available, this may be
1122// used to trigger an immediate reconnect.
1123//
Abhay Kumara61c5222025-11-10 07:32:50 +00001124// # Experimental
1125//
1126// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1127// later release.
William Kurkianea869482019-04-09 15:16:11 -04001128func (cc *ClientConn) ResetConnectBackoff() {
1129 cc.mu.Lock()
Devmalya Pauldd23a992019-11-14 07:06:31 +00001130 conns := cc.conns
1131 cc.mu.Unlock()
1132 for ac := range conns {
William Kurkianea869482019-04-09 15:16:11 -04001133 ac.resetConnectBackoff()
1134 }
1135}
1136
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn was already closed, and nil
// otherwise. Teardown order:
//  1. The idleness manager is closed.
//  2. Under cc.mu, cc.conns is cleared and the channel state becomes Shutdown.
//  3. The resolver, picker and balancer wrappers are closed, and their
//     serializers are drained.
//  4. Every addrConn is torn down concurrently, and Close waits for all of
//     them.
//  5. The channelz entry is removed.
//
// On every return path, including the already-closed one, cc.cancel is called
// and Close then blocks until the connectivity-state pubsub reports Done.
func (cc *ClientConn) Close() error {
	defer func() {
		cc.cancel()
		<-cc.csMgr.pubSub.Done()
	}()

	// Prevent calls to enter/exit idle immediately, and ensure we are not
	// currently entering/exiting idle mode.
	cc.idlenessMgr.Close()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}

	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// We can safely unlock and continue to access all fields now as
	// cc.conns==nil, preventing any further operations on cc.
	cc.mu.Unlock()

	cc.resolverWrapper.close()
	// The order of closing matters here since the balancer wrapper assumes the
	// picker is closed before it is closed.
	cc.pickerWrapper.close()
	cc.balancerWrapper.close()

	// Wait for any work already queued on the wrappers' serializers to finish
	// before tearing down the addrConns.
	<-cc.resolverWrapper.serializer.Done()
	<-cc.balancerWrapper.serializer.Done()
	// Tear down each addrConn on its own goroutine and wait for all of them.
	var wg sync.WaitGroup
	for ac := range conns {
		wg.Add(1)
		go func(ac *addrConn) {
			defer wg.Done()
			ac.tearDown(ErrClientConnClosing)
		}(ac)
	}
	wg.Wait()
	cc.addTraceEvent("deleted")
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from being
	// deleted right away.
	channelz.RemoveEntry(cc.channelz.ID)

	return nil
}
1187
// addrConn is a network connection to a given address.
//
// It tracks the subchannel's address list, current address, connectivity state
// and transport. It runs connection attempts with backoff (see
// resetTransportAndUnlock and tryAllAddrs).
type addrConn struct {
	// ctx is derived from cc.ctx and scopes the current connection attempt;
	// updateAddrs cancels it and installs a fresh one to abandon an attempt.
	// cancel cancels ctx.
	ctx    context.Context
	cancel context.CancelFunc

	// acbw receives this addrConn's connectivity state changes (see
	// updateConnectivityState). scopts holds the options the subchannel was
	// created with; a non-nil scopts.CredsBundle overrides the dial-level one
	// when connecting (see tryAllAddrs).
	cc     *ClientConn
	dopts  dialOptions
	acbw   *acBalancerWrapper
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// This mutex is used on the RPC path, so its usage should be minimized as
	// much as possible.
	// TODO: Find a lock-free way to retrieve the transport and state from the
	// addrConn.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the argument passed to dopts.bs.Backoff. It is incremented
	// after a backoff wait runs to completion and reset to 0 after a
	// successful connection. A receive on resetBackoff cuts the current
	// backoff wait short.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// channelz is this addrConn's channelz subchannel entity.
	channelz *channelz.SubChannel
}
1220
1221// Note: this requires a lock on ac.mu.
Abhay Kumara61c5222025-11-10 07:32:50 +00001222func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
William Kurkianea869482019-04-09 15:16:11 -04001223 if ac.state == s {
1224 return
1225 }
William Kurkianea869482019-04-09 15:16:11 -04001226 ac.state = s
Abhay Kumara61c5222025-11-10 07:32:50 +00001227 ac.channelz.ChannelMetrics.State.Store(&s)
1228 if lastErr == nil {
1229 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
1230 } else {
1231 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
William Kurkianea869482019-04-09 15:16:11 -04001232 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001233 ac.acbw.updateState(s, ac.curAddr, lastErr)
William Kurkianea869482019-04-09 15:16:11 -04001234}
1235
1236// adjustParams updates parameters used to create transports upon
1237// receiving a GoAway.
1238func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
Abhay Kumara61c5222025-11-10 07:32:50 +00001239 if r == transport.GoAwayTooManyPings {
William Kurkianea869482019-04-09 15:16:11 -04001240 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1241 ac.cc.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001242 if v > ac.cc.keepaliveParams.Time {
1243 ac.cc.keepaliveParams.Time = v
William Kurkianea869482019-04-09 15:16:11 -04001244 }
1245 ac.cc.mu.Unlock()
1246 }
1247}
1248
// resetTransportAndUnlock unconditionally connects the addrConn.
//
// ac.mu must be held by the caller, and this function will guarantee it is released.
//
// If ac.ctx is already done, it returns immediately. Otherwise:
//  1. The addrConn moves to Connecting.
//  2. Each address in ac.addrs (snapshotted under ac.mu) is tried through
//     tryAllAddrs. The whole pass shares one deadline of
//     max(min connect timeout, current backoff).
//  3. On success, the backoff index is reset to 0.
//  4. If every address fails, re-resolution is requested and the addrConn
//     moves to TransientFailure. It then waits out the backoff, which a
//     receive on ac.resetBackoff cuts short, and finally moves to Idle.
//
// The state changes after the attempt are skipped if the captured ctx was
// cancelled in the meantime (e.g. by updateAddrs or teardown).
func (ac *addrConn) resetTransportAndUnlock() {
	// Capture the context for this attempt: updateAddrs may cancel and replace
	// ac.ctx while the attempt runs, and the checks below must observe the
	// cancellation of this attempt's context, not the replacement.
	acCtx := ac.ctx
	if acCtx.Err() != nil {
		ac.mu.Unlock()
		return
	}

	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
		// to ensure one resolution request per pass instead of per subconn failure.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		ac.mu.Lock()
		if acCtx.Err() != nil {
			// addrConn was torn down.
			ac.mu.Unlock()
			return
		}
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		b := ac.resetBackoff
		ac.mu.Unlock()

		// Wait out the backoff. The backoff index only advances when the full
		// backoff elapsed; a reset signal skips the wait without advancing it.
		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			timer.Stop()
		case <-acCtx.Done():
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if acCtx.Err() == nil {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1325
Abhay Kumara61c5222025-11-10 07:32:50 +00001326// tryAllAddrs tries to create a connection to the addresses, and stop when at
1327// the first successful one. It returns an error if no address was successfully
1328// connected, or updates ac appropriately with the new transport.
1329func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
1330 var firstConnErr error
Abhilash S.L3b494632019-07-16 15:51:09 +05301331 for _, addr := range addrs {
Abhay Kumara61c5222025-11-10 07:32:50 +00001332 ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
1333 if ctx.Err() != nil {
1334 return errConnClosing
Abhilash S.L3b494632019-07-16 15:51:09 +05301335 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001336 ac.mu.Lock()
Abhilash S.L3b494632019-07-16 15:51:09 +05301337
1338 ac.cc.mu.RLock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001339 ac.dopts.copts.KeepaliveParams = ac.cc.keepaliveParams
Abhilash S.L3b494632019-07-16 15:51:09 +05301340 ac.cc.mu.RUnlock()
1341
1342 copts := ac.dopts.copts
1343 if ac.scopts.CredsBundle != nil {
1344 copts.CredsBundle = ac.scopts.CredsBundle
1345 }
1346 ac.mu.Unlock()
1347
Abhay Kumara61c5222025-11-10 07:32:50 +00001348 channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)
Abhilash S.L3b494632019-07-16 15:51:09 +05301349
Abhay Kumara61c5222025-11-10 07:32:50 +00001350 err := ac.createTransport(ctx, addr, copts, connectDeadline)
Abhilash S.L3b494632019-07-16 15:51:09 +05301351 if err == nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001352 return nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301353 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001354 if firstConnErr == nil {
1355 firstConnErr = err
1356 }
1357 ac.cc.updateConnectionError(err)
Abhilash S.L3b494632019-07-16 15:51:09 +05301358 }
1359
1360 // Couldn't connect to any address.
Abhay Kumara61c5222025-11-10 07:32:50 +00001361 return firstConnErr
Abhilash S.L3b494632019-07-16 15:51:09 +05301362}
1363
Abhay Kumara61c5222025-11-10 07:32:50 +00001364// createTransport creates a connection to addr. It returns an error if the
1365// address was not successfully connected, or updates ac appropriately with the
1366// new transport.
1367func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
1368 addr.ServerName = ac.cc.getServerName(addr)
1369 hctx, hcancel := context.WithCancel(ctx)
William Kurkianea869482019-04-09 15:16:11 -04001370
Abhay Kumara61c5222025-11-10 07:32:50 +00001371 onClose := func(r transport.GoAwayReason) {
William Kurkianea869482019-04-09 15:16:11 -04001372 ac.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001373 defer ac.mu.Unlock()
1374 // adjust params based on GoAwayReason
William Kurkianea869482019-04-09 15:16:11 -04001375 ac.adjustParams(r)
Abhay Kumara61c5222025-11-10 07:32:50 +00001376 if ctx.Err() != nil {
1377 // Already shut down or connection attempt canceled. tearDown() or
1378 // updateAddrs() already cleared the transport and canceled hctx
1379 // via ac.ctx, and we expected this connection to be closed, so do
1380 // nothing here.
1381 return
1382 }
1383 hcancel()
1384 if ac.transport == nil {
1385 // We're still connecting to this address, which could error. Do
1386 // not update the connectivity state or resolve; these will happen
1387 // at the end of the tryAllAddrs connection loop in the event of an
1388 // error.
1389 return
1390 }
1391 ac.transport = nil
1392 // Refresh the name resolver on any connection loss.
1393 ac.cc.resolveNow(resolver.ResolveNowOptions{})
1394 // Always go idle and wait for the LB policy to initiate a new
1395 // connection attempt.
1396 ac.updateConnectivityState(connectivity.Idle, nil)
William Kurkianea869482019-04-09 15:16:11 -04001397 }
1398
Abhay Kumara61c5222025-11-10 07:32:50 +00001399 connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
William Kurkianea869482019-04-09 15:16:11 -04001400 defer cancel()
Abhay Kumara61c5222025-11-10 07:32:50 +00001401 copts.ChannelzParent = ac.channelz
William Kurkianea869482019-04-09 15:16:11 -04001402
Abhay Kumara61c5222025-11-10 07:32:50 +00001403 newTr, err := transport.NewHTTP2Client(connectCtx, ac.cc.ctx, addr, copts, onClose)
William Kurkianea869482019-04-09 15:16:11 -04001404 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001405 if logger.V(2) {
1406 logger.Infof("Creating new client transport to %q: %v", addr, err)
1407 }
William Kurkianea869482019-04-09 15:16:11 -04001408 // newTr is either nil, or closed.
Abhay Kumara61c5222025-11-10 07:32:50 +00001409 hcancel()
1410 channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
1411 return err
William Kurkianea869482019-04-09 15:16:11 -04001412 }
1413
Abhay Kumara61c5222025-11-10 07:32:50 +00001414 ac.mu.Lock()
1415 defer ac.mu.Unlock()
1416 if ctx.Err() != nil {
1417 // This can happen if the subConn was removed while in `Connecting`
1418 // state. tearDown() would have set the state to `Shutdown`, but
1419 // would not have closed the transport since ac.transport would not
1420 // have been set at that point.
1421 //
1422 // We run this in a goroutine because newTr.Close() calls onClose()
1423 // inline, which requires locking ac.mu.
1424 //
1425 // The error we pass to Close() is immaterial since there are no open
1426 // streams at this point, so no trailers with error details will be sent
1427 // out. We just need to pass a non-nil error.
1428 //
1429 // This can also happen when updateAddrs is called during a connection
1430 // attempt.
1431 go newTr.Close(transport.ErrConnClosing)
1432 return nil
William Kurkianea869482019-04-09 15:16:11 -04001433 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001434 if hctx.Err() != nil {
1435 // onClose was already called for this connection, but the connection
1436 // was successfully established first. Consider it a success and set
1437 // the new state to Idle.
1438 ac.updateConnectivityState(connectivity.Idle, nil)
1439 return nil
1440 }
1441 ac.curAddr = addr
1442 ac.transport = newTr
1443 ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
1444 return nil
William Kurkianea869482019-04-09 15:16:11 -04001445}
1446
Abhilash S.L3b494632019-07-16 15:51:09 +05301447// startHealthCheck starts the health checking stream (RPC) to watch the health
1448// stats of this connection if health checking is requested and configured.
1449//
1450// LB channel health checking is enabled when all requirements below are met:
1451// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
Abhay Kumara61c5222025-11-10 07:32:50 +00001452// 2. internal.HealthCheckFunc is set by importing the grpc/health package
Abhilash S.L3b494632019-07-16 15:51:09 +05301453// 3. a service config with non-empty healthCheckConfig field is provided
1454// 4. the load balancer requests it
1455//
1456// It sets addrConn to READY if the health checking stream is not started.
1457//
1458// Caller must hold ac.mu.
1459func (ac *addrConn) startHealthCheck(ctx context.Context) {
1460 var healthcheckManagingState bool
1461 defer func() {
1462 if !healthcheckManagingState {
Abhay Kumara61c5222025-11-10 07:32:50 +00001463 ac.updateConnectivityState(connectivity.Ready, nil)
Abhilash S.L3b494632019-07-16 15:51:09 +05301464 }
1465 }()
1466
1467 if ac.cc.dopts.disableHealthCheck {
1468 return
William Kurkianea869482019-04-09 15:16:11 -04001469 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301470 healthCheckConfig := ac.cc.healthCheckConfig()
1471 if healthCheckConfig == nil {
1472 return
1473 }
1474 if !ac.scopts.HealthCheckEnabled {
1475 return
1476 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001477 healthCheckFunc := internal.HealthCheckFunc
Abhilash S.L3b494632019-07-16 15:51:09 +05301478 if healthCheckFunc == nil {
1479 // The health package is not imported to set health check function.
1480 //
1481 // TODO: add a link to the health check doc in the error message.
Abhay Kumara61c5222025-11-10 07:32:50 +00001482 channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
Abhilash S.L3b494632019-07-16 15:51:09 +05301483 return
1484 }
1485
1486 healthcheckManagingState = true
1487
1488 // Set up the health check helper functions.
1489 currentTr := ac.transport
Abhay Kumara61c5222025-11-10 07:32:50 +00001490 newStream := func(method string) (any, error) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301491 ac.mu.Lock()
1492 if ac.transport != currentTr {
1493 ac.mu.Unlock()
1494 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1495 }
1496 ac.mu.Unlock()
1497 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1498 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001499 setConnectivityState := func(s connectivity.State, lastErr error) {
William Kurkianea869482019-04-09 15:16:11 -04001500 ac.mu.Lock()
1501 defer ac.mu.Unlock()
Abhilash S.L3b494632019-07-16 15:51:09 +05301502 if ac.transport != currentTr {
William Kurkianea869482019-04-09 15:16:11 -04001503 return
1504 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001505 ac.updateConnectivityState(s, lastErr)
William Kurkianea869482019-04-09 15:16:11 -04001506 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301507 // Start the health checking stream.
1508 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001509 err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
Abhilash S.L3b494632019-07-16 15:51:09 +05301510 if err != nil {
1511 if status.Code(err) == codes.Unimplemented {
Abhay Kumara61c5222025-11-10 07:32:50 +00001512 channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
Abhilash S.L3b494632019-07-16 15:51:09 +05301513 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +00001514 channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001515 }
William Kurkianea869482019-04-09 15:16:11 -04001516 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301517 }()
William Kurkianea869482019-04-09 15:16:11 -04001518}
1519
1520func (ac *addrConn) resetConnectBackoff() {
1521 ac.mu.Lock()
1522 close(ac.resetBackoff)
1523 ac.backoffIdx = 0
1524 ac.resetBackoff = make(chan struct{})
1525 ac.mu.Unlock()
1526}
1527
Abhay Kumara61c5222025-11-10 07:32:50 +00001528// getReadyTransport returns the transport if ac's state is READY or nil if not.
1529func (ac *addrConn) getReadyTransport() transport.ClientTransport {
William Kurkianea869482019-04-09 15:16:11 -04001530 ac.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001531 defer ac.mu.Unlock()
1532 if ac.state == connectivity.Ready {
1533 return ac.transport
William Kurkianea869482019-04-09 15:16:11 -04001534 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001535 return nil
William Kurkianea869482019-04-09 15:16:11 -04001536}
1537
1538// tearDown starts to tear down the addrConn.
Abhay Kumara61c5222025-11-10 07:32:50 +00001539//
1540// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
1541// will leak. In most cases, call cc.removeAddrConn() instead.
William Kurkianea869482019-04-09 15:16:11 -04001542func (ac *addrConn) tearDown(err error) {
1543 ac.mu.Lock()
1544 if ac.state == connectivity.Shutdown {
1545 ac.mu.Unlock()
1546 return
1547 }
1548 curTr := ac.transport
1549 ac.transport = nil
1550 // We have to set the state to Shutdown before anything else to prevent races
Devmalya Pauldd23a992019-11-14 07:06:31 +00001551 // between setting the state and logic that waits on context cancellation / etc.
Abhay Kumara61c5222025-11-10 07:32:50 +00001552 ac.updateConnectivityState(connectivity.Shutdown, nil)
William Kurkianea869482019-04-09 15:16:11 -04001553 ac.cancel()
William Kurkianea869482019-04-09 15:16:11 -04001554 ac.curAddr = resolver.Address{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001555
1556 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
1557 Desc: "Subchannel deleted",
1558 Severity: channelz.CtInfo,
1559 Parent: &channelz.TraceEvent{
1560 Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
1561 Severity: channelz.CtInfo,
1562 },
1563 })
1564 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
1565 // trace reference to the entity being deleted, and thus prevent it from
1566 // being deleted right away.
1567 channelz.RemoveEntry(ac.channelz.ID)
William Kurkianea869482019-04-09 15:16:11 -04001568 ac.mu.Unlock()
William Kurkianea869482019-04-09 15:16:11 -04001569
Abhay Kumara61c5222025-11-10 07:32:50 +00001570 // We have to release the lock before the call to GracefulClose/Close here
1571 // because both of them call onClose(), which requires locking ac.mu.
1572 if curTr != nil {
1573 if err == errConnDrain {
1574 // Close the transport gracefully when the subConn is being shutdown.
1575 //
1576 // GracefulClose() may be executed multiple times if:
1577 // - multiple GoAway frames are received from the server
1578 // - there are concurrent name resolver or balancer triggered
1579 // address removal and GoAway
1580 curTr.GracefulClose()
1581 } else {
1582 // Hard close the transport when the channel is entering idle or is
1583 // being shutdown. In the case where the channel is being shutdown,
1584 // closing of transports is also taken care of by cancellation of cc.ctx.
1585 // But in the case where the channel is entering idle, we need to
1586 // explicitly close the transports here. Instead of distinguishing
1587 // between these two cases, it is simpler to close the transport
1588 // unconditionally here.
1589 curTr.Close(err)
1590 }
William Kurkianea869482019-04-09 15:16:11 -04001591 }
1592}
1593
// retryThrottler implements the token-bucket retry throttling policy from the
// service config. A nil *retryThrottler disables throttling: throttle always
// reports false and successfulRPC does nothing.
type retryThrottler struct {
	max    float64 // token count is capped at this value by successfulRPC
	thresh float64 // retries are throttled while tokens <= thresh
	ratio  float64 // tokens credited per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle spends one retry token, never letting the count drop below zero,
// and reports whether the retry should be throttled (disallowed) under the
// service config's retry throttling policy.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens = max(rt.tokens-1, 0)
	return rt.tokens <= rt.thresh
}

// successfulRPC credits ratio tokens to the pool, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens = min(rt.tokens+rt.ratio, rt.max)
}
1630
Abhay Kumara61c5222025-11-10 07:32:50 +00001631func (ac *addrConn) incrCallsStarted() {
1632 ac.channelz.ChannelMetrics.CallsStarted.Add(1)
1633 ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
William Kurkianea869482019-04-09 15:16:11 -04001634}
1635
Abhay Kumara61c5222025-11-10 07:32:50 +00001636func (ac *addrConn) incrCallsSucceeded() {
1637 ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
1638}
1639
1640func (ac *addrConn) incrCallsFailed() {
1641 ac.channelz.ChannelMetrics.CallsFailed.Add(1)
William Kurkianea869482019-04-09 15:16:11 -04001642}
1643
// ErrClientConnTimeout reports that the ClientConn could not establish its
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout error = errors.New("grpc: timed out when dialing")
Abhay Kumara61c5222025-11-10 07:32:50 +00001650
1651// getResolver finds the scheme in the cc's resolvers or the global registry.
1652// scheme should always be lowercase (typically by virtue of url.Parse()
1653// performing proper RFC3986 behavior).
1654func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1655 for _, rb := range cc.dopts.resolvers {
1656 if scheme == rb.Scheme() {
1657 return rb
1658 }
1659 }
1660 return resolver.Get(scheme)
1661}
1662
1663func (cc *ClientConn) updateConnectionError(err error) {
1664 cc.lceMu.Lock()
1665 cc.lastConnectionError = err
1666 cc.lceMu.Unlock()
1667}
1668
1669func (cc *ClientConn) connectionError() error {
1670 cc.lceMu.Lock()
1671 defer cc.lceMu.Unlock()
1672 return cc.lastConnectionError
1673}
1674
1675// initParsedTargetAndResolverBuilder parses the user's dial target and stores
1676// the parsed target in `cc.parsedTarget`.
1677//
1678// The resolver to use is determined based on the scheme in the parsed target
1679// and the same is stored in `cc.resolverBuilder`.
1680//
1681// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1682func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
1683 logger.Infof("original dial target is: %q", cc.target)
1684
1685 var rb resolver.Builder
1686 parsedTarget, err := parseTarget(cc.target)
1687 if err == nil {
1688 rb = cc.getResolver(parsedTarget.URL.Scheme)
1689 if rb != nil {
1690 cc.parsedTarget = parsedTarget
1691 cc.resolverBuilder = rb
1692 return nil
1693 }
1694 }
1695
1696 // We are here because the user's dial target did not contain a scheme or
1697 // specified an unregistered scheme. We should fallback to the default
1698 // scheme, except when a custom dialer is specified in which case, we should
1699 // always use passthrough scheme. For either case, we need to respect any overridden
1700 // global defaults set by the user.
1701 defScheme := cc.dopts.defaultScheme
1702 if internal.UserSetDefaultScheme {
1703 defScheme = resolver.GetDefaultScheme()
1704 }
1705
1706 canonicalTarget := defScheme + ":///" + cc.target
1707
1708 parsedTarget, err = parseTarget(canonicalTarget)
1709 if err != nil {
1710 return err
1711 }
1712 rb = cc.getResolver(parsedTarget.URL.Scheme)
1713 if rb == nil {
1714 return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
1715 }
1716 cc.parsedTarget = parsedTarget
1717 cc.resolverBuilder = rb
1718 return nil
1719}
1720
1721// parseTarget uses RFC 3986 semantics to parse the given target into a
1722// resolver.Target struct containing url. Query params are stripped from the
1723// endpoint.
1724func parseTarget(target string) (resolver.Target, error) {
1725 u, err := url.Parse(target)
1726 if err != nil {
1727 return resolver.Target{}, err
1728 }
1729
1730 return resolver.Target{URL: *u}, nil
1731}
1732
// encodeAuthority percent-encodes (with uppercase hex) every byte of
// authority that RFC 3986 section 3.2 does not allow verbatim in an authority
// component: https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
// Allowed bytes are ALPHA, DIGIT, the unreserved marks "-._~", the sub-delims
// "!$&'()*+,;=", and the authority delimiters ":[]@". The input is returned
// unchanged when nothing needs escaping.
func encodeAuthority(authority string) string {
	const upperhex = "0123456789ABCDEF"

	needsEscape := func(c byte) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		}
		switch c {
		case '-', '_', '.', '~', // unreserved
			'!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=', // sub-delims
			':', '[', ']', '@': // authority delimiters
			return false
		default:
			return true
		}
	}

	escapes := 0
	for _, c := range []byte(authority) {
		if needsEscape(c) {
			escapes++
		}
	}
	if escapes == 0 {
		return authority
	}

	// Each escaped byte grows from one byte to three ("%XX").
	var b strings.Builder
	b.Grow(len(authority) + 2*escapes)
	for _, c := range []byte(authority) {
		if !needsEscape(c) {
			b.WriteByte(c)
			continue
		}
		b.WriteByte('%')
		b.WriteByte(upperhex[c>>4])
		b.WriteByte(upperhex[c&0x0F])
	}
	return b.String()
}
1789
1790// Determine channel authority. The order of precedence is as follows:
1791// - user specified authority override using `WithAuthority` dial option
1792// - creds' notion of server name for the authentication handshake
1793// - endpoint from dial target of the form "scheme://[authority]/endpoint"
1794//
1795// Stores the determined authority in `cc.authority`.
1796//
1797// Returns a non-nil error if the authority returned by the transport
1798// credentials do not match the authority configured through the dial option.
1799//
1800// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1801func (cc *ClientConn) initAuthority() error {
1802 dopts := cc.dopts
1803 // Historically, we had two options for users to specify the serverName or
1804 // authority for a channel. One was through the transport credentials
1805 // (either in its constructor, or through the OverrideServerName() method).
1806 // The other option (for cases where WithInsecure() dial option was used)
1807 // was to use the WithAuthority() dial option.
1808 //
1809 // A few things have changed since:
1810 // - `insecure` package with an implementation of the `TransportCredentials`
1811 // interface for the insecure case
1812 // - WithAuthority() dial option support for secure credentials
1813 authorityFromCreds := ""
1814 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1815 authorityFromCreds = creds.Info().ServerName
1816 }
1817 authorityFromDialOption := dopts.authority
1818 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1819 return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1820 }
1821
1822 endpoint := cc.parsedTarget.Endpoint()
1823 if authorityFromDialOption != "" {
1824 cc.authority = authorityFromDialOption
1825 } else if authorityFromCreds != "" {
1826 cc.authority = authorityFromCreds
1827 } else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
1828 cc.authority = auth.OverrideAuthority(cc.parsedTarget)
1829 } else if strings.HasPrefix(endpoint, ":") {
1830 cc.authority = "localhost" + encodeAuthority(endpoint)
1831 } else {
1832 cc.authority = encodeAuthority(endpoint)
1833 }
1834 return nil
1835}