blob: b4bc3a2bf368e8b9c51072a84795f999863ae214 [file] [log] [blame]
Abhay Kumarfe505f22025-11-10 14:16:31 +00001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
bseenivadd66c362026-02-12 19:13:26 +053019// Package pickfirst contains the pick_first load balancing policy which
20// is the universal leaf policy.
Abhay Kumarfe505f22025-11-10 14:16:31 +000021package pickfirst
22
23import (
24 "encoding/json"
25 "errors"
26 "fmt"
bseenivadd66c362026-02-12 19:13:26 +053027 "net"
28 "net/netip"
29 "sync"
30 "time"
Abhay Kumarfe505f22025-11-10 14:16:31 +000031
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/pickfirst/internal"
34 "google.golang.org/grpc/connectivity"
bseenivadd66c362026-02-12 19:13:26 +053035 expstats "google.golang.org/grpc/experimental/stats"
Abhay Kumarfe505f22025-11-10 14:16:31 +000036 "google.golang.org/grpc/grpclog"
Abhay Kumarfe505f22025-11-10 14:16:31 +000037 internalgrpclog "google.golang.org/grpc/internal/grpclog"
38 "google.golang.org/grpc/internal/pretty"
39 "google.golang.org/grpc/resolver"
40 "google.golang.org/grpc/serviceconfig"
Abhay Kumarfe505f22025-11-10 14:16:31 +000041)
42
43func init() {
Abhay Kumarfe505f22025-11-10 14:16:31 +000044 balancer.Register(pickfirstBuilder{})
45}
46
// Name is the name under which the pick_first balancer is registered.
const Name = "pick_first"
49
// enableHealthListenerKeyType is the key type under which the "health
// listener enabled" marker is stored in resolver attributes. Using a dedicated
// unexported type keeps the key unique to this package.
type enableHealthListenerKeyType struct{}
53
54var (
55 logger = grpclog.Component("pick-first-leaf-lb")
56 disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
57 Name: "grpc.lb.pick_first.disconnections",
58 Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
59 Unit: "{disconnection}",
60 Labels: []string{"grpc.target"},
61 Default: false,
62 })
63 connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
64 Name: "grpc.lb.pick_first.connection_attempts_succeeded",
65 Description: "EXPERIMENTAL. Number of successful connection attempts.",
66 Unit: "{attempt}",
67 Labels: []string{"grpc.target"},
68 Default: false,
69 })
70 connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
71 Name: "grpc.lb.pick_first.connection_attempts_failed",
72 Description: "EXPERIMENTAL. Number of failed connection attempts.",
73 Unit: "{attempt}",
74 Labels: []string{"grpc.target"},
75 Default: false,
76 })
77)
Abhay Kumarfe505f22025-11-10 14:16:31 +000078
// logPrefix is the format string for a balancer's log prefix; its single %p
// verb is filled in when the prefix is built.
//
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "

// connectionDelayInterval is how long the happy eyeballs pass waits on one
// connection attempt before starting the next one.
const connectionDelayInterval = 250 * time.Millisecond
86
// ipAddrFamily classifies an address by IP family for interleaving purposes.
type ipAddrFamily int

const (
	// ipAddrFamilyUnknown is the zero value and covers every address string
	// that cannot be parsed as an IP address.
	ipAddrFamilyUnknown ipAddrFamily = iota
	// ipAddrFamilyV4 marks IPv4 addresses.
	ipAddrFamilyV4
	// ipAddrFamilyV6 marks IPv6 addresses.
	ipAddrFamilyV6
)
96
// pickfirstBuilder is the stateless builder for pick_first balancers. It
// provides Build, Name and ParseConfig.
type pickfirstBuilder struct{}
98
bseenivadd66c362026-02-12 19:13:26 +053099func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
100 b := &pickfirstBalancer{
101 cc: cc,
102 target: bo.Target.String(),
103 metricsRecorder: cc.MetricsRecorder(),
104
105 subConns: resolver.NewAddressMapV2[*scData](),
106 state: connectivity.Connecting,
107 cancelConnectionTimer: func() {},
108 }
Abhay Kumarfe505f22025-11-10 14:16:31 +0000109 b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
110 return b
111}
112
bseenivadd66c362026-02-12 19:13:26 +0530113func (b pickfirstBuilder) Name() string {
Abhay Kumarfe505f22025-11-10 14:16:31 +0000114 return Name
115}
116
bseenivadd66c362026-02-12 19:13:26 +0530117func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
118 var cfg pfConfig
119 if err := json.Unmarshal(js, &cfg); err != nil {
120 return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
121 }
122 return cfg, nil
123}
124
125// EnableHealthListener updates the state to configure pickfirst for using a
126// generic health listener.
127//
128// # Experimental
129//
130// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
131// release.
132func EnableHealthListener(state resolver.State) resolver.State {
133 state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
134 return state
135}
136
Abhay Kumarfe505f22025-11-10 14:16:31 +0000137type pfConfig struct {
138 serviceconfig.LoadBalancingConfig `json:"-"`
139
140 // If set to true, instructs the LB policy to shuffle the order of the list
141 // of endpoints received from the name resolver before attempting to
142 // connect to them.
143 ShuffleAddressList bool `json:"shuffleAddressList"`
144}
145
bseenivadd66c362026-02-12 19:13:26 +0530146// scData keeps track of the current state of the subConn.
147// It is not safe for concurrent access.
148type scData struct {
149 // The following fields are initialized at build time and read-only after
150 // that.
151 subConn balancer.SubConn
152 addr resolver.Address
153
154 rawConnectivityState connectivity.State
155 // The effective connectivity state based on raw connectivity, health state
156 // and after following sticky TransientFailure behaviour defined in A62.
157 effectiveState connectivity.State
158 lastErr error
159 connectionFailedInFirstPass bool
160}
161
162func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
163 sd := &scData{
164 rawConnectivityState: connectivity.Idle,
165 effectiveState: connectivity.Idle,
166 addr: addr,
Abhay Kumarfe505f22025-11-10 14:16:31 +0000167 }
bseenivadd66c362026-02-12 19:13:26 +0530168 sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
169 StateListener: func(state balancer.SubConnState) {
170 b.updateSubConnState(sd, state)
171 },
172 })
173 if err != nil {
174 return nil, err
175 }
176 sd.subConn = sc
177 return sd, nil
Abhay Kumarfe505f22025-11-10 14:16:31 +0000178}
179
180type pickfirstBalancer struct {
bseenivadd66c362026-02-12 19:13:26 +0530181 // The following fields are initialized at build time and read-only after
182 // that and therefore do not need to be guarded by a mutex.
183 logger *internalgrpclog.PrefixLogger
184 cc balancer.ClientConn
185 target string
186 metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil
187
188 // The mutex is used to ensure synchronization of updates triggered
189 // from the idle picker and the already serialized resolver,
190 // SubConn state updates.
191 mu sync.Mutex
192 // State reported to the channel based on SubConn states and resolver
193 // updates.
194 state connectivity.State
195 // scData for active subonns mapped by address.
196 subConns *resolver.AddressMapV2[*scData]
197 addressList addressList
198 firstPass bool
199 numTF int
200 cancelConnectionTimer func()
201 healthCheckingEnabled bool
Abhay Kumarfe505f22025-11-10 14:16:31 +0000202}
203
bseenivadd66c362026-02-12 19:13:26 +0530204// ResolverError is called by the ClientConn when the name resolver produces
205// an error or when pickfirst determined the resolver update to be invalid.
Abhay Kumarfe505f22025-11-10 14:16:31 +0000206func (b *pickfirstBalancer) ResolverError(err error) {
bseenivadd66c362026-02-12 19:13:26 +0530207 b.mu.Lock()
208 defer b.mu.Unlock()
209 b.resolverErrorLocked(err)
210}
211
212func (b *pickfirstBalancer) resolverErrorLocked(err error) {
Abhay Kumarfe505f22025-11-10 14:16:31 +0000213 if b.logger.V(2) {
214 b.logger.Infof("Received error from the name resolver: %v", err)
215 }
Abhay Kumarfe505f22025-11-10 14:16:31 +0000216
bseenivadd66c362026-02-12 19:13:26 +0530217 // The picker will not change since the balancer does not currently
218 // report an error. If the balancer hasn't received a single good resolver
219 // update yet, transition to TRANSIENT_FAILURE.
220 if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
221 if b.logger.V(2) {
222 b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
223 }
Abhay Kumarfe505f22025-11-10 14:16:31 +0000224 return
225 }
bseenivadd66c362026-02-12 19:13:26 +0530226
227 b.updateBalancerState(balancer.State{
Abhay Kumarfe505f22025-11-10 14:16:31 +0000228 ConnectivityState: connectivity.TransientFailure,
229 Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
230 })
231}
232
Abhay Kumarfe505f22025-11-10 14:16:31 +0000233func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
bseenivadd66c362026-02-12 19:13:26 +0530234 b.mu.Lock()
235 defer b.mu.Unlock()
236 b.cancelConnectionTimer()
Abhay Kumarfe505f22025-11-10 14:16:31 +0000237 if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
bseenivadd66c362026-02-12 19:13:26 +0530238 // Cleanup state pertaining to the previous resolver state.
239 // Treat an empty address list like an error by calling b.ResolverError.
240 b.closeSubConnsLocked()
241 b.addressList.updateAddrs(nil)
242 b.resolverErrorLocked(errors.New("produced zero addresses"))
Abhay Kumarfe505f22025-11-10 14:16:31 +0000243 return balancer.ErrBadResolverState
244 }
bseenivadd66c362026-02-12 19:13:26 +0530245 b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
Abhay Kumarfe505f22025-11-10 14:16:31 +0000246 cfg, ok := state.BalancerConfig.(pfConfig)
247 if state.BalancerConfig != nil && !ok {
bseenivadd66c362026-02-12 19:13:26 +0530248 return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
Abhay Kumarfe505f22025-11-10 14:16:31 +0000249 }
250
251 if b.logger.V(2) {
252 b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
253 }
254
bseenivadd66c362026-02-12 19:13:26 +0530255 var newAddrs []resolver.Address
Abhay Kumarfe505f22025-11-10 14:16:31 +0000256 if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
bseenivadd66c362026-02-12 19:13:26 +0530257 // Perform the optional shuffling described in gRFC A62. The shuffling
258 // will change the order of endpoints but not touch the order of the
259 // addresses within each endpoint. - A61
Abhay Kumarfe505f22025-11-10 14:16:31 +0000260 if cfg.ShuffleAddressList {
261 endpoints = append([]resolver.Endpoint{}, endpoints...)
262 internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
263 }
264
bseenivadd66c362026-02-12 19:13:26 +0530265 // "Flatten the list by concatenating the ordered list of addresses for
266 // each of the endpoints, in order." - A61
Abhay Kumarfe505f22025-11-10 14:16:31 +0000267 for _, endpoint := range endpoints {
bseenivadd66c362026-02-12 19:13:26 +0530268 newAddrs = append(newAddrs, endpoint.Addresses...)
Abhay Kumarfe505f22025-11-10 14:16:31 +0000269 }
270 } else {
271 // Endpoints not set, process addresses until we migrate resolver
272 // emissions fully to Endpoints. The top channel does wrap emitted
273 // addresses with endpoints, however some balancers such as weighted
274 // target do not forward the corresponding correct endpoints down/split
275 // endpoints properly. Once all balancers correctly forward endpoints
276 // down, can delete this else conditional.
bseenivadd66c362026-02-12 19:13:26 +0530277 newAddrs = state.ResolverState.Addresses
Abhay Kumarfe505f22025-11-10 14:16:31 +0000278 if cfg.ShuffleAddressList {
bseenivadd66c362026-02-12 19:13:26 +0530279 newAddrs = append([]resolver.Address{}, newAddrs...)
280 internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
Abhay Kumarfe505f22025-11-10 14:16:31 +0000281 }
282 }
283
bseenivadd66c362026-02-12 19:13:26 +0530284 // If an address appears in multiple endpoints or in the same endpoint
285 // multiple times, we keep it only once. We will create only one SubConn
286 // for the address because an AddressMap is used to store SubConns.
287 // Not de-duplicating would result in attempting to connect to the same
288 // SubConn multiple times in the same pass. We don't want this.
289 newAddrs = deDupAddresses(newAddrs)
290 newAddrs = interleaveAddresses(newAddrs)
291
292 prevAddr := b.addressList.currentAddress()
293 prevSCData, found := b.subConns.Get(prevAddr)
294 prevAddrsCount := b.addressList.size()
295 isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
296 b.addressList.updateAddrs(newAddrs)
297
298 // If the previous ready SubConn exists in new address list,
299 // keep this connection and don't create new SubConns.
300 if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
Abhay Kumarfe505f22025-11-10 14:16:31 +0000301 return nil
302 }
303
bseenivadd66c362026-02-12 19:13:26 +0530304 b.reconcileSubConnsLocked(newAddrs)
305 // If it's the first resolver update or the balancer was already READY
306 // (but the new address list does not contain the ready SubConn) or
307 // CONNECTING, enter CONNECTING.
308 // We may be in TRANSIENT_FAILURE due to a previous empty address list,
309 // we should still enter CONNECTING because the sticky TF behaviour
310 // mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
311 // due to connectivity failures.
312 if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
313 // Start connection attempt at first address.
314 b.forceUpdateConcludedStateLocked(balancer.State{
315 ConnectivityState: connectivity.Connecting,
316 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
Abhay Kumarfe505f22025-11-10 14:16:31 +0000317 })
bseenivadd66c362026-02-12 19:13:26 +0530318 b.startFirstPassLocked()
319 } else if b.state == connectivity.TransientFailure {
320 // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
321 // we're READY. See A62.
322 b.startFirstPassLocked()
Abhay Kumarfe505f22025-11-10 14:16:31 +0000323 }
Abhay Kumarfe505f22025-11-10 14:16:31 +0000324 return nil
325}
326
327// UpdateSubConnState is unused as a StateListener is always registered when
328// creating SubConns.
329func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
330 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
331}
332
bseenivadd66c362026-02-12 19:13:26 +0530333func (b *pickfirstBalancer) Close() {
334 b.mu.Lock()
335 defer b.mu.Unlock()
336 b.closeSubConnsLocked()
337 b.cancelConnectionTimer()
338 b.state = connectivity.Shutdown
339}
Abhay Kumarfe505f22025-11-10 14:16:31 +0000340
bseenivadd66c362026-02-12 19:13:26 +0530341// ExitIdle moves the balancer out of idle state. It can be called concurrently
342// by the idlePicker and clientConn so access to variables should be
343// synchronized.
344func (b *pickfirstBalancer) ExitIdle() {
345 b.mu.Lock()
346 defer b.mu.Unlock()
347 if b.state == connectivity.Idle {
348 // Move the balancer into CONNECTING state immediately. This is done to
349 // avoid staying in IDLE if a resolver update arrives before the first
350 // SubConn reports CONNECTING.
351 b.updateBalancerState(balancer.State{
352 ConnectivityState: connectivity.Connecting,
Abhay Kumarfe505f22025-11-10 14:16:31 +0000353 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
354 })
bseenivadd66c362026-02-12 19:13:26 +0530355 b.startFirstPassLocked()
356 }
357}
358
359func (b *pickfirstBalancer) startFirstPassLocked() {
360 b.firstPass = true
361 b.numTF = 0
362 // Reset the connection attempt record for existing SubConns.
363 for _, sd := range b.subConns.Values() {
364 sd.connectionFailedInFirstPass = false
365 }
366 b.requestConnectionLocked()
367}
368
369func (b *pickfirstBalancer) closeSubConnsLocked() {
370 for _, sd := range b.subConns.Values() {
371 sd.subConn.Shutdown()
372 }
373 b.subConns = resolver.NewAddressMapV2[*scData]()
374}
375
376// deDupAddresses ensures that each address appears only once in the slice.
377func deDupAddresses(addrs []resolver.Address) []resolver.Address {
378 seenAddrs := resolver.NewAddressMapV2[bool]()
379 retAddrs := []resolver.Address{}
380
381 for _, addr := range addrs {
382 if _, ok := seenAddrs.Get(addr); ok {
383 continue
384 }
385 seenAddrs.Set(addr, true)
386 retAddrs = append(retAddrs, addr)
387 }
388 return retAddrs
389}
390
391// interleaveAddresses interleaves addresses of both families (IPv4 and IPv6)
392// as per RFC-8305 section 4.
393// Whichever address family is first in the list is followed by an address of
394// the other address family; that is, if the first address in the list is IPv6,
395// then the first IPv4 address should be moved up in the list to be second in
396// the list. It doesn't support configuring "First Address Family Count", i.e.
397// there will always be a single member of the first address family at the
398// beginning of the interleaved list.
399// Addresses that are neither IPv4 nor IPv6 are treated as part of a third
400// "unknown" family for interleaving.
401// See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6
402func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
403 familyAddrsMap := map[ipAddrFamily][]resolver.Address{}
404 interleavingOrder := []ipAddrFamily{}
405 for _, addr := range addrs {
406 family := addressFamily(addr.Addr)
407 if _, found := familyAddrsMap[family]; !found {
408 interleavingOrder = append(interleavingOrder, family)
409 }
410 familyAddrsMap[family] = append(familyAddrsMap[family], addr)
411 }
412
413 interleavedAddrs := make([]resolver.Address, 0, len(addrs))
414
415 for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) {
416 // Some IP types may have fewer addresses than others, so we look for
417 // the next type that has a remaining member to add to the interleaved
418 // list.
419 family := interleavingOrder[curFamilyIdx]
420 remainingMembers := familyAddrsMap[family]
421 if len(remainingMembers) > 0 {
422 interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
423 familyAddrsMap[family] = remainingMembers[1:]
424 }
425 }
426
427 return interleavedAddrs
428}
429
430// addressFamily returns the ipAddrFamily after parsing the address string.
431// If the address isn't of the format "ip-address:port", it returns
432// ipAddrFamilyUnknown. The address may be valid even if it's not an IP when
433// using a resolver like passthrough where the address may be a hostname in
434// some format that the dialer can resolve.
435func addressFamily(address string) ipAddrFamily {
436 // Parse the IP after removing the port.
437 host, _, err := net.SplitHostPort(address)
438 if err != nil {
439 return ipAddrFamilyUnknown
440 }
441 ip, err := netip.ParseAddr(host)
442 if err != nil {
443 return ipAddrFamilyUnknown
444 }
445 switch {
446 case ip.Is4() || ip.Is4In6():
447 return ipAddrFamilyV4
448 case ip.Is6():
449 return ipAddrFamilyV6
450 default:
451 return ipAddrFamilyUnknown
452 }
453}
454
455// reconcileSubConnsLocked updates the active subchannels based on a new address
456// list from the resolver. It does this by:
457// - closing subchannels: any existing subchannels associated with addresses
458// that are no longer in the updated list are shut down.
459// - removing subchannels: entries for these closed subchannels are removed
460// from the subchannel map.
461//
462// This ensures that the subchannel map accurately reflects the current set of
463// addresses received from the name resolver.
464func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
465 newAddrsMap := resolver.NewAddressMapV2[bool]()
466 for _, addr := range newAddrs {
467 newAddrsMap.Set(addr, true)
468 }
469
470 for _, oldAddr := range b.subConns.Keys() {
471 if _, ok := newAddrsMap.Get(oldAddr); ok {
472 continue
473 }
474 val, _ := b.subConns.Get(oldAddr)
475 val.subConn.Shutdown()
476 b.subConns.Delete(oldAddr)
477 }
478}
479
480// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
481// becomes ready, which means that all other subConn must be shutdown.
482func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
483 b.cancelConnectionTimer()
484 for _, sd := range b.subConns.Values() {
485 if sd.subConn != selected.subConn {
486 sd.subConn.Shutdown()
487 }
488 }
489 b.subConns = resolver.NewAddressMapV2[*scData]()
490 b.subConns.Set(selected.addr, selected)
491}
492
493// requestConnectionLocked starts connecting on the subchannel corresponding to
494// the current address. If no subchannel exists, one is created. If the current
495// subchannel is in TransientFailure, a connection to the next address is
496// attempted until a subchannel is found.
497func (b *pickfirstBalancer) requestConnectionLocked() {
498 if !b.addressList.isValid() {
499 return
500 }
501 var lastErr error
502 for valid := true; valid; valid = b.addressList.increment() {
503 curAddr := b.addressList.currentAddress()
504 sd, ok := b.subConns.Get(curAddr)
505 if !ok {
506 var err error
507 // We want to assign the new scData to sd from the outer scope,
508 // hence we can't use := below.
509 sd, err = b.newSCData(curAddr)
510 if err != nil {
511 // This should never happen, unless the clientConn is being shut
512 // down.
513 if b.logger.V(2) {
514 b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
515 }
516 // Do nothing, the LB policy will be closed soon.
517 return
518 }
519 b.subConns.Set(curAddr, sd)
520 }
521
522 switch sd.rawConnectivityState {
523 case connectivity.Idle:
524 sd.subConn.Connect()
525 b.scheduleNextConnectionLocked()
526 return
527 case connectivity.TransientFailure:
528 // The SubConn is being re-used and failed during a previous pass
529 // over the addressList. It has not completed backoff yet.
530 // Mark it as having failed and try the next address.
531 sd.connectionFailedInFirstPass = true
532 lastErr = sd.lastErr
533 continue
534 case connectivity.Connecting:
535 // Wait for the connection attempt to complete or the timer to fire
536 // before attempting the next address.
537 b.scheduleNextConnectionLocked()
538 return
539 default:
540 b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
541 return
542
543 }
544 }
545
546 // All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
547 // first pass if possible.
548 b.endFirstPassIfPossibleLocked(lastErr)
549}
550
551func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
552 b.cancelConnectionTimer()
553 if !b.addressList.hasNext() {
554 return
555 }
556 curAddr := b.addressList.currentAddress()
557 cancelled := false // Access to this is protected by the balancer's mutex.
558 closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
559 b.mu.Lock()
560 defer b.mu.Unlock()
561 // If the scheduled task is cancelled while acquiring the mutex, return.
562 if cancelled {
Abhay Kumarfe505f22025-11-10 14:16:31 +0000563 return
564 }
bseenivadd66c362026-02-12 19:13:26 +0530565 if b.logger.V(2) {
566 b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
567 }
568 if b.addressList.increment() {
569 b.requestConnectionLocked()
570 }
571 })
572 // Access to the cancellation callback held by the balancer is guarded by
573 // the balancer's mutex, so it's safe to set the boolean from the callback.
574 b.cancelConnectionTimer = sync.OnceFunc(func() {
575 cancelled = true
576 closeFn()
577 })
578}
579
580func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
581 b.mu.Lock()
582 defer b.mu.Unlock()
583 oldState := sd.rawConnectivityState
584 sd.rawConnectivityState = newState.ConnectivityState
585 // Previously relevant SubConns can still callback with state updates.
586 // To prevent pickers from returning these obsolete SubConns, this logic
587 // is included to check if the current list of active SubConns includes this
588 // SubConn.
589 if !b.isActiveSCData(sd) {
590 return
591 }
592 if newState.ConnectivityState == connectivity.Shutdown {
593 sd.effectiveState = connectivity.Shutdown
594 return
595 }
596
597 // Record a connection attempt when exiting CONNECTING.
598 if newState.ConnectivityState == connectivity.TransientFailure {
599 sd.connectionFailedInFirstPass = true
600 connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
601 }
602
603 if newState.ConnectivityState == connectivity.Ready {
604 connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
605 b.shutdownRemainingLocked(sd)
606 if !b.addressList.seekTo(sd.addr) {
607 // This should not fail as we should have only one SubConn after
608 // entering READY. The SubConn should be present in the addressList.
609 b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
610 return
611 }
612 if !b.healthCheckingEnabled {
613 if b.logger.V(2) {
614 b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
615 }
616
617 sd.effectiveState = connectivity.Ready
618 b.updateBalancerState(balancer.State{
619 ConnectivityState: connectivity.Ready,
620 Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
621 })
622 return
623 }
624 if b.logger.V(2) {
625 b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
626 }
627 // Send a CONNECTING update to take the SubConn out of sticky-TF if
628 // required.
629 sd.effectiveState = connectivity.Connecting
630 b.updateBalancerState(balancer.State{
631 ConnectivityState: connectivity.Connecting,
632 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
633 })
634 sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
635 b.updateSubConnHealthState(sd, scs)
636 })
637 return
638 }
639
640 // If the LB policy is READY, and it receives a subchannel state change,
641 // it means that the READY subchannel has failed.
642 // A SubConn can also transition from CONNECTING directly to IDLE when
643 // a transport is successfully created, but the connection fails
644 // before the SubConn can send the notification for READY. We treat
645 // this as a successful connection and transition to IDLE.
646 // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
647 // part of the if condition below once the issue is fixed.
648 if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
649 // Once a transport fails, the balancer enters IDLE and starts from
650 // the first address when the picker is used.
651 b.shutdownRemainingLocked(sd)
652 sd.effectiveState = newState.ConnectivityState
653 // READY SubConn interspliced in between CONNECTING and IDLE, need to
654 // account for that.
655 if oldState == connectivity.Connecting {
656 // A known issue (https://github.com/grpc/grpc-go/issues/7862)
657 // causes a race that prevents the READY state change notification.
658 // This works around it.
659 connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
660 }
661 disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
662 b.addressList.reset()
663 b.updateBalancerState(balancer.State{
664 ConnectivityState: connectivity.Idle,
665 Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
666 })
667 return
668 }
669
670 if b.firstPass {
671 switch newState.ConnectivityState {
672 case connectivity.Connecting:
673 // The effective state can be in either IDLE, CONNECTING or
674 // TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
675 // TRANSIENT_FAILURE until it's READY. See A62.
676 if sd.effectiveState != connectivity.TransientFailure {
677 sd.effectiveState = connectivity.Connecting
678 b.updateBalancerState(balancer.State{
679 ConnectivityState: connectivity.Connecting,
680 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
681 })
682 }
683 case connectivity.TransientFailure:
684 sd.lastErr = newState.ConnectionError
685 sd.effectiveState = connectivity.TransientFailure
686 // Since we're re-using common SubConns while handling resolver
687 // updates, we could receive an out of turn TRANSIENT_FAILURE from
688 // a pass over the previous address list. Happy Eyeballs will also
689 // cause out of order updates to arrive.
690
691 if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
692 b.cancelConnectionTimer()
693 if b.addressList.increment() {
694 b.requestConnectionLocked()
695 return
696 }
697 }
698
699 // End the first pass if we've seen a TRANSIENT_FAILURE from all
700 // SubConns once.
701 b.endFirstPassIfPossibleLocked(newState.ConnectionError)
702 }
703 return
704 }
705
706 // We have finished the first pass, keep re-connecting failing SubConns.
707 switch newState.ConnectivityState {
708 case connectivity.TransientFailure:
709 b.numTF = (b.numTF + 1) % b.subConns.Len()
710 sd.lastErr = newState.ConnectionError
711 if b.numTF%b.subConns.Len() == 0 {
712 b.updateBalancerState(balancer.State{
713 ConnectivityState: connectivity.TransientFailure,
714 Picker: &picker{err: newState.ConnectionError},
715 })
716 }
717 // We don't need to request re-resolution since the SubConn already
718 // does that before reporting TRANSIENT_FAILURE.
719 // TODO: #7534 - Move re-resolution requests from SubConn into
720 // pick_first.
721 case connectivity.Idle:
722 sd.subConn.Connect()
723 }
724}
725
726// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
727// addresses are tried and their SubConns have reported a failure.
728func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
729 // An optimization to avoid iterating over the entire SubConn map.
730 if b.addressList.isValid() {
731 return
732 }
733 // Connect() has been called on all the SubConns. The first pass can be
734 // ended if all the SubConns have reported a failure.
735 for _, sd := range b.subConns.Values() {
736 if !sd.connectionFailedInFirstPass {
737 return
738 }
739 }
740 b.firstPass = false
741 b.updateBalancerState(balancer.State{
742 ConnectivityState: connectivity.TransientFailure,
743 Picker: &picker{err: lastErr},
744 })
745 // Start re-connecting all the SubConns that are already in IDLE.
746 for _, sd := range b.subConns.Values() {
747 if sd.rawConnectivityState == connectivity.Idle {
748 sd.subConn.Connect()
749 }
750 }
751}
752
753func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
754 activeSD, found := b.subConns.Get(sd.addr)
755 return found && activeSD == sd
756}
757
758func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
759 b.mu.Lock()
760 defer b.mu.Unlock()
761 // Previously relevant SubConns can still callback with state updates.
762 // To prevent pickers from returning these obsolete SubConns, this logic
763 // is included to check if the current list of active SubConns includes
764 // this SubConn.
765 if !b.isActiveSCData(sd) {
766 return
767 }
768 sd.effectiveState = state.ConnectivityState
769 switch state.ConnectivityState {
770 case connectivity.Ready:
771 b.updateBalancerState(balancer.State{
772 ConnectivityState: connectivity.Ready,
773 Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
Abhay Kumarfe505f22025-11-10 14:16:31 +0000774 })
775 case connectivity.TransientFailure:
bseenivadd66c362026-02-12 19:13:26 +0530776 b.updateBalancerState(balancer.State{
777 ConnectivityState: connectivity.TransientFailure,
778 Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
Abhay Kumarfe505f22025-11-10 14:16:31 +0000779 })
bseenivadd66c362026-02-12 19:13:26 +0530780 case connectivity.Connecting:
781 b.updateBalancerState(balancer.State{
782 ConnectivityState: connectivity.Connecting,
783 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
784 })
785 default:
786 b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
Abhay Kumarfe505f22025-11-10 14:16:31 +0000787 }
Abhay Kumarfe505f22025-11-10 14:16:31 +0000788}
789
bseenivadd66c362026-02-12 19:13:26 +0530790// updateBalancerState stores the state reported to the channel and calls
791// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
792// updates to the channel.
793func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
794 // In case of TransientFailures allow the picker to be updated to update
795 // the connectivity error, in all other cases don't send duplicate state
796 // updates.
797 if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
798 return
799 }
800 b.forceUpdateConcludedStateLocked(newState)
Abhay Kumarfe505f22025-11-10 14:16:31 +0000801}
802
bseenivadd66c362026-02-12 19:13:26 +0530803// forceUpdateConcludedStateLocked stores the state reported to the channel and
804// calls ClientConn.UpdateState().
805// A separate function is defined to force update the ClientConn state since the
806// channel doesn't correctly assume that LB policies start in CONNECTING and
807// relies on LB policy to send an initial CONNECTING update.
808func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
809 b.state = newState.ConnectivityState
810 b.cc.UpdateState(newState)
Abhay Kumarfe505f22025-11-10 14:16:31 +0000811}
812
813type picker struct {
814 result balancer.PickResult
815 err error
816}
817
818func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
819 return p.result, p.err
820}
821
822// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
823// CONNECTING when Pick is called.
824type idlePicker struct {
bseenivadd66c362026-02-12 19:13:26 +0530825 exitIdle func()
Abhay Kumarfe505f22025-11-10 14:16:31 +0000826}
827
828func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
bseenivadd66c362026-02-12 19:13:26 +0530829 i.exitIdle()
Abhay Kumarfe505f22025-11-10 14:16:31 +0000830 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
831}
bseenivadd66c362026-02-12 19:13:26 +0530832
833// addressList manages sequentially iterating over addresses present in a list
834// of endpoints. It provides a 1 dimensional view of the addresses present in
835// the endpoints.
836// This type is not safe for concurrent access.
837type addressList struct {
838 addresses []resolver.Address
839 idx int
840}
841
842func (al *addressList) isValid() bool {
843 return al.idx < len(al.addresses)
844}
845
846func (al *addressList) size() int {
847 return len(al.addresses)
848}
849
850// increment moves to the next index in the address list.
851// This method returns false if it went off the list, true otherwise.
852func (al *addressList) increment() bool {
853 if !al.isValid() {
854 return false
855 }
856 al.idx++
857 return al.idx < len(al.addresses)
858}
859
860// currentAddress returns the current address pointed to in the addressList.
861// If the list is in an invalid state, it returns an empty address instead.
862func (al *addressList) currentAddress() resolver.Address {
863 if !al.isValid() {
864 return resolver.Address{}
865 }
866 return al.addresses[al.idx]
867}
868
869func (al *addressList) reset() {
870 al.idx = 0
871}
872
873func (al *addressList) updateAddrs(addrs []resolver.Address) {
874 al.addresses = addrs
875 al.reset()
876}
877
878// seekTo returns false if the needle was not found and the current index was
879// left unchanged.
880func (al *addressList) seekTo(needle resolver.Address) bool {
881 for ai, addr := range al.addresses {
882 if !equalAddressIgnoringBalAttributes(&addr, &needle) {
883 continue
884 }
885 al.idx = ai
886 return true
887 }
888 return false
889}
890
891// hasNext returns whether incrementing the addressList will result in moving
892// past the end of the list. If the list has already moved past the end, it
893// returns false.
894func (al *addressList) hasNext() bool {
895 if !al.isValid() {
896 return false
897 }
898 return al.idx+1 < len(al.addresses)
899}
900
901// equalAddressIgnoringBalAttributes returns true is a and b are considered
902// equal. This is different from the Equal method on the resolver.Address type
903// which considers all fields to determine equality. Here, we only consider
904// fields that are meaningful to the SubConn.
905func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
906 return a.Addr == b.Addr && a.ServerName == b.ServerName &&
907 a.Attributes.Equal(b.Attributes)
908}