[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go
new file mode 100644
index 0000000..948a21e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer_wrapper.go
@@ -0,0 +1,520 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/experimental/stats"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/balancer/gracefulswitch"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
+ // noOpRegisterHealthListenerFn is used when client side health checking is
+ // disabled. It sends a single READY update on the registered listener.
+ noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
+ listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+ return func() {}
+ }
+)
+
+// ccBalancerWrapper sits between the ClientConn and the Balancer.
+//
+// ccBalancerWrapper implements methods corresponding to the ones on the
+// balancer.Balancer interface. The ClientConn is free to call these methods
+// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
+// to the Balancer happen in order by performing them in the serializer, without
+// any mutexes held.
+//
+// ccBalancerWrapper also implements the balancer.ClientConn interface and is
+// passed to the Balancer implementations. It invokes unexported methods on the
+// ClientConn to handle these calls from the Balancer.
+//
+// It uses the gracefulswitch.Balancer internally to ensure that balancer
+// switches happen in a graceful manner.
+type ccBalancerWrapper struct {
+ internal.EnforceClientConnEmbedding
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc *ClientConn
+ opts balancer.BuildOptions
+ serializer *grpcsync.CallbackSerializer
+ serializerCancel context.CancelFunc
+
+ // The following fields are only accessed within the serializer or during
+ // initialization.
+ curBalancerName string
+ balancer *gracefulswitch.Balancer
+
+ // The following field is protected by mu. Caller must take cc.mu before
+ // taking mu.
+ mu sync.Mutex
+ closed bool
+}
+
+// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
+// underlying balancer is not created until the updateClientConnState() method
+// is invoked.
+func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
+ ctx, cancel := context.WithCancel(cc.ctx)
+ ccb := &ccBalancerWrapper{
+ cc: cc,
+ opts: balancer.BuildOptions{
+ DialCreds: cc.dopts.copts.TransportCredentials,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
+ Authority: cc.authority,
+ CustomUserAgent: cc.dopts.copts.UserAgent,
+ ChannelzParent: cc.channelz,
+ Target: cc.parsedTarget,
+ },
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
+ }
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
+ return ccb
+}
+
+func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
+ return ccb.cc.metricsRecorderList
+}
+
+// updateClientConnState is invoked by grpc to push a ClientConnState update to
+// the underlying balancer. This is always executed from the serializer, so
+// it is safe to call into the balancer here.
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
+ errCh := make(chan error)
+ uccs := func(ctx context.Context) {
+ defer close(errCh)
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
+ name := gracefulswitch.ChildName(ccs.BalancerConfig)
+ if ccb.curBalancerName != name {
+ ccb.curBalancerName = name
+ channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
+ }
+ err := ccb.balancer.UpdateClientConnState(*ccs)
+ if logger.V(2) && err != nil {
+ logger.Infof("error from balancer.UpdateClientConnState: %v", err)
+ }
+ errCh <- err
+ }
+ onFailure := func() { close(errCh) }
+
+ // UpdateClientConnState can race with Close, and when the latter wins, the
+ // serializer is closed, and the attempt to schedule the callback will fail.
+ // It is acceptable to ignore this failure. But since we want to handle the
+ // state update in a blocking fashion (when we successfully schedule the
+ // callback), we have to use the ScheduleOr method and not the MaybeSchedule
+ // method on the serializer.
+ ccb.serializer.ScheduleOr(uccs, onFailure)
+ return <-errCh
+}
+
+// resolverError is invoked by grpc to push a resolver error to the underlying
+// balancer. The call to the balancer is executed from the serializer.
+func (ccb *ccBalancerWrapper) resolverError(err error) {
+ ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
+ ccb.balancer.ResolverError(err)
+ })
+}
+
+// close initiates async shutdown of the wrapper. cc.mu must be held when
+// calling this function. To determine the wrapper has finished shutting down,
+// the channel should block on ccb.serializer.Done() without cc.mu held.
+func (ccb *ccBalancerWrapper) close() {
+ ccb.mu.Lock()
+ ccb.closed = true
+ ccb.mu.Unlock()
+ channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
+ ccb.serializer.TrySchedule(func(context.Context) {
+ if ccb.balancer == nil {
+ return
+ }
+ ccb.balancer.Close()
+ ccb.balancer = nil
+ })
+ ccb.serializerCancel()
+}
+
+// exitIdle invokes the balancer's exitIdle method in the serializer.
+func (ccb *ccBalancerWrapper) exitIdle() {
+ ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
+ ccb.balancer.ExitIdle()
+ })
+}
+
+func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+ ccb.cc.mu.Lock()
+ defer ccb.cc.mu.Unlock()
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
+ return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
+ }
+ ccb.mu.Unlock()
+
+ if len(addrs) == 0 {
+ return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
+ }
+ ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
+ if err != nil {
+ channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
+ return nil, err
+ }
+ acbw := &acBalancerWrapper{
+ ccb: ccb,
+ ac: ac,
+ producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
+ stateListener: opts.StateListener,
+ healthData: newHealthData(connectivity.Idle),
+ }
+ ac.acbw = acbw
+ return acbw, nil
+}
+
+func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) {
+ // The graceful switch balancer will never call this.
+ logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
+}
+
+func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+ acbw, ok := sc.(*acBalancerWrapper)
+ if !ok {
+ return
+ }
+ acbw.UpdateAddresses(addrs)
+}
+
+func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
+ ccb.cc.mu.Lock()
+ defer ccb.cc.mu.Unlock()
+ if ccb.cc.conns == nil {
+ // The CC has been closed; ignore this update.
+ return
+ }
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
+ return
+ }
+ ccb.mu.Unlock()
+ // Update picker before updating state. Even though the ordering here does
+ // not matter, it can lead to multiple calls of Pick in the common start-up
+ // case where we wait for ready and then perform an RPC. If the picker is
+ // updated later, we could call the "connecting" picker when the state is
+ // updated, and then call the "ready" picker after the picker gets updated.
+
+ // Note that there is no need to check if the balancer wrapper was closed,
+ // as we know the graceful switch LB policy will not call cc if it has been
+ // closed.
+ ccb.cc.pickerWrapper.updatePicker(s.Picker)
+ ccb.cc.csMgr.updateState(s.ConnectivityState)
+}
+
+func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
+ ccb.cc.mu.RLock()
+ defer ccb.cc.mu.RUnlock()
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
+ return
+ }
+ ccb.mu.Unlock()
+ ccb.cc.resolveNowLocked(o)
+}
+
+func (ccb *ccBalancerWrapper) Target() string {
+ return ccb.cc.target
+}
+
+// acBalancerWrapper is a wrapper on top of ac for balancers.
+// It implements balancer.SubConn interface.
+type acBalancerWrapper struct {
+ internal.EnforceSubConnEmbedding
+ ac *addrConn // read-only
+ ccb *ccBalancerWrapper // read-only
+ stateListener func(balancer.SubConnState)
+
+ producersMu sync.Mutex
+ producers map[balancer.ProducerBuilder]*refCountedProducer
+
+ // Access to healthData is protected by healthMu.
+ healthMu sync.Mutex
+ // healthData is stored as a pointer to detect when the health listener is
+ // dropped or updated. This is required as closures can't be compared for
+ // equality.
+ healthData *healthData
+}
+
+// healthData holds data related to health state reporting.
+type healthData struct {
+ // connectivityState stores the most recent connectivity state delivered
+ // to the LB policy. This is stored to avoid sending updates when the
+ // SubConn has already exited connectivity state READY.
+ connectivityState connectivity.State
+ // closeHealthProducer stores function to close the ref counted health
+ // producer. The health producer is automatically closed when the SubConn
+ // state changes.
+ closeHealthProducer func()
+}
+
+func newHealthData(s connectivity.State) *healthData {
+ return &healthData{
+ connectivityState: s,
+ closeHealthProducer: func() {},
+ }
+}
+
+// updateState is invoked by grpc to push a subConn state update to the
+// underlying balancer.
+func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
+ acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ // Invalidate all producers on any state change.
+ acbw.closeProducers()
+
+ // Even though it is optional for balancers, gracefulswitch ensures
+ // opts.StateListener is set, so this cannot ever be nil.
+ // TODO: delete this comment when UpdateSubConnState is removed.
+ scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
+ if s == connectivity.Ready {
+ setConnectedAddress(&scs, curAddr)
+ }
+ // Invalidate the health listener by updating the healthData.
+ acbw.healthMu.Lock()
+ // A race may occur if a health listener is registered soon after the
+ // connectivity state is set but before the stateListener is called.
+ // Two cases may arise:
+ // 1. The new state is not READY: RegisterHealthListener has checks to
+ // ensure no updates are sent when the connectivity state is not
+ // READY.
+ // 2. The new state is READY: This means that the old state wasn't Ready.
+ // The RegisterHealthListener API mentions that a health listener
+ // must not be registered when a SubConn is not ready to avoid such
+ // races. When this happens, the LB policy would get health updates
+ // on the old listener. When the LB policy registers a new listener
+ // on receiving the connectivity update, the health updates will be
+ // sent to the new health listener.
+ acbw.healthData = newHealthData(scs.ConnectivityState)
+ acbw.healthMu.Unlock()
+
+ acbw.stateListener(scs)
+ })
+}
+
+func (acbw *acBalancerWrapper) String() string {
+ return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)
+}
+
+func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
+ acbw.ac.updateAddrs(addrs)
+}
+
+func (acbw *acBalancerWrapper) Connect() {
+ go acbw.ac.connect()
+}
+
+func (acbw *acBalancerWrapper) Shutdown() {
+ acbw.closeProducers()
+ acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
+}
+
+// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
+// ready, blocks until it is or ctx expires. Returns an error when the context
+// expires or the addrConn is shut down.
+func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+ transport := acbw.ac.getReadyTransport()
+ if transport == nil {
+ return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
+
+ }
+ return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
+}
+
+// Invoke performs a unary RPC. If the addrConn is not ready, returns
+// errSubConnNotReady.
+func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
+ cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
+ if err != nil {
+ return err
+ }
+ if err := cs.SendMsg(args); err != nil {
+ return err
+ }
+ return cs.RecvMsg(reply)
+}
+
+type refCountedProducer struct {
+ producer balancer.Producer
+ refs int // number of current refs to the producer
+ close func() // underlying producer's close function
+}
+
+func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
+ acbw.producersMu.Lock()
+ defer acbw.producersMu.Unlock()
+
+ // Look up existing producer from this builder.
+ pData := acbw.producers[pb]
+ if pData == nil {
+ // Not found; create a new one and add it to the producers map.
+ p, closeFn := pb.Build(acbw)
+ pData = &refCountedProducer{producer: p, close: closeFn}
+ acbw.producers[pb] = pData
+ }
+ // Account for this new reference.
+ pData.refs++
+
+ // Return a cleanup function wrapped in a OnceFunc to remove this reference
+ // and delete the refCountedProducer from the map if the total reference
+ // count goes to zero.
+ unref := func() {
+ acbw.producersMu.Lock()
+ // If closeProducers has already closed this producer instance, refs is
+ // set to 0, so the check after decrementing will never pass, and the
+ // producer will not be double-closed.
+ pData.refs--
+ if pData.refs == 0 {
+ defer pData.close() // Run outside the acbw mutex
+ delete(acbw.producers, pb)
+ }
+ acbw.producersMu.Unlock()
+ }
+ return pData.producer, sync.OnceFunc(unref)
+}
+
+func (acbw *acBalancerWrapper) closeProducers() {
+ acbw.producersMu.Lock()
+ defer acbw.producersMu.Unlock()
+ for pb, pData := range acbw.producers {
+ pData.refs = 0
+ pData.close()
+ delete(acbw.producers, pb)
+ }
+}
+
+// healthProducerRegisterFn is a type alias for the health producer's function
+// for registering listeners.
+type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
+
+// healthListenerRegFn returns a function to register a listener for health
+// updates. If client side health checks are disabled, the registered listener
+// will get a single READY (raw connectivity state) update.
+//
+// Client side health checking is enabled when all the following
+// conditions are satisfied:
+// 1. Health checking is not disabled using the dial option.
+// 2. The health package is imported.
+// 3. The health check config is present in the service config.
+func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
+ if acbw.ccb.cc.dopts.disableHealthCheck {
+ return noOpRegisterHealthListenerFn
+ }
+ regHealthLisFn := internal.RegisterClientHealthCheckListener
+ if regHealthLisFn == nil {
+ // The health package is not imported.
+ return noOpRegisterHealthListenerFn
+ }
+ cfg := acbw.ac.cc.healthCheckConfig()
+ if cfg == nil {
+ return noOpRegisterHealthListenerFn
+ }
+ return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
+ return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
+ }
+}
+
+// RegisterHealthListener accepts a health listener from the LB policy. It sends
+// updates to the health listener as long as the SubConn's connectivity state
+// doesn't change and a new health listener is not registered. To invalidate
+// the currently registered health listener, acbw updates the healthData. If a
+// nil listener is registered, the active health listener is dropped.
+func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ acbw.healthData.closeHealthProducer()
+ // listeners should not be registered when the connectivity state
+ // isn't Ready. This may happen when the balancer registers a listener
+ // after the connectivityState is updated, but before it is notified
+ // of the update.
+ if acbw.healthData.connectivityState != connectivity.Ready {
+ return
+ }
+ // Replace the health data to stop sending updates to any previously
+ // registered health listeners.
+ hd := newHealthData(connectivity.Ready)
+ acbw.healthData = hd
+ if listener == nil {
+ return
+ }
+
+ registerFn := acbw.healthListenerRegFn()
+ acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ // Don't send updates if a new listener is registered.
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ if acbw.healthData != hd {
+ return
+ }
+ // Serialize the health updates from the health producer with
+ // other calls into the LB policy.
+ listenerWrapper := func(scs balancer.SubConnState) {
+ acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ if acbw.healthData != hd {
+ return
+ }
+ listener(scs)
+ })
+ }
+
+ hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
+ })
+}