blob: 04b9ad41169129f38bd871bec14b8fb38669423c [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
Akash Kankanala761955c2024-02-21 19:32:20 +053022 "context"
khenaidoo5fc5cea2021-08-11 17:39:16 -040023 "fmt"
Akash Kankanala761955c2024-02-21 19:32:20 +053024 "strings"
khenaidoo5fc5cea2021-08-11 17:39:16 -040025 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/connectivity"
Akash Kankanala761955c2024-02-21 19:32:20 +053029 "google.golang.org/grpc/internal/balancer/gracefulswitch"
khenaidoo5fc5cea2021-08-11 17:39:16 -040030 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/internal/grpcsync"
32 "google.golang.org/grpc/resolver"
33)
34
// ccbMode identifies the lifecycle mode of a ccBalancerWrapper.
type ccbMode int

// The constants are explicitly typed as ccbMode so that they cannot be mixed
// with unrelated integers by accident.
const (
	// ccbModeActive is the zero value, and therefore the mode of a freshly
	// constructed wrapper.
	ccbModeActive ccbMode = iota
	// ccbModeIdle is set when the wrapper is closed through enterIdleMode.
	ccbModeIdle
	// ccbModeClosed is set when the wrapper is closed through close.
	ccbModeClosed
	// ccbModeExitingIdle is declared for completeness; nothing in this file
	// assigns it.
	ccbModeExitingIdle
)
khenaidoo5fc5cea2021-08-11 17:39:16 -040043
Akash Kankanala761955c2024-02-21 19:32:20 +053044// ccBalancerWrapper sits between the ClientConn and the Balancer.
45//
46// ccBalancerWrapper implements methods corresponding to the ones on the
47// balancer.Balancer interface. The ClientConn is free to call these methods
48// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
49// to the Balancer happen synchronously and in order.
50//
51// ccBalancerWrapper also implements the balancer.ClientConn interface and is
52// passed to the Balancer implementations. It invokes unexported methods on the
53// ClientConn to handle these calls from the Balancer.
54//
55// It uses the gracefulswitch.Balancer internally to ensure that balancer
56// switches happen in a graceful manner.
khenaidoo5fc5cea2021-08-11 17:39:16 -040057type ccBalancerWrapper struct {
Akash Kankanala761955c2024-02-21 19:32:20 +053058 // The following fields are initialized when the wrapper is created and are
59 // read-only afterwards, and therefore can be accessed without a mutex.
60 cc *ClientConn
61 opts balancer.BuildOptions
khenaidoo5fc5cea2021-08-11 17:39:16 -040062
Akash Kankanala761955c2024-02-21 19:32:20 +053063 // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
64 // mutually exclusive manner as they are scheduled in the serializer. Fields
65 // accessed *only* in these serializer callbacks, can therefore be accessed
66 // without a mutex.
67 balancer *gracefulswitch.Balancer
68 curBalancerName string
69
70 // mu guards access to the below fields. Access to the serializer and its
71 // cancel function needs to be mutex protected because they are overwritten
72 // when the wrapper exits idle mode.
73 mu sync.Mutex
74 serializer *grpcsync.CallbackSerializer // To serialize all outoing calls.
75 serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time.
76 mode ccbMode // Tracks the current mode of the wrapper.
khenaidoo5fc5cea2021-08-11 17:39:16 -040077}
78
Akash Kankanala761955c2024-02-21 19:32:20 +053079// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
80// is not created until the switchTo() method is invoked.
81func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
82 ctx, cancel := context.WithCancel(context.Background())
khenaidoo5fc5cea2021-08-11 17:39:16 -040083 ccb := &ccBalancerWrapper{
Akash Kankanala761955c2024-02-21 19:32:20 +053084 cc: cc,
85 opts: bopts,
86 serializer: grpcsync.NewCallbackSerializer(ctx),
87 serializerCancel: cancel,
khenaidoo5fc5cea2021-08-11 17:39:16 -040088 }
Akash Kankanala761955c2024-02-21 19:32:20 +053089 ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
khenaidoo5fc5cea2021-08-11 17:39:16 -040090 return ccb
91}
92
Akash Kankanala761955c2024-02-21 19:32:20 +053093// updateClientConnState is invoked by grpc to push a ClientConnState update to
94// the underlying balancer.
khenaidoo5fc5cea2021-08-11 17:39:16 -040095func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
Akash Kankanala761955c2024-02-21 19:32:20 +053096 ccb.mu.Lock()
97 errCh := make(chan error, 1)
98 // Here and everywhere else where Schedule() is called, it is done with the
99 // lock held. But the lock guards only the scheduling part. The actual
100 // callback is called asynchronously without the lock being held.
101 ok := ccb.serializer.Schedule(func(_ context.Context) {
102 // If the addresses specified in the update contain addresses of type
103 // "grpclb" and the selected LB policy is not "grpclb", these addresses
104 // will be filtered out and ccs will be modified with the updated
105 // address list.
106 if ccb.curBalancerName != grpclbName {
107 var addrs []resolver.Address
108 for _, addr := range ccs.ResolverState.Addresses {
109 if addr.Type == resolver.GRPCLB {
110 continue
111 }
112 addrs = append(addrs, addr)
113 }
114 ccs.ResolverState.Addresses = addrs
115 }
116 errCh <- ccb.balancer.UpdateClientConnState(*ccs)
117 })
118 if !ok {
119 // If we are unable to schedule a function with the serializer, it
120 // indicates that it has been closed. A serializer is only closed when
121 // the wrapper is closed or is in idle.
122 ccb.mu.Unlock()
123 return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
124 }
125 ccb.mu.Unlock()
126
127 // We get here only if the above call to Schedule succeeds, in which case it
128 // is guaranteed that the scheduled function will run. Therefore it is safe
129 // to block on this channel.
130 err := <-errCh
131 if logger.V(2) && err != nil {
132 logger.Infof("error from balancer.UpdateClientConnState: %v", err)
133 }
134 return err
135}
136
137// updateSubConnState is invoked by grpc to push a subConn state update to the
138// underlying balancer.
139func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
140 ccb.mu.Lock()
141 ccb.serializer.Schedule(func(_ context.Context) {
142 ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
143 })
144 ccb.mu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400145}
146
147func (ccb *ccBalancerWrapper) resolverError(err error) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530148 ccb.mu.Lock()
149 ccb.serializer.Schedule(func(_ context.Context) {
150 ccb.balancer.ResolverError(err)
151 })
152 ccb.mu.Unlock()
153}
154
155// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
156// LB policy identified by name.
157//
158// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
159// first good update from the name resolver, it determines the LB policy to use
160// and invokes the switchTo() method. Upon receipt of every subsequent update
161// from the name resolver, it invokes this method.
162//
163// the ccBalancerWrapper keeps track of the current LB policy name, and skips
164// the graceful balancer switching process if the name does not change.
165func (ccb *ccBalancerWrapper) switchTo(name string) {
166 ccb.mu.Lock()
167 ccb.serializer.Schedule(func(_ context.Context) {
168 // TODO: Other languages use case-sensitive balancer registries. We should
169 // switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
170 if strings.EqualFold(ccb.curBalancerName, name) {
171 return
172 }
173 ccb.buildLoadBalancingPolicy(name)
174 })
175 ccb.mu.Unlock()
176}
177
178// buildLoadBalancingPolicy performs the following:
179// - retrieve a balancer builder for the given name. Use the default LB
180// policy, pick_first, if no LB policy with name is found in the registry.
181// - instruct the gracefulswitch balancer to switch to the above builder. This
182// will actually build the new balancer.
183// - update the `curBalancerName` field
184//
185// Must be called from a serializer callback.
186func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
187 builder := balancer.Get(name)
188 if builder == nil {
189 channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
190 builder = newPickfirstBuilder()
191 } else {
192 channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
193 }
194
195 if err := ccb.balancer.SwitchTo(builder); err != nil {
196 channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
197 return
198 }
199 ccb.curBalancerName = builder.Name()
200}
201
202func (ccb *ccBalancerWrapper) close() {
203 channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
204 ccb.closeBalancer(ccbModeClosed)
205}
206
207// enterIdleMode is invoked by grpc when the channel enters idle mode upon
208// expiry of idle_timeout. This call blocks until the balancer is closed.
209func (ccb *ccBalancerWrapper) enterIdleMode() {
210 channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
211 ccb.closeBalancer(ccbModeIdle)
212}
213
214// closeBalancer is invoked when the channel is being closed or when it enters
215// idle mode upon expiry of idle_timeout.
216func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
217 ccb.mu.Lock()
218 if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
219 ccb.mu.Unlock()
220 return
221 }
222
223 ccb.mode = m
224 done := ccb.serializer.Done
225 b := ccb.balancer
226 ok := ccb.serializer.Schedule(func(_ context.Context) {
227 // Close the serializer to ensure that no more calls from gRPC are sent
228 // to the balancer.
229 ccb.serializerCancel()
230 // Empty the current balancer name because we don't have a balancer
231 // anymore and also so that we act on the next call to switchTo by
232 // creating a new balancer specified by the new resolver.
233 ccb.curBalancerName = ""
234 })
235 if !ok {
236 ccb.mu.Unlock()
237 return
238 }
239 ccb.mu.Unlock()
240
241 // Give enqueued callbacks a chance to finish.
242 <-done
243 // Spawn a goroutine to close the balancer (since it may block trying to
244 // cleanup all allocated resources) and return early.
245 go b.Close()
246}
247
248// exitIdleMode is invoked by grpc when the channel exits idle mode either
249// because of an RPC or because of an invocation of the Connect() API. This
250// recreates the balancer that was closed previously when entering idle mode.
251//
252// If the channel is not in idle mode, we know for a fact that we are here as a
253// result of the user calling the Connect() method on the ClientConn. In this
254// case, we can simply forward the call to the underlying balancer, instructing
255// it to reconnect to the backends.
256func (ccb *ccBalancerWrapper) exitIdleMode() {
257 ccb.mu.Lock()
258 if ccb.mode == ccbModeClosed {
259 // Request to exit idle is a no-op when wrapper is already closed.
260 ccb.mu.Unlock()
261 return
262 }
263
264 if ccb.mode == ccbModeIdle {
265 // Recreate the serializer which was closed when we entered idle.
266 ctx, cancel := context.WithCancel(context.Background())
267 ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
268 ccb.serializerCancel = cancel
269 }
270
271 // The ClientConn guarantees that mutual exclusion between close() and
272 // exitIdleMode(), and since we just created a new serializer, we can be
273 // sure that the below function will be scheduled.
274 done := make(chan struct{})
275 ccb.serializer.Schedule(func(_ context.Context) {
276 defer close(done)
277
278 ccb.mu.Lock()
279 defer ccb.mu.Unlock()
280
281 if ccb.mode != ccbModeIdle {
282 ccb.balancer.ExitIdle()
283 return
284 }
285
286 // Gracefulswitch balancer does not support a switchTo operation after
287 // being closed. Hence we need to create a new one here.
288 ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
289 ccb.mode = ccbModeActive
290 channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
291
292 })
293 ccb.mu.Unlock()
294
295 <-done
296}
297
298func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
299 ccb.mu.Lock()
300 defer ccb.mu.Unlock()
301 return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
khenaidoo5fc5cea2021-08-11 17:39:16 -0400302}
303
304func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530305 if ccb.isIdleOrClosed() {
306 return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400307 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530308
309 if len(addrs) == 0 {
310 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400311 }
312 ac, err := ccb.cc.newAddrConn(addrs, opts)
313 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530314 channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400315 return nil, err
316 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530317 acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400318 ac.acbw = acbw
khenaidoo5fc5cea2021-08-11 17:39:16 -0400319 return acbw, nil
320}
321
322func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530323 if ccb.isIdleOrClosed() {
324 // It it safe to ignore this call when the balancer is closed or in idle
325 // because the ClientConn takes care of closing the connections.
326 //
327 // Not returning early from here when the balancer is closed or in idle
328 // leads to a deadlock though, because of the following sequence of
329 // calls when holding cc.mu:
330 // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
331 // ccb.RemoveAddrConn --> cc.removeAddrConn
332 return
333 }
334
335 acbw, ok := sc.(*acBalancerWrapper)
336 if !ok {
337 return
338 }
339 ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400340}
341
342func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530343 if ccb.isIdleOrClosed() {
344 return
345 }
346
khenaidoo5fc5cea2021-08-11 17:39:16 -0400347 acbw, ok := sc.(*acBalancerWrapper)
348 if !ok {
349 return
350 }
351 acbw.UpdateAddresses(addrs)
352}
353
354func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530355 if ccb.isIdleOrClosed() {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400356 return
357 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530358
khenaidoo5fc5cea2021-08-11 17:39:16 -0400359 // Update picker before updating state. Even though the ordering here does
360 // not matter, it can lead to multiple calls of Pick in the common start-up
361 // case where we wait for ready and then perform an RPC. If the picker is
362 // updated later, we could call the "connecting" picker when the state is
363 // updated, and then call the "ready" picker after the picker gets updated.
364 ccb.cc.blockingpicker.updatePicker(s.Picker)
365 ccb.cc.csMgr.updateState(s.ConnectivityState)
366}
367
368func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530369 if ccb.isIdleOrClosed() {
370 return
371 }
372
khenaidoo5fc5cea2021-08-11 17:39:16 -0400373 ccb.cc.resolveNow(o)
374}
375
376func (ccb *ccBalancerWrapper) Target() string {
377 return ccb.cc.target
378}
379
380// acBalancerWrapper is a wrapper on top of ac for balancers.
381// It implements balancer.SubConn interface.
382type acBalancerWrapper struct {
Akash Kankanala761955c2024-02-21 19:32:20 +0530383 ac *addrConn // read-only
384
385 mu sync.Mutex
386 producers map[balancer.ProducerBuilder]*refCountedProducer
387}
388
389func (acbw *acBalancerWrapper) String() string {
390 return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400391}
392
393func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530394 acbw.ac.updateAddrs(addrs)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400395}
396
397func (acbw *acBalancerWrapper) Connect() {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400398 go acbw.ac.connect()
399}
400
Akash Kankanala761955c2024-02-21 19:32:20 +0530401// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
402// ready, blocks until it is or ctx expires. Returns an error when the context
403// expires or the addrConn is shut down.
404func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
405 transport, err := acbw.ac.getTransport(ctx)
406 if err != nil {
407 return nil, err
408 }
409 return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
410}
411
412// Invoke performs a unary RPC. If the addrConn is not ready, returns
413// errSubConnNotReady.
414func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
415 cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
416 if err != nil {
417 return err
418 }
419 if err := cs.SendMsg(args); err != nil {
420 return err
421 }
422 return cs.RecvMsg(reply)
423}
424
425type refCountedProducer struct {
426 producer balancer.Producer
427 refs int // number of current refs to the producer
428 close func() // underlying producer's close function
429}
430
431func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400432 acbw.mu.Lock()
433 defer acbw.mu.Unlock()
Akash Kankanala761955c2024-02-21 19:32:20 +0530434
435 // Look up existing producer from this builder.
436 pData := acbw.producers[pb]
437 if pData == nil {
438 // Not found; create a new one and add it to the producers map.
439 p, close := pb.Build(acbw)
440 pData = &refCountedProducer{producer: p, close: close}
441 acbw.producers[pb] = pData
442 }
443 // Account for this new reference.
444 pData.refs++
445
446 // Return a cleanup function wrapped in a OnceFunc to remove this reference
447 // and delete the refCountedProducer from the map if the total reference
448 // count goes to zero.
449 unref := func() {
450 acbw.mu.Lock()
451 pData.refs--
452 if pData.refs == 0 {
453 defer pData.close() // Run outside the acbw mutex
454 delete(acbw.producers, pb)
455 }
456 acbw.mu.Unlock()
457 }
458 return pData.producer, grpcsync.OnceFunc(unref)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400459}