| Abhay Kumar | a61c522 | 2025-11-10 07:32:50 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2024 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | // Package endpointsharding implements a load balancing policy that manages |
| 20 | // homogeneous child policies each owning a single endpoint. |
| 21 | // |
| 22 | // # Experimental |
| 23 | // |
| 24 | // Notice: This package is EXPERIMENTAL and may be changed or removed in a |
| 25 | // later release. |
| 26 | package endpointsharding |
| 27 | |
| 28 | import ( |
| 29 | "errors" |
| 30 | rand "math/rand/v2" |
| 31 | "sync" |
| 32 | "sync/atomic" |
| 33 | |
| 34 | "google.golang.org/grpc/balancer" |
| 35 | "google.golang.org/grpc/balancer/base" |
| 36 | "google.golang.org/grpc/connectivity" |
| 37 | "google.golang.org/grpc/resolver" |
| 38 | ) |
| 39 | |
| 40 | var randIntN = rand.IntN |
| 41 | |
| 42 | // ChildState is the balancer state of a child along with the endpoint which |
| 43 | // identifies the child balancer. |
| 44 | type ChildState struct { |
| 45 | Endpoint resolver.Endpoint |
| 46 | State balancer.State |
| 47 | |
| 48 | // Balancer exposes only the ExitIdler interface of the child LB policy. |
| 49 | // Other methods of the child policy are called only by endpointsharding. |
| 50 | Balancer ExitIdler |
| 51 | } |
| 52 | |
// ExitIdler exposes a single capability of a child balancer: asking it to
// leave the IDLE state.
type ExitIdler interface {
	// ExitIdle asks the LB policy to reconnect to its backends, i.e. to leave
	// the IDLE state, when doing so is appropriate and possible. Note that a
	// SubConn that enters IDLE stays there until SubConn.Connect is called.
	ExitIdle()
}
| 60 | |
// Options configures the behaviour of an endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect, when true, leaves child balancers that report
	// IDLE in that state until ExitIdle is explicitly invoked on them, e.g.
	// through the Balancer field of a ChildState obtained from the
	// endpointsharding picker. When false (the zero value), the
	// endpointsharding balancer automatically calls ExitIdle on any child
	// balancer that reports IDLE.
	DisableAutoReconnect bool
}
| 71 | |
| 72 | // ChildBuilderFunc creates a new balancer with the ClientConn. It has the same |
| 73 | // type as the balancer.Builder.Build method. |
| 74 | type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer |
| 75 | |
| 76 | // NewBalancer returns a load balancing policy that manages homogeneous child |
| 77 | // policies each owning a single endpoint. The endpointsharding balancer |
| 78 | // forwards the LoadBalancingConfig in ClientConn state updates to its children. |
| 79 | func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer { |
| 80 | es := &endpointSharding{ |
| 81 | cc: cc, |
| 82 | bOpts: opts, |
| 83 | esOpts: esOpts, |
| 84 | childBuilder: childBuilder, |
| 85 | } |
| 86 | es.children.Store(resolver.NewEndpointMap[*balancerWrapper]()) |
| 87 | return es |
| 88 | } |
| 89 | |
| 90 | // endpointSharding is a balancer that wraps child balancers. It creates a child |
| 91 | // balancer with child config for every unique Endpoint received. It updates the |
| 92 | // child states on any update from parent or child. |
| 93 | type endpointSharding struct { |
| 94 | cc balancer.ClientConn |
| 95 | bOpts balancer.BuildOptions |
| 96 | esOpts Options |
| 97 | childBuilder ChildBuilderFunc |
| 98 | |
| 99 | // childMu synchronizes calls to any single child. It must be held for all |
| 100 | // calls into a child. To avoid deadlocks, do not acquire childMu while |
| 101 | // holding mu. |
| 102 | childMu sync.Mutex |
| 103 | children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]] |
| 104 | |
| 105 | // inhibitChildUpdates is set during UpdateClientConnState/ResolverError |
| 106 | // calls (calls to children will each produce an update, only want one |
| 107 | // update). |
| 108 | inhibitChildUpdates atomic.Bool |
| 109 | |
| 110 | // mu synchronizes access to the state stored in balancerWrappers in the |
| 111 | // children field. mu must not be held during calls into a child since |
| 112 | // synchronous calls back from the child may require taking mu, causing a |
| 113 | // deadlock. To avoid deadlocks, do not acquire childMu while holding mu. |
| 114 | mu sync.Mutex |
| 115 | } |
| 116 | |
| 117 | // rotateEndpoints returns a slice of all the input endpoints rotated a random |
| 118 | // amount. |
| 119 | func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint { |
| 120 | les := len(es) |
| 121 | if les == 0 { |
| 122 | return es |
| 123 | } |
| 124 | r := randIntN(les) |
| 125 | // Make a copy to avoid mutating data beyond the end of es. |
| 126 | ret := make([]resolver.Endpoint, les) |
| 127 | copy(ret, es[r:]) |
| 128 | copy(ret[les-r:], es[:r]) |
| 129 | return ret |
| 130 | } |
| 131 | |
| 132 | // UpdateClientConnState creates a child for new endpoints and deletes children |
| 133 | // for endpoints that are no longer present. It also updates all the children, |
| 134 | // and sends a single synchronous update of the childrens' aggregated state at |
| 135 | // the end of the UpdateClientConnState operation. If any endpoint has no |
| 136 | // addresses it will ignore that endpoint. Otherwise, returns first error found |
| 137 | // from a child, but fully processes the new update. |
| 138 | func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error { |
| 139 | es.childMu.Lock() |
| 140 | defer es.childMu.Unlock() |
| 141 | |
| 142 | es.inhibitChildUpdates.Store(true) |
| 143 | defer func() { |
| 144 | es.inhibitChildUpdates.Store(false) |
| 145 | es.updateState() |
| 146 | }() |
| 147 | var ret error |
| 148 | |
| 149 | children := es.children.Load() |
| 150 | newChildren := resolver.NewEndpointMap[*balancerWrapper]() |
| 151 | |
| 152 | // Update/Create new children. |
| 153 | for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) { |
| 154 | if _, ok := newChildren.Get(endpoint); ok { |
| 155 | // Endpoint child was already created, continue to avoid duplicate |
| 156 | // update. |
| 157 | continue |
| 158 | } |
| 159 | childBalancer, ok := children.Get(endpoint) |
| 160 | if ok { |
| 161 | // Endpoint attributes may have changed, update the stored endpoint. |
| 162 | es.mu.Lock() |
| 163 | childBalancer.childState.Endpoint = endpoint |
| 164 | es.mu.Unlock() |
| 165 | } else { |
| 166 | childBalancer = &balancerWrapper{ |
| 167 | childState: ChildState{Endpoint: endpoint}, |
| 168 | ClientConn: es.cc, |
| 169 | es: es, |
| 170 | } |
| 171 | childBalancer.childState.Balancer = childBalancer |
| 172 | childBalancer.child = es.childBuilder(childBalancer, es.bOpts) |
| 173 | } |
| 174 | newChildren.Set(endpoint, childBalancer) |
| 175 | if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{ |
| 176 | BalancerConfig: state.BalancerConfig, |
| 177 | ResolverState: resolver.State{ |
| 178 | Endpoints: []resolver.Endpoint{endpoint}, |
| 179 | Attributes: state.ResolverState.Attributes, |
| 180 | }, |
| 181 | }); err != nil && ret == nil { |
| 182 | // Return first error found, and always commit full processing of |
| 183 | // updating children. If desired to process more specific errors |
| 184 | // across all endpoints, caller should make these specific |
| 185 | // validations, this is a current limitation for simplicity sake. |
| 186 | ret = err |
| 187 | } |
| 188 | } |
| 189 | // Delete old children that are no longer present. |
| 190 | for _, e := range children.Keys() { |
| 191 | child, _ := children.Get(e) |
| 192 | if _, ok := newChildren.Get(e); !ok { |
| 193 | child.closeLocked() |
| 194 | } |
| 195 | } |
| 196 | es.children.Store(newChildren) |
| 197 | if newChildren.Len() == 0 { |
| 198 | return balancer.ErrBadResolverState |
| 199 | } |
| 200 | return ret |
| 201 | } |
| 202 | |
| 203 | // ResolverError forwards the resolver error to all of the endpointSharding's |
| 204 | // children and sends a single synchronous update of the childStates at the end |
| 205 | // of the ResolverError operation. |
| 206 | func (es *endpointSharding) ResolverError(err error) { |
| 207 | es.childMu.Lock() |
| 208 | defer es.childMu.Unlock() |
| 209 | es.inhibitChildUpdates.Store(true) |
| 210 | defer func() { |
| 211 | es.inhibitChildUpdates.Store(false) |
| 212 | es.updateState() |
| 213 | }() |
| 214 | children := es.children.Load() |
| 215 | for _, child := range children.Values() { |
| 216 | child.resolverErrorLocked(err) |
| 217 | } |
| 218 | } |
| 219 | |
| 220 | func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { |
| 221 | // UpdateSubConnState is deprecated. |
| 222 | } |
| 223 | |
| 224 | func (es *endpointSharding) Close() { |
| 225 | es.childMu.Lock() |
| 226 | defer es.childMu.Unlock() |
| 227 | children := es.children.Load() |
| 228 | for _, child := range children.Values() { |
| 229 | child.closeLocked() |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | func (es *endpointSharding) ExitIdle() { |
| 234 | es.childMu.Lock() |
| 235 | defer es.childMu.Unlock() |
| 236 | for _, bw := range es.children.Load().Values() { |
| 237 | if !bw.isClosed { |
| 238 | bw.child.ExitIdle() |
| 239 | } |
| 240 | } |
| 241 | } |
| 242 | |
// updateState computes the aggregated state of all children and pushes it,
// together with a round-robin picker, to the parent ClientConn.
//
// The aggregate is the first non-empty group in the order READY, CONNECTING,
// IDLE, TRANSIENT_FAILURE. The picker round-robins over only the children in
// that group. With no children at all, it reports TRANSIENT_FAILURE with an
// error picker. Every picker also carries a snapshot of all children's states.
//
// It does nothing while inhibitChildUpdates is set. es.mu is held for the rest
// of the call, including the call to es.cc.UpdateState.
func (es *endpointSharding) updateState() {
	if es.inhibitChildUpdates.Load() {
		return
	}

	es.mu.Lock()
	defer es.mu.Unlock()

	all := es.children.Load()
	snapshot := make([]ChildState, 0, all.Len())
	var ready, connecting, idle, failing []balancer.Picker
	for _, bw := range all.Values() {
		cs := bw.childState
		snapshot = append(snapshot, cs)
		switch cs.State.ConnectivityState {
		case connectivity.Ready:
			ready = append(ready, cs.State.Picker)
		case connectivity.Connecting:
			connecting = append(connecting, cs.State.Picker)
		case connectivity.Idle:
			idle = append(idle, cs.State.Picker)
		case connectivity.TransientFailure:
			failing = append(failing, cs.State.Picker)
		default:
			// connectivity.Shutdown is not expected from a child. Such a child
			// still appears in the snapshot but contributes no picker.
		}
	}

	var (
		aggState connectivity.State
		pickers  []balancer.Picker
	)
	switch {
	case len(ready) > 0:
		aggState, pickers = connectivity.Ready, ready
	case len(connecting) > 0:
		aggState, pickers = connectivity.Connecting, connecting
	case len(idle) > 0:
		aggState, pickers = connectivity.Idle, idle
	case len(failing) > 0:
		aggState, pickers = connectivity.TransientFailure, failing
	default:
		// No children at all, e.g. a resolver error arrived before any valid
		// resolver update.
		aggState = connectivity.TransientFailure
		pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
	}

	es.cc.UpdateState(balancer.State{
		ConnectivityState: aggState,
		Picker: &pickerWithChildStates{
			pickers:     pickers,
			childStates: snapshot,
			next:        uint32(randIntN(len(pickers))),
		},
	})
}
| 306 | |
| 307 | // pickerWithChildStates delegates to the pickers it holds in a round robin |
| 308 | // fashion. It also contains the childStates of all the endpointSharding's |
| 309 | // children. |
| 310 | type pickerWithChildStates struct { |
| 311 | pickers []balancer.Picker |
| 312 | childStates []ChildState |
| 313 | next uint32 |
| 314 | } |
| 315 | |
// Pick hands the request to the next delegate picker in round-robin order.
// The shared cursor is advanced atomically.
func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	count := uint32(len(p.pickers))
	idx := atomic.AddUint32(&p.next, 1) % count
	return p.pickers[idx].Pick(info)
}
| 321 | |
| 322 | // ChildStatesFromPicker returns the state of all the children managed by the |
| 323 | // endpoint sharding balancer that created this picker. |
| 324 | func ChildStatesFromPicker(picker balancer.Picker) []ChildState { |
| 325 | p, ok := picker.(*pickerWithChildStates) |
| 326 | if !ok { |
| 327 | return nil |
| 328 | } |
| 329 | return p.childStates |
| 330 | } |
| 331 | |
| 332 | // balancerWrapper is a wrapper of a balancer. It ID's a child balancer by |
| 333 | // endpoint, and persists recent child balancer state. |
| 334 | type balancerWrapper struct { |
| 335 | // The following fields are initialized at build time and read-only after |
| 336 | // that and therefore do not need to be guarded by a mutex. |
| 337 | |
| 338 | // child contains the wrapped balancer. Access its methods only through |
| 339 | // methods on balancerWrapper to ensure proper synchronization |
| 340 | child balancer.Balancer |
| 341 | balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns |
| 342 | |
| 343 | es *endpointSharding |
| 344 | |
| 345 | // Access to the following fields is guarded by es.mu. |
| 346 | |
| 347 | childState ChildState |
| 348 | isClosed bool |
| 349 | } |
| 350 | |
| 351 | func (bw *balancerWrapper) UpdateState(state balancer.State) { |
| 352 | bw.es.mu.Lock() |
| 353 | bw.childState.State = state |
| 354 | bw.es.mu.Unlock() |
| 355 | if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect { |
| 356 | bw.ExitIdle() |
| 357 | } |
| 358 | bw.es.updateState() |
| 359 | } |
| 360 | |
| 361 | // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to |
| 362 | // avoid deadlocks due to synchronous balancer state updates. |
| 363 | func (bw *balancerWrapper) ExitIdle() { |
| 364 | go func() { |
| 365 | bw.es.childMu.Lock() |
| 366 | if !bw.isClosed { |
| 367 | bw.child.ExitIdle() |
| 368 | } |
| 369 | bw.es.childMu.Unlock() |
| 370 | }() |
| 371 | } |
| 372 | |
| 373 | // updateClientConnStateLocked delivers the ClientConnState to the child |
| 374 | // balancer. Callers must hold the child mutex of the parent endpointsharding |
| 375 | // balancer. |
| 376 | func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error { |
| 377 | return bw.child.UpdateClientConnState(ccs) |
| 378 | } |
| 379 | |
| 380 | // closeLocked closes the child balancer. Callers must hold the child mutext of |
| 381 | // the parent endpointsharding balancer. |
| 382 | func (bw *balancerWrapper) closeLocked() { |
| 383 | bw.child.Close() |
| 384 | bw.isClosed = true |
| 385 | } |
| 386 | |
| 387 | func (bw *balancerWrapper) resolverErrorLocked(err error) { |
| 388 | bw.child.ResolverError(err) |
| 389 | } |