| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2017 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 19 | // Package pickfirst contains the pick_first load balancing policy which |
| 20 | // is the universal leaf policy. |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 21 | package pickfirst |
| 22 | |
| 23 | import ( |
| 24 | "encoding/json" |
| 25 | "errors" |
| 26 | "fmt" |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 27 | "net" |
| 28 | "net/netip" |
| 29 | "sync" |
| 30 | "time" |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 31 | |
| 32 | "google.golang.org/grpc/balancer" |
| 33 | "google.golang.org/grpc/balancer/pickfirst/internal" |
| 34 | "google.golang.org/grpc/connectivity" |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 35 | expstats "google.golang.org/grpc/experimental/stats" |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 36 | "google.golang.org/grpc/grpclog" |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 37 | internalgrpclog "google.golang.org/grpc/internal/grpclog" |
| 38 | "google.golang.org/grpc/internal/pretty" |
| 39 | "google.golang.org/grpc/resolver" |
| 40 | "google.golang.org/grpc/serviceconfig" |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 41 | ) |
| 42 | |
| 43 | func init() { |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 44 | balancer.Register(pickfirstBuilder{}) |
| 45 | } |
| 46 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 47 | // Name is the name of the pick_first balancer. |
| 48 | const Name = "pick_first" |
| 49 | |
| 50 | // enableHealthListenerKeyType is a unique key type used in resolver |
| 51 | // attributes to indicate whether the health listener usage is enabled. |
| 52 | type enableHealthListenerKeyType struct{} |
| 53 | |
| 54 | var ( |
| 55 | logger = grpclog.Component("pick-first-leaf-lb") |
| 56 | disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ |
| 57 | Name: "grpc.lb.pick_first.disconnections", |
| 58 | Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", |
| 59 | Unit: "{disconnection}", |
| 60 | Labels: []string{"grpc.target"}, |
| 61 | Default: false, |
| 62 | }) |
| 63 | connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ |
| 64 | Name: "grpc.lb.pick_first.connection_attempts_succeeded", |
| 65 | Description: "EXPERIMENTAL. Number of successful connection attempts.", |
| 66 | Unit: "{attempt}", |
| 67 | Labels: []string{"grpc.target"}, |
| 68 | Default: false, |
| 69 | }) |
| 70 | connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ |
| 71 | Name: "grpc.lb.pick_first.connection_attempts_failed", |
| 72 | Description: "EXPERIMENTAL. Number of failed connection attempts.", |
| 73 | Unit: "{attempt}", |
| 74 | Labels: []string{"grpc.target"}, |
| 75 | Default: false, |
| 76 | }) |
| 77 | ) |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 78 | |
// logPrefix is the per-instance log prefix format; %p is filled with the
// balancer's pointer so that lines from different instances can be told
// apart.
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "

// connectionDelayInterval is how long the happy eyeballs pass waits on one
// connection attempt before starting the next one.
const connectionDelayInterval = 250 * time.Millisecond
| 86 | |
// ipAddrFamily classifies an address string by IP family so that addresses
// can be interleaved for happy eyeballs.
type ipAddrFamily int

const (
	// ipAddrFamilyUnknown represents strings that can't be parsed as an IP
	// address. It is the zero value.
	ipAddrFamilyUnknown ipAddrFamily = 0
	// ipAddrFamilyV4 represents IPv4 addresses, including IPv4-mapped IPv6
	// addresses.
	ipAddrFamilyV4 ipAddrFamily = 1
	// ipAddrFamilyV6 represents IPv6 addresses.
	ipAddrFamilyV6 ipAddrFamily = 2
)
| 96 | |
| 97 | type pickfirstBuilder struct{} |
| 98 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 99 | func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer { |
| 100 | b := &pickfirstBalancer{ |
| 101 | cc: cc, |
| 102 | target: bo.Target.String(), |
| 103 | metricsRecorder: cc.MetricsRecorder(), |
| 104 | |
| 105 | subConns: resolver.NewAddressMapV2[*scData](), |
| 106 | state: connectivity.Connecting, |
| 107 | cancelConnectionTimer: func() {}, |
| 108 | } |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 109 | b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) |
| 110 | return b |
| 111 | } |
| 112 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 113 | func (b pickfirstBuilder) Name() string { |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 114 | return Name |
| 115 | } |
| 116 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 117 | func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| 118 | var cfg pfConfig |
| 119 | if err := json.Unmarshal(js, &cfg); err != nil { |
| 120 | return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err) |
| 121 | } |
| 122 | return cfg, nil |
| 123 | } |
| 124 | |
| 125 | // EnableHealthListener updates the state to configure pickfirst for using a |
| 126 | // generic health listener. |
| 127 | // |
| 128 | // # Experimental |
| 129 | // |
| 130 | // Notice: This API is EXPERIMENTAL and may be changed or removed in a later |
| 131 | // release. |
| 132 | func EnableHealthListener(state resolver.State) resolver.State { |
| 133 | state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true) |
| 134 | return state |
| 135 | } |
| 136 | |
// pfConfig is the parsed pick_first load balancing configuration returned by
// ParseConfig.
type pfConfig struct {
	// Embedded so that pfConfig satisfies serviceconfig.LoadBalancingConfig;
	// the json:"-" tag excludes it from (un)marshalling.
	serviceconfig.LoadBalancingConfig `json:"-"`

	// If set to true, instructs the LB policy to shuffle the order of the list
	// of endpoints received from the name resolver before attempting to
	// connect to them. When the resolver provides only plain addresses (no
	// endpoints), the address list is shuffled instead.
	ShuffleAddressList bool `json:"shuffleAddressList"`
}
| 145 | |
// scData keeps track of the current state of the subConn.
// It is not safe for concurrent access.
type scData struct {
	// The following fields are set when the scData is created in newSCData
	// and are read-only after that.
	subConn balancer.SubConn
	addr    resolver.Address

	// The last connectivity state reported by the SubConn's state listener.
	rawConnectivityState connectivity.State
	// The effective connectivity state based on raw connectivity, health state
	// and after following sticky TransientFailure behaviour defined in A62.
	effectiveState connectivity.State
	// The most recent connection error for this SubConn. It is read by
	// requestConnectionLocked when skipping a SubConn in TRANSIENT_FAILURE.
	// NOTE(review): the assignment is outside this view; confirm that it is
	// set from TRANSIENT_FAILURE state updates.
	lastErr error
	// Set when this SubConn failed, or was found in TRANSIENT_FAILURE, during
	// the current pass. Cleared for all SubConns by startFirstPassLocked.
	connectionFailedInFirstPass bool
}
| 161 | |
| 162 | func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { |
| 163 | sd := &scData{ |
| 164 | rawConnectivityState: connectivity.Idle, |
| 165 | effectiveState: connectivity.Idle, |
| 166 | addr: addr, |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 167 | } |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 168 | sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{ |
| 169 | StateListener: func(state balancer.SubConnState) { |
| 170 | b.updateSubConnState(sd, state) |
| 171 | }, |
| 172 | }) |
| 173 | if err != nil { |
| 174 | return nil, err |
| 175 | } |
| 176 | sd.subConn = sc |
| 177 | return sd, nil |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 178 | } |
| 179 | |
// pickfirstBalancer implements the pick_first LB policy. It tries the
// resolved addresses one at a time, staggered by the happy eyeballs timer.
// Once a SubConn becomes READY, it shuts down all other SubConns and reports
// a picker that returns the READY SubConn.
type pickfirstBalancer struct {
	// The following fields are initialized at build time and read-only after
	// that and therefore do not need to be guarded by a mutex.
	logger          *internalgrpclog.PrefixLogger
	cc              balancer.ClientConn
	target          string                   // channel target; used as the metrics label value
	metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil

	// The mutex is used to ensure synchronization of updates triggered
	// from the idle picker and the already serialized resolver,
	// SubConn state updates.
	mu sync.Mutex
	// State reported to the channel based on SubConn states and resolver
	// updates.
	state connectivity.State
	// scData for active SubConns mapped by address.
	subConns *resolver.AddressMapV2[*scData]
	// The flattened, de-duplicated and family-interleaved addresses from the
	// latest resolver update, together with the current position in them.
	addressList addressList
	// Set to true whenever a new pass over addressList starts
	// (startFirstPassLocked).
	firstPass bool
	// Reset to 0 at the start of each pass. It presumably counts SubConns
	// that reported TRANSIENT_FAILURE during that pass; TODO confirm in
	// updateSubConnState.
	numTF int
	// Cancels the pending happy eyeballs timer. It is a no-op when no timer
	// has been scheduled.
	cancelConnectionTimer func()
	// Whether the latest resolver state requested the generic health
	// listener (see EnableHealthListener).
	healthCheckingEnabled bool
}
| 203 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 204 | // ResolverError is called by the ClientConn when the name resolver produces |
| 205 | // an error or when pickfirst determined the resolver update to be invalid. |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 206 | func (b *pickfirstBalancer) ResolverError(err error) { |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 207 | b.mu.Lock() |
| 208 | defer b.mu.Unlock() |
| 209 | b.resolverErrorLocked(err) |
| 210 | } |
| 211 | |
| 212 | func (b *pickfirstBalancer) resolverErrorLocked(err error) { |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 213 | if b.logger.V(2) { |
| 214 | b.logger.Infof("Received error from the name resolver: %v", err) |
| 215 | } |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 216 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 217 | // The picker will not change since the balancer does not currently |
| 218 | // report an error. If the balancer hasn't received a single good resolver |
| 219 | // update yet, transition to TRANSIENT_FAILURE. |
| 220 | if b.state != connectivity.TransientFailure && b.addressList.size() > 0 { |
| 221 | if b.logger.V(2) { |
| 222 | b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.") |
| 223 | } |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 224 | return |
| 225 | } |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 226 | |
| 227 | b.updateBalancerState(balancer.State{ |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 228 | ConnectivityState: connectivity.TransientFailure, |
| 229 | Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, |
| 230 | }) |
| 231 | } |
| 232 | |
// UpdateClientConnState handles a new resolver state and LB config. It holds
// b.mu for its whole duration and first cancels any pending happy eyeballs
// timer.
//
// An update with neither addresses nor endpoints closes all SubConns, clears
// the address list, is handled like a resolver error and returns
// balancer.ErrBadResolverState. A non-nil BalancerConfig that is not a
// pfConfig is rejected with an error wrapping balancer.ErrBadResolverState.
//
// Otherwise the new address list is built as follows:
//   - take the flattened endpoint addresses, or the plain addresses when no
//     endpoints are set;
//   - optionally shuffle them;
//   - de-duplicate them;
//   - interleave them by address family.
//
// If the previously current SubConn was READY and is still in the list, it
// is kept and nothing else changes. Otherwise, SubConns for dropped
// addresses are shut down, and then:
//   - if that SubConn was READY, the balancer was CONNECTING, or there were
//     no previous addresses, CONNECTING is reported and a new first pass
//     starts;
//   - in TRANSIENT_FAILURE, a new pass starts while the balancer stays in
//     TRANSIENT_FAILURE (sticky TF, A62);
//   - in any other state no connection attempt is started here.
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.cancelConnectionTimer()
	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
		// Cleanup state pertaining to the previous resolver state.
		// Treat an empty address list like an error by calling
		// b.resolverErrorLocked.
		b.closeSubConnsLocked()
		b.addressList.updateAddrs(nil)
		b.resolverErrorLocked(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
	// A nil BalancerConfig is accepted and leaves cfg as the zero pfConfig
	// (no shuffling).
	cfg, ok := state.BalancerConfig.(pfConfig)
	if state.BalancerConfig != nil && !ok {
		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
	}

	if b.logger.V(2) {
		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
	}

	var newAddrs []resolver.Address
	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
		// Perform the optional shuffling described in gRFC A62. The shuffling
		// will change the order of endpoints but not touch the order of the
		// addresses within each endpoint. - A61
		if cfg.ShuffleAddressList {
			// Shuffle a copy so the resolver's slice is not mutated.
			endpoints = append([]resolver.Endpoint{}, endpoints...)
			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
		}

		// "Flatten the list by concatenating the ordered list of addresses for
		// each of the endpoints, in order." - A61
		for _, endpoint := range endpoints {
			newAddrs = append(newAddrs, endpoint.Addresses...)
		}
	} else {
		// Endpoints not set, process addresses until we migrate resolver
		// emissions fully to Endpoints. The top channel does wrap emitted
		// addresses with endpoints, however some balancers such as weighted
		// target do not forward the corresponding correct endpoints down/split
		// endpoints properly. Once all balancers correctly forward endpoints
		// down, can delete this else conditional.
		newAddrs = state.ResolverState.Addresses
		if cfg.ShuffleAddressList {
			// Shuffle a copy so the resolver's slice is not mutated.
			newAddrs = append([]resolver.Address{}, newAddrs...)
			internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
		}
	}

	// If an address appears in multiple endpoints or in the same endpoint
	// multiple times, we keep it only once. We will create only one SubConn
	// for the address because an AddressMap is used to store SubConns.
	// Not de-duplicating would result in attempting to connect to the same
	// SubConn multiple times in the same pass. We don't want this.
	newAddrs = deDupAddresses(newAddrs)
	newAddrs = interleaveAddresses(newAddrs)

	// Capture what the balancer was doing before the address list is
	// replaced; these values drive the state transition below.
	prevAddr := b.addressList.currentAddress()
	prevSCData, found := b.subConns.Get(prevAddr)
	prevAddrsCount := b.addressList.size()
	isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
	b.addressList.updateAddrs(newAddrs)

	// If the previous ready SubConn exists in new address list,
	// keep this connection and don't create new SubConns.
	if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
		return nil
	}

	b.reconcileSubConnsLocked(newAddrs)
	// If it's the first resolver update or the balancer was already READY
	// (but the new address list does not contain the ready SubConn) or
	// CONNECTING, enter CONNECTING.
	// We may be in TRANSIENT_FAILURE due to a previous empty address list,
	// we should still enter CONNECTING because the sticky TF behaviour
	// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
	// due to connectivity failures.
	if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
		// Start connection attempt at first address.
		b.forceUpdateConcludedStateLocked(balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
		})
		b.startFirstPassLocked()
	} else if b.state == connectivity.TransientFailure {
		// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
		// we're READY. See A62.
		b.startFirstPassLocked()
	}
	return nil
}
| 326 | |
// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns. A call is logged as an error and otherwise ignored.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}

// Close shuts down all SubConns, cancels any pending happy eyeballs timer and
// records the SHUTDOWN state, all while holding b.mu. It does not send a
// state update to the ClientConn.
func (b *pickfirstBalancer) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closeSubConnsLocked()
	b.cancelConnectionTimer()
	// Once the state is SHUTDOWN, ExitIdle (which only acts in IDLE) does
	// nothing.
	b.state = connectivity.Shutdown
}
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 340 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 341 | // ExitIdle moves the balancer out of idle state. It can be called concurrently |
| 342 | // by the idlePicker and clientConn so access to variables should be |
| 343 | // synchronized. |
| 344 | func (b *pickfirstBalancer) ExitIdle() { |
| 345 | b.mu.Lock() |
| 346 | defer b.mu.Unlock() |
| 347 | if b.state == connectivity.Idle { |
| 348 | // Move the balancer into CONNECTING state immediately. This is done to |
| 349 | // avoid staying in IDLE if a resolver update arrives before the first |
| 350 | // SubConn reports CONNECTING. |
| 351 | b.updateBalancerState(balancer.State{ |
| 352 | ConnectivityState: connectivity.Connecting, |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 353 | Picker: &picker{err: balancer.ErrNoSubConnAvailable}, |
| 354 | }) |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 355 | b.startFirstPassLocked() |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | func (b *pickfirstBalancer) startFirstPassLocked() { |
| 360 | b.firstPass = true |
| 361 | b.numTF = 0 |
| 362 | // Reset the connection attempt record for existing SubConns. |
| 363 | for _, sd := range b.subConns.Values() { |
| 364 | sd.connectionFailedInFirstPass = false |
| 365 | } |
| 366 | b.requestConnectionLocked() |
| 367 | } |
| 368 | |
| 369 | func (b *pickfirstBalancer) closeSubConnsLocked() { |
| 370 | for _, sd := range b.subConns.Values() { |
| 371 | sd.subConn.Shutdown() |
| 372 | } |
| 373 | b.subConns = resolver.NewAddressMapV2[*scData]() |
| 374 | } |
| 375 | |
| 376 | // deDupAddresses ensures that each address appears only once in the slice. |
| 377 | func deDupAddresses(addrs []resolver.Address) []resolver.Address { |
| 378 | seenAddrs := resolver.NewAddressMapV2[bool]() |
| 379 | retAddrs := []resolver.Address{} |
| 380 | |
| 381 | for _, addr := range addrs { |
| 382 | if _, ok := seenAddrs.Get(addr); ok { |
| 383 | continue |
| 384 | } |
| 385 | seenAddrs.Set(addr, true) |
| 386 | retAddrs = append(retAddrs, addr) |
| 387 | } |
| 388 | return retAddrs |
| 389 | } |
| 390 | |
| 391 | // interleaveAddresses interleaves addresses of both families (IPv4 and IPv6) |
| 392 | // as per RFC-8305 section 4. |
| 393 | // Whichever address family is first in the list is followed by an address of |
| 394 | // the other address family; that is, if the first address in the list is IPv6, |
| 395 | // then the first IPv4 address should be moved up in the list to be second in |
| 396 | // the list. It doesn't support configuring "First Address Family Count", i.e. |
| 397 | // there will always be a single member of the first address family at the |
| 398 | // beginning of the interleaved list. |
| 399 | // Addresses that are neither IPv4 nor IPv6 are treated as part of a third |
| 400 | // "unknown" family for interleaving. |
| 401 | // See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6 |
| 402 | func interleaveAddresses(addrs []resolver.Address) []resolver.Address { |
| 403 | familyAddrsMap := map[ipAddrFamily][]resolver.Address{} |
| 404 | interleavingOrder := []ipAddrFamily{} |
| 405 | for _, addr := range addrs { |
| 406 | family := addressFamily(addr.Addr) |
| 407 | if _, found := familyAddrsMap[family]; !found { |
| 408 | interleavingOrder = append(interleavingOrder, family) |
| 409 | } |
| 410 | familyAddrsMap[family] = append(familyAddrsMap[family], addr) |
| 411 | } |
| 412 | |
| 413 | interleavedAddrs := make([]resolver.Address, 0, len(addrs)) |
| 414 | |
| 415 | for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) { |
| 416 | // Some IP types may have fewer addresses than others, so we look for |
| 417 | // the next type that has a remaining member to add to the interleaved |
| 418 | // list. |
| 419 | family := interleavingOrder[curFamilyIdx] |
| 420 | remainingMembers := familyAddrsMap[family] |
| 421 | if len(remainingMembers) > 0 { |
| 422 | interleavedAddrs = append(interleavedAddrs, remainingMembers[0]) |
| 423 | familyAddrsMap[family] = remainingMembers[1:] |
| 424 | } |
| 425 | } |
| 426 | |
| 427 | return interleavedAddrs |
| 428 | } |
| 429 | |
| 430 | // addressFamily returns the ipAddrFamily after parsing the address string. |
| 431 | // If the address isn't of the format "ip-address:port", it returns |
| 432 | // ipAddrFamilyUnknown. The address may be valid even if it's not an IP when |
| 433 | // using a resolver like passthrough where the address may be a hostname in |
| 434 | // some format that the dialer can resolve. |
| 435 | func addressFamily(address string) ipAddrFamily { |
| 436 | // Parse the IP after removing the port. |
| 437 | host, _, err := net.SplitHostPort(address) |
| 438 | if err != nil { |
| 439 | return ipAddrFamilyUnknown |
| 440 | } |
| 441 | ip, err := netip.ParseAddr(host) |
| 442 | if err != nil { |
| 443 | return ipAddrFamilyUnknown |
| 444 | } |
| 445 | switch { |
| 446 | case ip.Is4() || ip.Is4In6(): |
| 447 | return ipAddrFamilyV4 |
| 448 | case ip.Is6(): |
| 449 | return ipAddrFamilyV6 |
| 450 | default: |
| 451 | return ipAddrFamilyUnknown |
| 452 | } |
| 453 | } |
| 454 | |
| 455 | // reconcileSubConnsLocked updates the active subchannels based on a new address |
| 456 | // list from the resolver. It does this by: |
| 457 | // - closing subchannels: any existing subchannels associated with addresses |
| 458 | // that are no longer in the updated list are shut down. |
| 459 | // - removing subchannels: entries for these closed subchannels are removed |
| 460 | // from the subchannel map. |
| 461 | // |
| 462 | // This ensures that the subchannel map accurately reflects the current set of |
| 463 | // addresses received from the name resolver. |
| 464 | func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) { |
| 465 | newAddrsMap := resolver.NewAddressMapV2[bool]() |
| 466 | for _, addr := range newAddrs { |
| 467 | newAddrsMap.Set(addr, true) |
| 468 | } |
| 469 | |
| 470 | for _, oldAddr := range b.subConns.Keys() { |
| 471 | if _, ok := newAddrsMap.Get(oldAddr); ok { |
| 472 | continue |
| 473 | } |
| 474 | val, _ := b.subConns.Get(oldAddr) |
| 475 | val.subConn.Shutdown() |
| 476 | b.subConns.Delete(oldAddr) |
| 477 | } |
| 478 | } |
| 479 | |
| 480 | // shutdownRemainingLocked shuts down remaining subConns. Called when a subConn |
| 481 | // becomes ready, which means that all other subConn must be shutdown. |
| 482 | func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) { |
| 483 | b.cancelConnectionTimer() |
| 484 | for _, sd := range b.subConns.Values() { |
| 485 | if sd.subConn != selected.subConn { |
| 486 | sd.subConn.Shutdown() |
| 487 | } |
| 488 | } |
| 489 | b.subConns = resolver.NewAddressMapV2[*scData]() |
| 490 | b.subConns.Set(selected.addr, selected) |
| 491 | } |
| 492 | |
// requestConnectionLocked starts connecting on the subchannel corresponding to
// the current address. If no subchannel exists, one is created. If the current
// subchannel is in TransientFailure, a connection to the next address is
// attempted until a subchannel is found.
//
// The walk starts at the addressList's current position and advances it:
//   - the first SubConn found in IDLE gets Connect called, and the happy
//     eyeballs timer is scheduled;
//   - for one already CONNECTING, only the timer is scheduled;
//   - in both cases the walk stops there;
//   - SubConns in TRANSIENT_FAILURE are marked as failed for this pass and
//     skipped;
//   - if the list runs out with every remaining SubConn in
//     TRANSIENT_FAILURE, endFirstPassIfPossibleLocked is called with the last
//     error seen.
//
// b.mu must be held.
func (b *pickfirstBalancer) requestConnectionLocked() {
	if !b.addressList.isValid() {
		return
	}
	var lastErr error
	// Do-while style loop: the body runs for the current address first.
	// NOTE(review): increment() presumably advances to the next address and
	// reports whether one exists; confirm in addressList.
	for valid := true; valid; valid = b.addressList.increment() {
		curAddr := b.addressList.currentAddress()
		sd, ok := b.subConns.Get(curAddr)
		if !ok {
			var err error
			// We want to assign the new scData to sd from the outer scope,
			// hence we can't use := below.
			sd, err = b.newSCData(curAddr)
			if err != nil {
				// This should never happen, unless the clientConn is being shut
				// down.
				if b.logger.V(2) {
					b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
				}
				// Do nothing, the LB policy will be closed soon.
				return
			}
			b.subConns.Set(curAddr, sd)
		}

		switch sd.rawConnectivityState {
		case connectivity.Idle:
			sd.subConn.Connect()
			b.scheduleNextConnectionLocked()
			return
		case connectivity.TransientFailure:
			// The SubConn is being re-used and failed during a previous pass
			// over the addressList. It has not completed backoff yet.
			// Mark it as having failed and try the next address.
			sd.connectionFailedInFirstPass = true
			lastErr = sd.lastErr
			continue
		case connectivity.Connecting:
			// Wait for the connection attempt to complete or the timer to fire
			// before attempting the next address.
			b.scheduleNextConnectionLocked()
			return
		default:
			// Any other state (e.g. READY or SHUTDOWN) is not expected for a
			// SubConn still in the map while a connection is being requested.
			b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
			return
		}
	}

	// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
	// first pass if possible.
	b.endFirstPassIfPossibleLocked(lastErr)
}
| 550 | |
// scheduleNextConnectionLocked arms the happy eyeballs timer for the current
// address, cancelling any previously scheduled timer first. If the address
// list has no next address, no timer is armed.
//
// When the timer fires after connectionDelayInterval, the callback acquires
// b.mu. Unless the timer was cancelled in the meantime, it advances the
// address list and, if that succeeds, requests a connection on the next
// address. b.mu must be held by the caller.
func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
	b.cancelConnectionTimer()
	if !b.addressList.hasNext() {
		return
	}
	curAddr := b.addressList.currentAddress()
	cancelled := false // Access to this is protected by the balancer's mutex.
	// NOTE(review): internal.TimeAfterFunc is assumed to run the callback on
	// its own goroutine and return a function that stops the timer; confirm.
	closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		// If the scheduled task is cancelled while acquiring the mutex, return.
		if cancelled {
			return
		}
		if b.logger.V(2) {
			b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
		}
		if b.addressList.increment() {
			b.requestConnectionLocked()
		}
	})
	// Access to the cancellation callback held by the balancer is guarded by
	// the balancer's mutex, so it's safe to set the boolean from the callback.
	// sync.OnceFunc makes repeated cancellation calls harmless.
	b.cancelConnectionTimer = sync.OnceFunc(func() {
		cancelled = true
		closeFn()
	})
}
| 579 | |
| 580 | func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) { |
| 581 | b.mu.Lock() |
| 582 | defer b.mu.Unlock() |
| 583 | oldState := sd.rawConnectivityState |
| 584 | sd.rawConnectivityState = newState.ConnectivityState |
| 585 | // Previously relevant SubConns can still callback with state updates. |
| 586 | // To prevent pickers from returning these obsolete SubConns, this logic |
| 587 | // is included to check if the current list of active SubConns includes this |
| 588 | // SubConn. |
| 589 | if !b.isActiveSCData(sd) { |
| 590 | return |
| 591 | } |
| 592 | if newState.ConnectivityState == connectivity.Shutdown { |
| 593 | sd.effectiveState = connectivity.Shutdown |
| 594 | return |
| 595 | } |
| 596 | |
| 597 | // Record a connection attempt when exiting CONNECTING. |
| 598 | if newState.ConnectivityState == connectivity.TransientFailure { |
| 599 | sd.connectionFailedInFirstPass = true |
| 600 | connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target) |
| 601 | } |
| 602 | |
| 603 | if newState.ConnectivityState == connectivity.Ready { |
| 604 | connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target) |
| 605 | b.shutdownRemainingLocked(sd) |
| 606 | if !b.addressList.seekTo(sd.addr) { |
| 607 | // This should not fail as we should have only one SubConn after |
| 608 | // entering READY. The SubConn should be present in the addressList. |
| 609 | b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses) |
| 610 | return |
| 611 | } |
| 612 | if !b.healthCheckingEnabled { |
| 613 | if b.logger.V(2) { |
| 614 | b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn) |
| 615 | } |
| 616 | |
| 617 | sd.effectiveState = connectivity.Ready |
| 618 | b.updateBalancerState(balancer.State{ |
| 619 | ConnectivityState: connectivity.Ready, |
| 620 | Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}}, |
| 621 | }) |
| 622 | return |
| 623 | } |
| 624 | if b.logger.V(2) { |
| 625 | b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn) |
| 626 | } |
| 627 | // Send a CONNECTING update to take the SubConn out of sticky-TF if |
| 628 | // required. |
| 629 | sd.effectiveState = connectivity.Connecting |
| 630 | b.updateBalancerState(balancer.State{ |
| 631 | ConnectivityState: connectivity.Connecting, |
| 632 | Picker: &picker{err: balancer.ErrNoSubConnAvailable}, |
| 633 | }) |
| 634 | sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) { |
| 635 | b.updateSubConnHealthState(sd, scs) |
| 636 | }) |
| 637 | return |
| 638 | } |
| 639 | |
| 640 | // If the LB policy is READY, and it receives a subchannel state change, |
| 641 | // it means that the READY subchannel has failed. |
| 642 | // A SubConn can also transition from CONNECTING directly to IDLE when |
| 643 | // a transport is successfully created, but the connection fails |
| 644 | // before the SubConn can send the notification for READY. We treat |
| 645 | // this as a successful connection and transition to IDLE. |
| 646 | // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second |
| 647 | // part of the if condition below once the issue is fixed. |
| 648 | if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) { |
| 649 | // Once a transport fails, the balancer enters IDLE and starts from |
| 650 | // the first address when the picker is used. |
| 651 | b.shutdownRemainingLocked(sd) |
| 652 | sd.effectiveState = newState.ConnectivityState |
| 653 | // READY SubConn interspliced in between CONNECTING and IDLE, need to |
| 654 | // account for that. |
| 655 | if oldState == connectivity.Connecting { |
| 656 | // A known issue (https://github.com/grpc/grpc-go/issues/7862) |
| 657 | // causes a race that prevents the READY state change notification. |
| 658 | // This works around it. |
| 659 | connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target) |
| 660 | } |
| 661 | disconnectionsMetric.Record(b.metricsRecorder, 1, b.target) |
| 662 | b.addressList.reset() |
| 663 | b.updateBalancerState(balancer.State{ |
| 664 | ConnectivityState: connectivity.Idle, |
| 665 | Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)}, |
| 666 | }) |
| 667 | return |
| 668 | } |
| 669 | |
| 670 | if b.firstPass { |
| 671 | switch newState.ConnectivityState { |
| 672 | case connectivity.Connecting: |
| 673 | // The effective state can be in either IDLE, CONNECTING or |
| 674 | // TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in |
| 675 | // TRANSIENT_FAILURE until it's READY. See A62. |
| 676 | if sd.effectiveState != connectivity.TransientFailure { |
| 677 | sd.effectiveState = connectivity.Connecting |
| 678 | b.updateBalancerState(balancer.State{ |
| 679 | ConnectivityState: connectivity.Connecting, |
| 680 | Picker: &picker{err: balancer.ErrNoSubConnAvailable}, |
| 681 | }) |
| 682 | } |
| 683 | case connectivity.TransientFailure: |
| 684 | sd.lastErr = newState.ConnectionError |
| 685 | sd.effectiveState = connectivity.TransientFailure |
| 686 | // Since we're re-using common SubConns while handling resolver |
| 687 | // updates, we could receive an out of turn TRANSIENT_FAILURE from |
| 688 | // a pass over the previous address list. Happy Eyeballs will also |
| 689 | // cause out of order updates to arrive. |
| 690 | |
| 691 | if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) { |
| 692 | b.cancelConnectionTimer() |
| 693 | if b.addressList.increment() { |
| 694 | b.requestConnectionLocked() |
| 695 | return |
| 696 | } |
| 697 | } |
| 698 | |
| 699 | // End the first pass if we've seen a TRANSIENT_FAILURE from all |
| 700 | // SubConns once. |
| 701 | b.endFirstPassIfPossibleLocked(newState.ConnectionError) |
| 702 | } |
| 703 | return |
| 704 | } |
| 705 | |
| 706 | // We have finished the first pass, keep re-connecting failing SubConns. |
| 707 | switch newState.ConnectivityState { |
| 708 | case connectivity.TransientFailure: |
| 709 | b.numTF = (b.numTF + 1) % b.subConns.Len() |
| 710 | sd.lastErr = newState.ConnectionError |
| 711 | if b.numTF%b.subConns.Len() == 0 { |
| 712 | b.updateBalancerState(balancer.State{ |
| 713 | ConnectivityState: connectivity.TransientFailure, |
| 714 | Picker: &picker{err: newState.ConnectionError}, |
| 715 | }) |
| 716 | } |
| 717 | // We don't need to request re-resolution since the SubConn already |
| 718 | // does that before reporting TRANSIENT_FAILURE. |
| 719 | // TODO: #7534 - Move re-resolution requests from SubConn into |
| 720 | // pick_first. |
| 721 | case connectivity.Idle: |
| 722 | sd.subConn.Connect() |
| 723 | } |
| 724 | } |
| 725 | |
| 726 | // endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the |
| 727 | // addresses are tried and their SubConns have reported a failure. |
| 728 | func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) { |
| 729 | // An optimization to avoid iterating over the entire SubConn map. |
| 730 | if b.addressList.isValid() { |
| 731 | return |
| 732 | } |
| 733 | // Connect() has been called on all the SubConns. The first pass can be |
| 734 | // ended if all the SubConns have reported a failure. |
| 735 | for _, sd := range b.subConns.Values() { |
| 736 | if !sd.connectionFailedInFirstPass { |
| 737 | return |
| 738 | } |
| 739 | } |
| 740 | b.firstPass = false |
| 741 | b.updateBalancerState(balancer.State{ |
| 742 | ConnectivityState: connectivity.TransientFailure, |
| 743 | Picker: &picker{err: lastErr}, |
| 744 | }) |
| 745 | // Start re-connecting all the SubConns that are already in IDLE. |
| 746 | for _, sd := range b.subConns.Values() { |
| 747 | if sd.rawConnectivityState == connectivity.Idle { |
| 748 | sd.subConn.Connect() |
| 749 | } |
| 750 | } |
| 751 | } |
| 752 | |
| 753 | func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool { |
| 754 | activeSD, found := b.subConns.Get(sd.addr) |
| 755 | return found && activeSD == sd |
| 756 | } |
| 757 | |
| 758 | func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) { |
| 759 | b.mu.Lock() |
| 760 | defer b.mu.Unlock() |
| 761 | // Previously relevant SubConns can still callback with state updates. |
| 762 | // To prevent pickers from returning these obsolete SubConns, this logic |
| 763 | // is included to check if the current list of active SubConns includes |
| 764 | // this SubConn. |
| 765 | if !b.isActiveSCData(sd) { |
| 766 | return |
| 767 | } |
| 768 | sd.effectiveState = state.ConnectivityState |
| 769 | switch state.ConnectivityState { |
| 770 | case connectivity.Ready: |
| 771 | b.updateBalancerState(balancer.State{ |
| 772 | ConnectivityState: connectivity.Ready, |
| 773 | Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}}, |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 774 | }) |
| 775 | case connectivity.TransientFailure: |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 776 | b.updateBalancerState(balancer.State{ |
| 777 | ConnectivityState: connectivity.TransientFailure, |
| 778 | Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)}, |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 779 | }) |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 780 | case connectivity.Connecting: |
| 781 | b.updateBalancerState(balancer.State{ |
| 782 | ConnectivityState: connectivity.Connecting, |
| 783 | Picker: &picker{err: balancer.ErrNoSubConnAvailable}, |
| 784 | }) |
| 785 | default: |
| 786 | b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state) |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 787 | } |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 788 | } |
| 789 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 790 | // updateBalancerState stores the state reported to the channel and calls |
| 791 | // ClientConn.UpdateState(). As an optimization, it avoids sending duplicate |
| 792 | // updates to the channel. |
| 793 | func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) { |
| 794 | // In case of TransientFailures allow the picker to be updated to update |
| 795 | // the connectivity error, in all other cases don't send duplicate state |
| 796 | // updates. |
| 797 | if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure { |
| 798 | return |
| 799 | } |
| 800 | b.forceUpdateConcludedStateLocked(newState) |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 801 | } |
| 802 | |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 803 | // forceUpdateConcludedStateLocked stores the state reported to the channel and |
| 804 | // calls ClientConn.UpdateState(). |
| 805 | // A separate function is defined to force update the ClientConn state since the |
| 806 | // channel doesn't correctly assume that LB policies start in CONNECTING and |
| 807 | // relies on LB policy to send an initial CONNECTING update. |
| 808 | func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) { |
| 809 | b.state = newState.ConnectivityState |
| 810 | b.cc.UpdateState(newState) |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 811 | } |
| 812 | |
| 813 | type picker struct { |
| 814 | result balancer.PickResult |
| 815 | err error |
| 816 | } |
| 817 | |
| 818 | func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { |
| 819 | return p.result, p.err |
| 820 | } |
| 821 | |
| 822 | // idlePicker is used when the SubConn is IDLE and kicks the SubConn into |
| 823 | // CONNECTING when Pick is called. |
| 824 | type idlePicker struct { |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 825 | exitIdle func() |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 826 | } |
| 827 | |
| 828 | func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 829 | i.exitIdle() |
| Abhay Kumar | fe505f2 | 2025-11-10 14:16:31 +0000 | [diff] [blame] | 830 | return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| 831 | } |
| bseeniva | dd66c36 | 2026-02-12 19:13:26 +0530 | [diff] [blame] | 832 | |
| 833 | // addressList manages sequentially iterating over addresses present in a list |
| 834 | // of endpoints. It provides a 1 dimensional view of the addresses present in |
| 835 | // the endpoints. |
| 836 | // This type is not safe for concurrent access. |
| 837 | type addressList struct { |
| 838 | addresses []resolver.Address |
| 839 | idx int |
| 840 | } |
| 841 | |
| 842 | func (al *addressList) isValid() bool { |
| 843 | return al.idx < len(al.addresses) |
| 844 | } |
| 845 | |
| 846 | func (al *addressList) size() int { |
| 847 | return len(al.addresses) |
| 848 | } |
| 849 | |
| 850 | // increment moves to the next index in the address list. |
| 851 | // This method returns false if it went off the list, true otherwise. |
| 852 | func (al *addressList) increment() bool { |
| 853 | if !al.isValid() { |
| 854 | return false |
| 855 | } |
| 856 | al.idx++ |
| 857 | return al.idx < len(al.addresses) |
| 858 | } |
| 859 | |
| 860 | // currentAddress returns the current address pointed to in the addressList. |
| 861 | // If the list is in an invalid state, it returns an empty address instead. |
| 862 | func (al *addressList) currentAddress() resolver.Address { |
| 863 | if !al.isValid() { |
| 864 | return resolver.Address{} |
| 865 | } |
| 866 | return al.addresses[al.idx] |
| 867 | } |
| 868 | |
| 869 | func (al *addressList) reset() { |
| 870 | al.idx = 0 |
| 871 | } |
| 872 | |
| 873 | func (al *addressList) updateAddrs(addrs []resolver.Address) { |
| 874 | al.addresses = addrs |
| 875 | al.reset() |
| 876 | } |
| 877 | |
| 878 | // seekTo returns false if the needle was not found and the current index was |
| 879 | // left unchanged. |
| 880 | func (al *addressList) seekTo(needle resolver.Address) bool { |
| 881 | for ai, addr := range al.addresses { |
| 882 | if !equalAddressIgnoringBalAttributes(&addr, &needle) { |
| 883 | continue |
| 884 | } |
| 885 | al.idx = ai |
| 886 | return true |
| 887 | } |
| 888 | return false |
| 889 | } |
| 890 | |
| 891 | // hasNext returns whether incrementing the addressList will result in moving |
| 892 | // past the end of the list. If the list has already moved past the end, it |
| 893 | // returns false. |
| 894 | func (al *addressList) hasNext() bool { |
| 895 | if !al.isValid() { |
| 896 | return false |
| 897 | } |
| 898 | return al.idx+1 < len(al.addresses) |
| 899 | } |
| 900 | |
| 901 | // equalAddressIgnoringBalAttributes returns true is a and b are considered |
| 902 | // equal. This is different from the Equal method on the resolver.Address type |
| 903 | // which considers all fields to determine equality. Here, we only consider |
| 904 | // fields that are meaningful to the SubConn. |
| 905 | func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool { |
| 906 | return a.Addr == b.Addr && a.ServerName == b.ServerName && |
| 907 | a.Attributes.Equal(b.Attributes) |
| 908 | } |