[VOL-2312] Logging - Integrate voltctl with new etcd-based dynamic loglevel mechanism. Testing is in progress

Change-Id: I2e13bb79008c9a49ebb6f58e575f51efebe6dbfd
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 78e6d17..a7643df 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -38,13 +38,13 @@
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/backoff"
 	"google.golang.org/grpc/internal/channelz"
-	"google.golang.org/grpc/internal/envconfig"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/resolver"
 	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
 	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+	"google.golang.org/grpc/serviceconfig"
 	"google.golang.org/grpc/status"
 )
 
@@ -532,24 +532,6 @@
 	}
 }
 
-// gRPC should resort to default service config when:
-// * resolver service config is disabled
-// * or, resolver does not return a service config or returns an invalid one.
-func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
-	if cc.dopts.disableServiceConfig {
-		return true
-	}
-	// The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
-	// Right now, we assume that empty service config string means resolver does not return a config.
-	if sc == "" {
-		return true
-	}
-	// TODO: the logic below is temporary. Once we finish the logic to validate service config
-	// in resolver, we will replace the logic below.
-	_, err := parseServiceConfig(sc)
-	return err != nil
-}
-
 func (cc *ClientConn) updateResolverState(s resolver.State) error {
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
@@ -560,44 +542,37 @@
 		return nil
 	}
 
-	if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
+	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
 		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
 			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
 		}
-	} else {
-		// TODO: the parsing logic below will be moved inside resolver.
-		sc, err := parseServiceConfig(s.ServiceConfig)
-		if err != nil {
-			return err
-		}
-		if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
-			cc.applyServiceConfig(sc)
-		}
+	} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
+		cc.applyServiceConfig(sc)
 	}
 
-	// update the service config that will be sent to balancer.
-	if cc.sc != nil {
-		s.ServiceConfig = cc.sc.rawJSONString
-	}
-
+	var balCfg serviceconfig.LoadBalancingConfig
 	if cc.dopts.balancerBuilder == nil {
 		// Only look at balancer types and switch balancer if balancer dial
 		// option is not set.
-		var isGRPCLB bool
-		for _, a := range s.Addresses {
-			if a.Type == resolver.GRPCLB {
-				isGRPCLB = true
-				break
-			}
-		}
 		var newBalancerName string
-		// TODO: use new loadBalancerConfig field with appropriate priority.
-		if isGRPCLB {
-			newBalancerName = grpclbName
-		} else if cc.sc != nil && cc.sc.LB != nil {
-			newBalancerName = *cc.sc.LB
+		if cc.sc != nil && cc.sc.lbConfig != nil {
+			newBalancerName = cc.sc.lbConfig.name
+			balCfg = cc.sc.lbConfig.cfg
 		} else {
-			newBalancerName = PickFirstBalancerName
+			var isGRPCLB bool
+			for _, a := range s.Addresses {
+				if a.Type == resolver.GRPCLB {
+					isGRPCLB = true
+					break
+				}
+			}
+			if isGRPCLB {
+				newBalancerName = grpclbName
+			} else if cc.sc != nil && cc.sc.LB != nil {
+				newBalancerName = *cc.sc.LB
+			} else {
+				newBalancerName = PickFirstBalancerName
+			}
 		}
 		cc.switchBalancer(newBalancerName)
 	} else if cc.balancerWrapper == nil {
@@ -607,8 +582,7 @@
 		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
 	}
 
-	cc.balancerWrapper.updateResolverState(s)
-	cc.firstResolveEvent.Fire()
+	cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
 	return nil
 }
 
@@ -621,7 +595,7 @@
 //
 // Caller must hold cc.mu.
 func (cc *ClientConn) switchBalancer(name string) {
-	if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+	if strings.EqualFold(cc.curBalancerName, name) {
 		return
 	}
 
@@ -760,6 +734,8 @@
 		ac.mu.Unlock()
 		return nil
 	}
+	// Update connectivity state within the lock to prevent subsequent or
+	// concurrent calls from resetting the transport more than once.
 	ac.updateConnectivityState(connectivity.Connecting)
 	ac.mu.Unlock()
 
@@ -770,7 +746,16 @@
 
 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
 //
-// It checks whether current connected address of ac is in the new addrs list.
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
+// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
+// addresses will be picked up by retry in the next iteration after backoff.
+//
+// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
+//
+// If ac is Ready, it checks whether current connected address of ac is in the
+// new addrs list.
 //  - If true, it updates ac.addrs and returns true. The ac will keep using
 //    the existing connection.
 //  - If false, it does nothing and returns false.
@@ -778,17 +763,18 @@
 	ac.mu.Lock()
 	defer ac.mu.Unlock()
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
-	if ac.state == connectivity.Shutdown {
+	if ac.state == connectivity.Shutdown ||
+		ac.state == connectivity.TransientFailure ||
+		ac.state == connectivity.Idle {
 		ac.addrs = addrs
 		return true
 	}
 
-	// Unless we're busy reconnecting already, let's reconnect from the top of
-	// the list.
-	if ac.state != connectivity.Ready {
+	if ac.state == connectivity.Connecting {
 		return false
 	}
 
+	// ac.state is Ready, try to find the connected address.
 	var curAddrFound bool
 	for _, a := range addrs {
 		if reflect.DeepEqual(ac.curAddr, a) {
@@ -1037,6 +1023,9 @@
 		// The spec doesn't mention what should be done for multiple addresses.
 		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
 		connectDeadline := time.Now().Add(dialDuration)
+
+		ac.updateConnectivityState(connectivity.Connecting)
+		ac.transport = nil
 		ac.mu.Unlock()
 
 		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
@@ -1071,55 +1060,32 @@
 
 		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
-			newTr.Close()
 			ac.mu.Unlock()
+			newTr.Close()
 			return
 		}
 		ac.curAddr = addr
 		ac.transport = newTr
 		ac.backoffIdx = 0
 
-		healthCheckConfig := ac.cc.healthCheckConfig()
-		// LB channel health checking is only enabled when all the four requirements below are met:
-		// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
-		// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
-		// 3. a service config with non-empty healthCheckConfig field is provided,
-		// 4. the current load balancer allows it.
 		hctx, hcancel := context.WithCancel(ac.ctx)
-		healthcheckManagingState := false
-		if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
-			if ac.cc.dopts.healthCheckFunc == nil {
-				// TODO: add a link to the health check doc in the error message.
-				grpclog.Error("the client side LB channel health check function has not been set.")
-			} else {
-				// TODO(deklerk) refactor to just return transport
-				go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
-				healthcheckManagingState = true
-			}
-		}
-		if !healthcheckManagingState {
-			ac.updateConnectivityState(connectivity.Ready)
-		}
+		ac.startHealthCheck(hctx)
 		ac.mu.Unlock()
 
 		// Block until the created transport is down. And when this happens,
 		// we restart from the top of the addr list.
 		<-reconnect.Done()
 		hcancel()
-
-		// Need to reconnect after a READY, the addrConn enters
-		// TRANSIENT_FAILURE.
+		// restart connecting - the top of the loop will set state to
+		// CONNECTING.  This is against the current connectivity semantics doc,
+		// however it allows for graceful behavior for RPCs not yet dispatched
+		// - unfortunate timing would otherwise lead to the RPC failing even
+		// though the TRANSIENT_FAILURE state (called for by the doc) would be
+		// instantaneous.
 		//
-		// This will set addrConn to TRANSIENT_FAILURE for a very short period
-		// of time, and turns CONNECTING. It seems reasonable to skip this, but
-		// READY-CONNECTING is not a valid transition.
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-		ac.updateConnectivityState(connectivity.TransientFailure)
-		ac.mu.Unlock()
+		// Ideally we should transition to Idle here and block until there is
+		// RPC activity that leads to the balancer requesting a reconnect of
+		// the associated SubConn.
 	}
 }
 
@@ -1133,8 +1099,6 @@
 			ac.mu.Unlock()
 			return nil, resolver.Address{}, nil, errConnClosing
 		}
-		ac.updateConnectivityState(connectivity.Connecting)
-		ac.transport = nil
 
 		ac.cc.mu.RLock()
 		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
@@ -1178,14 +1142,35 @@
 		Authority: ac.cc.authority,
 	}
 
+	once := sync.Once{}
 	onGoAway := func(r transport.GoAwayReason) {
 		ac.mu.Lock()
 		ac.adjustParams(r)
+		once.Do(func() {
+			if ac.state == connectivity.Ready {
+				// Prevent this SubConn from being used for new RPCs by setting its
+				// state to Connecting.
+				//
+				// TODO: this should be Idle when grpc-go properly supports it.
+				ac.updateConnectivityState(connectivity.Connecting)
+			}
+		})
 		ac.mu.Unlock()
 		reconnect.Fire()
 	}
 
 	onClose := func() {
+		ac.mu.Lock()
+		once.Do(func() {
+			if ac.state == connectivity.Ready {
+				// Prevent this SubConn from being used for new RPCs by setting its
+				// state to Connecting.
+				//
+				// TODO: this should be Idle when grpc-go properly supports it.
+				ac.updateConnectivityState(connectivity.Connecting)
+			}
+		})
+		ac.mu.Unlock()
 		close(onCloseCalled)
 		reconnect.Fire()
 	}
@@ -1207,60 +1192,99 @@
 		return nil, nil, err
 	}
 
-	if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
-		select {
-		case <-time.After(connectDeadline.Sub(time.Now())):
-			// We didn't get the preface in time.
-			newTr.Close()
-			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
-			return nil, nil, errors.New("timed out waiting for server handshake")
-		case <-prefaceReceived:
-			// We got the preface - huzzah! things are good.
-		case <-onCloseCalled:
-			// The transport has already closed - noop.
-			return nil, nil, errors.New("connection closed")
-			// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
-		}
+	select {
+	case <-time.After(connectDeadline.Sub(time.Now())):
+		// We didn't get the preface in time.
+		newTr.Close()
+		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+		return nil, nil, errors.New("timed out waiting for server handshake")
+	case <-prefaceReceived:
+		// We got the preface - huzzah! things are good.
+	case <-onCloseCalled:
+		// The transport has already closed - noop.
+		return nil, nil, errors.New("connection closed")
+		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
 	}
 	return newTr, reconnect, nil
 }
 
-func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
-	// Set up the health check helper functions
-	newStream := func() (interface{}, error) {
-		return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+// startHealthCheck starts the health checking stream (RPC) to watch the health
+// stats of this connection if health checking is requested and configured.
+//
+// LB channel health checking is enabled when all requirements below are met:
+// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
+// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
+// 3. a service config with non-empty healthCheckConfig field is provided
+// 4. the load balancer requests it
+//
+// It sets addrConn to READY if the health checking stream is not started.
+//
+// Caller must hold ac.mu.
+func (ac *addrConn) startHealthCheck(ctx context.Context) {
+	var healthcheckManagingState bool
+	defer func() {
+		if !healthcheckManagingState {
+			ac.updateConnectivityState(connectivity.Ready)
+		}
+	}()
+
+	if ac.cc.dopts.disableHealthCheck {
+		return
 	}
-	firstReady := true
-	reportHealth := func(ok bool) {
+	healthCheckConfig := ac.cc.healthCheckConfig()
+	if healthCheckConfig == nil {
+		return
+	}
+	if !ac.scopts.HealthCheckEnabled {
+		return
+	}
+	healthCheckFunc := ac.cc.dopts.healthCheckFunc
+	if healthCheckFunc == nil {
+		// The health package is not imported to set health check function.
+		//
+		// TODO: add a link to the health check doc in the error message.
+		grpclog.Error("Health check is requested but health check function is not set.")
+		return
+	}
+
+	healthcheckManagingState = true
+
+	// Set up the health check helper functions.
+	currentTr := ac.transport
+	newStream := func(method string) (interface{}, error) {
+		ac.mu.Lock()
+		if ac.transport != currentTr {
+			ac.mu.Unlock()
+			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
+		}
+		ac.mu.Unlock()
+		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
+	}
+	setConnectivityState := func(s connectivity.State) {
 		ac.mu.Lock()
 		defer ac.mu.Unlock()
-		if ac.transport != newTr {
+		if ac.transport != currentTr {
 			return
 		}
-		if ok {
-			if firstReady {
-				firstReady = false
-				ac.curAddr = addr
-			}
-			ac.updateConnectivityState(connectivity.Ready)
-		} else {
-			ac.updateConnectivityState(connectivity.TransientFailure)
-		}
+		ac.updateConnectivityState(s)
 	}
-	err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
-	if err != nil {
-		if status.Code(err) == codes.Unimplemented {
-			if channelz.IsOn() {
-				channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
-					Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
-					Severity: channelz.CtError,
-				})
+	// Start the health checking stream.
+	go func() {
+		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
+		if err != nil {
+			if status.Code(err) == codes.Unimplemented {
+				if channelz.IsOn() {
+					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
+						Severity: channelz.CtError,
+					})
+				}
+				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+			} else {
+				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
 			}
-			grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
-		} else {
-			grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
 		}
-	}
+	}()
 }
 
 func (ac *addrConn) resetConnectBackoff() {