blob: 360db08ebc13d91fbab31fb45b7293eebbe82c5b [file] [log] [blame]
Abhay Kumara61c5222025-11-10 07:32:50 +00001/*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package endpointsharding implements a load balancing policy that manages
20// homogeneous child policies each owning a single endpoint.
21//
22// # Experimental
23//
24// Notice: This package is EXPERIMENTAL and may be changed or removed in a
25// later release.
26package endpointsharding
27
28import (
29 "errors"
30 rand "math/rand/v2"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/balancer/base"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/resolver"
38)
39
40var randIntN = rand.IntN
41
// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer. Every picker produced by the endpointsharding
// balancer carries a snapshot of all children's ChildState, retrievable with
// ChildStatesFromPicker.
type ChildState struct {
	// Endpoint is the endpoint owned by this child. It is refreshed on every
	// UpdateClientConnState, so its attributes reflect the latest update.
	Endpoint resolver.Endpoint
	// State is the most recent balancer.State reported by the child.
	State balancer.State

	// Balancer exposes only the ExitIdler interface of the child LB policy.
	// Other methods of the child policy are called only by endpointsharding.
	Balancer ExitIdler
}
52
// ExitIdler provides access to only the ExitIdle method of the child balancer.
//
// The implementation stored in ChildState.Balancer by this package does not
// call the child inline. It runs the child's ExitIdle in a new goroutine, with
// the parent's child mutex held, and skips the call if the child has already
// been closed.
type ExitIdler interface {
	// ExitIdle instructs the LB policy to reconnect to backends / exit the
	// IDLE state, if appropriate and possible. Note that SubConns that enter
	// the IDLE state will not reconnect until SubConn.Connect is called.
	ExitIdle()
}
60
// Options are the options to configure the behaviour of the
// endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect allows the balancer to keep child balancers in the
	// IDLE state until they are explicitly triggered to exit using the
	// ChildState obtained from the endpointsharding picker. When set to false,
	// the endpointsharding balancer will automatically call ExitIdle on child
	// balancers that report IDLE.
	DisableAutoReconnect bool
}
71
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
// type as the balancer.Builder.Build method.
//
// The ClientConn passed in is a per-child wrapper owned by endpointsharding.
// It embeds the parent ClientConn, so other methods are forwarded unchanged,
// but it intercepts UpdateState so the child's state can be aggregated with
// its siblings' states.
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
75
76// NewBalancer returns a load balancing policy that manages homogeneous child
77// policies each owning a single endpoint. The endpointsharding balancer
78// forwards the LoadBalancingConfig in ClientConn state updates to its children.
79func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
80 es := &endpointSharding{
81 cc: cc,
82 bOpts: opts,
83 esOpts: esOpts,
84 childBuilder: childBuilder,
85 }
86 es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
87 return es
88}
89
// endpointSharding is a balancer that wraps child balancers. It creates a child
// balancer with child config for every unique Endpoint received. It updates the
// child states on any update from parent or child.
//
// Lock ordering: childMu may be held while acquiring mu, never the reverse.
type endpointSharding struct {
	// The following fields are set by NewBalancer and not modified afterwards.
	cc           balancer.ClientConn
	bOpts        balancer.BuildOptions
	esOpts       Options
	childBuilder ChildBuilderFunc

	// childMu synchronizes calls to any single child. It must be held for all
	// calls into a child. To avoid deadlocks, do not acquire childMu while
	// holding mu. It also guards balancerWrapper.isClosed.
	childMu sync.Mutex
	// children maps each endpoint to its child. A stored map is never mutated
	// afterwards: UpdateClientConnState builds a replacement and swaps it in
	// while holding childMu. updateState reads it via Load with only mu held.
	children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]

	// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
	// calls (calls to children will each produce an update, only want one
	// update).
	inhibitChildUpdates atomic.Bool

	// mu synchronizes access to the state stored in balancerWrappers in the
	// children field (their childState). mu must not be held during calls into
	// a child since synchronous calls back from the child may require taking
	// mu, causing a deadlock. To avoid deadlocks, do not acquire childMu while
	// holding mu. updateState also holds mu across its call to cc.UpdateState.
	mu sync.Mutex
}
116
117// rotateEndpoints returns a slice of all the input endpoints rotated a random
118// amount.
119func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
120 les := len(es)
121 if les == 0 {
122 return es
123 }
124 r := randIntN(les)
125 // Make a copy to avoid mutating data beyond the end of es.
126 ret := make([]resolver.Endpoint, les)
127 copy(ret, es[r:])
128 copy(ret[les-r:], es[:r])
129 return ret
130}
131
132// UpdateClientConnState creates a child for new endpoints and deletes children
133// for endpoints that are no longer present. It also updates all the children,
134// and sends a single synchronous update of the childrens' aggregated state at
135// the end of the UpdateClientConnState operation. If any endpoint has no
136// addresses it will ignore that endpoint. Otherwise, returns first error found
137// from a child, but fully processes the new update.
138func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
139 es.childMu.Lock()
140 defer es.childMu.Unlock()
141
142 es.inhibitChildUpdates.Store(true)
143 defer func() {
144 es.inhibitChildUpdates.Store(false)
145 es.updateState()
146 }()
147 var ret error
148
149 children := es.children.Load()
150 newChildren := resolver.NewEndpointMap[*balancerWrapper]()
151
152 // Update/Create new children.
153 for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
154 if _, ok := newChildren.Get(endpoint); ok {
155 // Endpoint child was already created, continue to avoid duplicate
156 // update.
157 continue
158 }
159 childBalancer, ok := children.Get(endpoint)
160 if ok {
161 // Endpoint attributes may have changed, update the stored endpoint.
162 es.mu.Lock()
163 childBalancer.childState.Endpoint = endpoint
164 es.mu.Unlock()
165 } else {
166 childBalancer = &balancerWrapper{
167 childState: ChildState{Endpoint: endpoint},
168 ClientConn: es.cc,
169 es: es,
170 }
171 childBalancer.childState.Balancer = childBalancer
172 childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
173 }
174 newChildren.Set(endpoint, childBalancer)
175 if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
176 BalancerConfig: state.BalancerConfig,
177 ResolverState: resolver.State{
178 Endpoints: []resolver.Endpoint{endpoint},
179 Attributes: state.ResolverState.Attributes,
180 },
181 }); err != nil && ret == nil {
182 // Return first error found, and always commit full processing of
183 // updating children. If desired to process more specific errors
184 // across all endpoints, caller should make these specific
185 // validations, this is a current limitation for simplicity sake.
186 ret = err
187 }
188 }
189 // Delete old children that are no longer present.
190 for _, e := range children.Keys() {
191 child, _ := children.Get(e)
192 if _, ok := newChildren.Get(e); !ok {
193 child.closeLocked()
194 }
195 }
196 es.children.Store(newChildren)
197 if newChildren.Len() == 0 {
198 return balancer.ErrBadResolverState
199 }
200 return ret
201}
202
203// ResolverError forwards the resolver error to all of the endpointSharding's
204// children and sends a single synchronous update of the childStates at the end
205// of the ResolverError operation.
206func (es *endpointSharding) ResolverError(err error) {
207 es.childMu.Lock()
208 defer es.childMu.Unlock()
209 es.inhibitChildUpdates.Store(true)
210 defer func() {
211 es.inhibitChildUpdates.Store(false)
212 es.updateState()
213 }()
214 children := es.children.Load()
215 for _, child := range children.Values() {
216 child.resolverErrorLocked(err)
217 }
218}
219
220func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
221 // UpdateSubConnState is deprecated.
222}
223
224func (es *endpointSharding) Close() {
225 es.childMu.Lock()
226 defer es.childMu.Unlock()
227 children := es.children.Load()
228 for _, child := range children.Values() {
229 child.closeLocked()
230 }
231}
232
233func (es *endpointSharding) ExitIdle() {
234 es.childMu.Lock()
235 defer es.childMu.Unlock()
236 for _, bw := range es.children.Load().Values() {
237 if !bw.isClosed {
238 bw.child.ExitIdle()
239 }
240 }
241}
242
243// updateState updates this component's state. It sends the aggregated state,
244// and a picker with round robin behavior with all the child states present if
245// needed.
246func (es *endpointSharding) updateState() {
247 if es.inhibitChildUpdates.Load() {
248 return
249 }
250 var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
251
252 es.mu.Lock()
253 defer es.mu.Unlock()
254
255 children := es.children.Load()
256 childStates := make([]ChildState, 0, children.Len())
257
258 for _, child := range children.Values() {
259 childState := child.childState
260 childStates = append(childStates, childState)
261 childPicker := childState.State.Picker
262 switch childState.State.ConnectivityState {
263 case connectivity.Ready:
264 readyPickers = append(readyPickers, childPicker)
265 case connectivity.Connecting:
266 connectingPickers = append(connectingPickers, childPicker)
267 case connectivity.Idle:
268 idlePickers = append(idlePickers, childPicker)
269 case connectivity.TransientFailure:
270 transientFailurePickers = append(transientFailurePickers, childPicker)
271 // connectivity.Shutdown shouldn't appear.
272 }
273 }
274
275 // Construct the round robin picker based off the aggregated state. Whatever
276 // the aggregated state, use the pickers present that are currently in that
277 // state only.
278 var aggState connectivity.State
279 var pickers []balancer.Picker
280 if len(readyPickers) >= 1 {
281 aggState = connectivity.Ready
282 pickers = readyPickers
283 } else if len(connectingPickers) >= 1 {
284 aggState = connectivity.Connecting
285 pickers = connectingPickers
286 } else if len(idlePickers) >= 1 {
287 aggState = connectivity.Idle
288 pickers = idlePickers
289 } else if len(transientFailurePickers) >= 1 {
290 aggState = connectivity.TransientFailure
291 pickers = transientFailurePickers
292 } else {
293 aggState = connectivity.TransientFailure
294 pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
295 } // No children (resolver error before valid update).
296 p := &pickerWithChildStates{
297 pickers: pickers,
298 childStates: childStates,
299 next: uint32(randIntN(len(pickers))),
300 }
301 es.cc.UpdateState(balancer.State{
302 ConnectivityState: aggState,
303 Picker: p,
304 })
305}
306
// pickerWithChildStates delegates to the pickers it holds in a round robin
// fashion. It also contains the childStates of all the endpointSharding's
// children.
type pickerWithChildStates struct {
	// pickers holds the pickers of the children in the aggregated connectivity
	// state. updateState always supplies at least one.
	pickers []balancer.Picker
	// childStates is a snapshot of every child's state at the time this picker
	// was built. ChildStatesFromPicker returns it.
	childStates []ChildState
	// next is the round-robin cursor. updateState seeds it with a random
	// offset, and Pick advances it atomically.
	next uint32
}
315
316func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
317 nextIndex := atomic.AddUint32(&p.next, 1)
318 picker := p.pickers[nextIndex%uint32(len(p.pickers))]
319 return picker.Pick(info)
320}
321
322// ChildStatesFromPicker returns the state of all the children managed by the
323// endpoint sharding balancer that created this picker.
324func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
325 p, ok := picker.(*pickerWithChildStates)
326 if !ok {
327 return nil
328 }
329 return p.childStates
330}
331
// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
// endpoint, and persists recent child balancer state. It is also the
// balancer.ClientConn handed to the child, so the child's UpdateState calls
// arrive here.
type balancerWrapper struct {
	// The following fields are initialized at build time and read-only after
	// that and therefore do not need to be guarded by a mutex.

	// child contains the wrapped balancer. Access its methods only through
	// methods on balancerWrapper to ensure proper synchronization.
	child               balancer.Balancer
	balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns

	es *endpointSharding

	// childState is guarded by es.mu.
	childState ChildState

	// isClosed is guarded by es.childMu, not es.mu. It is written by
	// closeLocked and read by the ExitIdle paths, all of which run with
	// es.childMu held.
	isClosed bool
}
350
351func (bw *balancerWrapper) UpdateState(state balancer.State) {
352 bw.es.mu.Lock()
353 bw.childState.State = state
354 bw.es.mu.Unlock()
355 if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
356 bw.ExitIdle()
357 }
358 bw.es.updateState()
359}
360
361// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
362// avoid deadlocks due to synchronous balancer state updates.
363func (bw *balancerWrapper) ExitIdle() {
364 go func() {
365 bw.es.childMu.Lock()
366 if !bw.isClosed {
367 bw.child.ExitIdle()
368 }
369 bw.es.childMu.Unlock()
370 }()
371}
372
373// updateClientConnStateLocked delivers the ClientConnState to the child
374// balancer. Callers must hold the child mutex of the parent endpointsharding
375// balancer.
376func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
377 return bw.child.UpdateClientConnState(ccs)
378}
379
380// closeLocked closes the child balancer. Callers must hold the child mutext of
381// the parent endpointsharding balancer.
382func (bw *balancerWrapper) closeLocked() {
383 bw.child.Close()
384 bw.isClosed = true
385}
386
387func (bw *balancerWrapper) resolverErrorLocked(err error) {
388 bw.child.ResolverError(err)
389}