VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for the KV list operation
- Updated backend API to allow passing a locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 1bf46aa..317c2e7 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -28,6 +28,7 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
@@ -47,6 +48,18 @@
m[strings.ToLower(b.Name())] = b
}
+// unregisterForTesting deletes the balancer with the given name from the
+// balancer map.
+//
+// This function is not thread-safe.
+func unregisterForTesting(name string) {
+ delete(m, name)
+}
+
+func init() {
+ internal.BalancerUnregister = unregisterForTesting
+}
+
// Get returns the resolver builder registered with the given name.
// Note that the compare is done in a case-insenstive fashion.
// If no builder is register with the name, nil will be returned.
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 5f55b27..245785e 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -40,7 +40,7 @@
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
- csEvltr: &connectivityStateEvaluator{},
+ csEvltr: &balancer.ConnectivityStateEvaluator{},
// Initialize picker to a picker that always return
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
@@ -57,7 +57,7 @@
cc balancer.ClientConn
pickerBuilder PickerBuilder
- csEvltr *connectivityStateEvaluator
+ csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn
@@ -137,7 +137,7 @@
}
oldAggrState := b.state
- b.state = b.csEvltr.recordTransition(oldS, s)
+ b.state = b.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
@@ -169,44 +169,3 @@
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
-
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-type connectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-// recordTransition records state change happening in every subConn and based on
-// that it evaluates what aggregated state should be.
-// It can only transition between Ready, Connecting and TransientFailure. Other states,
-// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
-// before any subConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in Shutdown state.
-//
-// recordTransition should only be called synchronously from the same goroutine.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
- }
- }
-
- // Evaluate.
- if cse.numReady > 0 {
- return connectivity.Ready
- }
- if cse.numConnecting > 0 {
- return connectivity.Connecting
- }
- return connectivity.TransientFailure
-}
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 77b6847..7233ade 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -178,6 +178,28 @@
}
func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
+ if ccb.cc.curBalancerName != grpclbName {
+ var containsGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ containsGRPCLB = true
+ break
+ }
+ }
+ if containsGRPCLB {
+ // The current balancer is not grpclb, but addresses contain grpclb
+ // address. This means we failed to switch to grpclb, most likely
+ // because grpclb is not registered. Filter out all grpclb addresses
+ // from addrs before sending to balancer.
+ tempAddrs := make([]resolver.Address, 0, len(addrs))
+ for _, a := range addrs {
+ if a.Type != resolver.GRPCLB {
+ tempAddrs = append(tempAddrs, a)
+ }
+ }
+ addrs = tempAddrs
+ }
+ }
select {
case <-ccb.resolverUpdateCh:
default:
diff --git a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
index ca07c15..42b60fe 100644
--- a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
+++ b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
@@ -24,11 +24,9 @@
"sync"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
- "google.golang.org/grpc/status"
)
type balancerWrapperBuilder struct {
@@ -315,12 +313,12 @@
Metadata: a.Metadata,
}]
if !ok && failfast {
- return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
+ return nil, nil, balancer.ErrTransientFailure
}
if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
// If the returned sc is not ready and RPC is failfast,
// return error, and this RPC will fail.
- return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
+ return nil, nil, balancer.ErrTransientFailure
}
}
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 84b6dbe..56d0bf7 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -36,7 +36,6 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
@@ -592,13 +591,12 @@
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
- cc: cc,
- addrs: addrs,
- scopts: opts,
- dopts: cc.dopts,
- czData: new(channelzData),
- successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
- resetBackoff: make(chan struct{}),
+ cc: cc,
+ addrs: addrs,
+ scopts: opts,
+ dopts: cc.dopts,
+ czData: new(channelzData),
+ resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -680,11 +678,10 @@
return nil
}
ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
- go ac.resetTransport(false)
+ go ac.resetTransport()
return nil
}
@@ -703,6 +700,12 @@
return true
}
+ // Unless we're busy reconnecting already, let's reconnect from the top of
+ // the list.
+ if ac.state != connectivity.Ready {
+ return false
+ }
+
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@@ -713,7 +716,6 @@
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
- ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@@ -913,7 +915,6 @@
transport transport.ClientTransport // The current transport.
mu sync.Mutex
- addrIdx int // The index in addrs list to start reconnecting from.
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@@ -922,33 +923,30 @@
tearDownErr error // The reason this addrConn is torn down.
- backoffIdx int
- // backoffDeadline is the time until which resetTransport needs to
- // wait before increasing backoffIdx count.
- backoffDeadline time.Time
- // connectDeadline is the time by which all connection
- // negotiations must complete.
- connectDeadline time.Time
-
+ backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
- channelzID int64 // channelz unique identification number
- czData *channelzData
-
- successfulHandshake bool
-
+ channelzID int64 // channelz unique identification number.
+ czData *channelzData
healthCheckEnabled bool
}
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
+ if ac.state == s {
+ return
+ }
+
+ updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
+ grpclog.Infof(updateMsg)
ac.state = s
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
+ Desc: updateMsg,
Severity: channelz.CtINFO,
})
}
+ ac.cc.handleSubConnStateChange(ac.acbw, s)
}
// adjustParams updates parameters used to create transports upon
@@ -965,173 +963,219 @@
}
}
-// resetTransport makes sure that a healthy ac.transport exists.
-//
-// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
-// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
-// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
-// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
-// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
-// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
-//
-// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
-func (ac *addrConn) resetTransport(resolveNow bool) {
- for {
- // If this is the first in a line of resets, we want to resolve immediately. The only other time we
- // want to reset is if we have tried all the addresses handed to us.
- if resolveNow {
- ac.mu.Lock()
+func (ac *addrConn) resetTransport() {
+ for i := 0; ; i++ {
+ tryNextAddrFromStart := grpcsync.NewEvent()
+
+ ac.mu.Lock()
+ if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
- ac.mu.Unlock()
}
-
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- // The transport that was used before is no longer viable.
- ac.transport = nil
- // If the connection is READY, a failure must have occurred.
- // Otherwise, we'll consider this is a transient failure when:
- // We've exhausted all addresses
- // We're in CONNECTING
- // And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
- if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
- ac.transport = nil
+ addrs := ac.addrs
+ backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
ac.mu.Unlock()
- if err := ac.nextAddr(); err != nil {
- return
- }
+ addrLoop:
+ for _, addr := range addrs {
+ ac.mu.Lock()
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- backoffIdx := ac.backoffIdx
- backoffFor := ac.dopts.bs.Backoff(backoffIdx)
-
- // This will be the duration that dial gets to finish.
- dialDuration := getMinConnectTimeout()
- if backoffFor > dialDuration {
- // Give dial more time as we keep failing to connect.
- dialDuration = backoffFor
- }
- start := time.Now()
- connectDeadline := start.Add(dialDuration)
- ac.backoffDeadline = start.Add(backoffFor)
- ac.connectDeadline = connectDeadline
-
- ac.mu.Unlock()
-
- ac.cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = ac.cc.mkp
- ac.cc.mu.RUnlock()
-
- ac.mu.Lock()
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- if ac.state != connectivity.Connecting {
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.transport = nil
+ ac.mu.Unlock()
+
+ // This will be the duration that dial gets to finish.
+ dialDuration := getMinConnectTimeout()
+ if dialDuration < backoffFor {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ connectDeadline := time.Now().Add(dialDuration)
+
+ ac.mu.Lock()
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
+
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ copts := ac.dopts.copts
+ if ac.scopts.CredsBundle != nil {
+ copts.CredsBundle = ac.scopts.CredsBundle
+ }
+ hctx, hcancel := context.WithCancel(ac.ctx)
+ defer hcancel()
+ ac.mu.Unlock()
+
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+ Severity: channelz.CtINFO,
+ })
+ }
+
+ reconnect := grpcsync.NewEvent()
+ prefaceReceived := make(chan struct{})
+ newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
+ if err == nil {
+ ac.mu.Lock()
+ ac.curAddr = addr
+ ac.transport = newTr
+ ac.mu.Unlock()
+
+ healthCheckConfig := ac.cc.healthCheckConfig()
+ // LB channel health checking is only enabled when all the four requirements below are met:
+ // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
+ // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
+ // 3. a service config with non-empty healthCheckConfig field is provided,
+ // 4. the current load balancer allows it.
+ healthcheckManagingState := false
+ if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
+ if ac.cc.dopts.healthCheckFunc == nil {
+ // TODO: add a link to the health check doc in the error message.
+ grpclog.Error("the client side LB channel health check function has not been set.")
+ } else {
+ // TODO(deklerk) refactor to just return transport
+ go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
+ healthcheckManagingState = true
+ }
+ }
+ if !healthcheckManagingState {
+ ac.mu.Lock()
+ ac.updateConnectivityState(connectivity.Ready)
+ ac.mu.Unlock()
+ }
+ } else {
+ hcancel()
+ if err == errConnClosing {
+ return
+ }
+
+ if tryNextAddrFromStart.HasFired() {
+ break addrLoop
+ }
+ continue
+ }
+
+ ac.mu.Lock()
+ reqHandshake := ac.dopts.reqHandshake
+ ac.mu.Unlock()
+
+ <-reconnect.Done()
+ hcancel()
+
+ if reqHandshake == envconfig.RequireHandshakeHybrid {
+ // In RequireHandshakeHybrid mode, we must check to see whether
+ // server preface has arrived yet to decide whether to start
+ // reconnecting at the top of the list (server preface received)
+ // or continue with the next addr in the list as if the
+ // connection were not successful (server preface not received).
+ select {
+ case <-prefaceReceived:
+ // We received a server preface - huzzah! We consider this
+ // a success and restart from the top of the addr list.
+ ac.mu.Lock()
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
+ break addrLoop
+ default:
+ // Despite having set state to READY, in hybrid mode we
+ // consider this a failure and continue connecting at the
+ // next addr in the list.
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ ac.updateConnectivityState(connectivity.TransientFailure)
+ ac.mu.Unlock()
+
+ if tryNextAddrFromStart.HasFired() {
+ break addrLoop
+ }
+ }
+ } else {
+ // In RequireHandshakeOn mode, we would have already waited for
+ // the server preface, so we consider this a success and restart
+ // from the top of the addr list. In RequireHandshakeOff mode,
+ // we don't care to wait for the server preface before
+ // considering this a success, so we also restart from the top
+ // of the addr list.
+ ac.mu.Lock()
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
+ break addrLoop
+ }
}
- addr := ac.addrs[ac.addrIdx]
- copts := ac.dopts.copts
- if ac.scopts.CredsBundle != nil {
- copts.CredsBundle = ac.scopts.CredsBundle
+ // After exhausting all addresses, or after need to reconnect after a
+ // READY, the addrConn enters TRANSIENT_FAILURE.
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
}
+ ac.updateConnectivityState(connectivity.TransientFailure)
+
+ // Backoff.
+ b := ac.resetBackoff
+ timer := time.NewTimer(backoffFor)
+ acctx := ac.ctx
ac.mu.Unlock()
- if channelz.IsOn() {
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
- Severity: channelz.CtINFO,
- })
+ select {
+ case <-timer.C:
+ ac.mu.Lock()
+ ac.backoffIdx++
+ ac.mu.Unlock()
+ case <-b:
+ timer.Stop()
+ case <-acctx.Done():
+ timer.Stop()
+ return
}
-
- if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
- continue
- }
-
- return
}
}
-// createTransport creates a connection to one of the backends in addrs.
-func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
- oneReset := sync.Once{}
- skipReset := make(chan struct{})
- allowedToReset := make(chan struct{})
- prefaceReceived := make(chan struct{})
+// createTransport creates a connection to one of the backends in addrs. It
+// sets ac.transport in the success case, or it returns an error if it was
+// unable to successfully create a transport.
+//
+// If waitForHandshake is enabled, it blocks until server preface arrives.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
onCloseCalled := make(chan struct{})
- var prefaceMu sync.Mutex
- var serverPrefaceReceived bool
- var clientPrefaceWrote bool
-
- hcCtx, hcCancel := context.WithCancel(ac.ctx)
-
- onGoAway := func(r transport.GoAwayReason) {
- hcCancel()
- ac.mu.Lock()
- ac.adjustParams(r)
- ac.mu.Unlock()
- select {
- case <-skipReset: // The outer resetTransport loop will handle reconnection.
- return
- case <-allowedToReset: // We're in the clear to reset.
- go oneReset.Do(func() { ac.resetTransport(false) })
- }
- }
-
- prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
-
- onClose := func() {
- hcCancel()
- close(onCloseCalled)
- prefaceTimer.Stop()
-
- select {
- case <-skipReset: // The outer resetTransport loop will handle reconnection.
- return
- case <-allowedToReset: // We're in the clear to reset.
- oneReset.Do(func() { ac.resetTransport(false) })
- }
- }
-
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
+ prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
+
+ onGoAway := func(r transport.GoAwayReason) {
+ ac.mu.Lock()
+ ac.adjustParams(r)
+ ac.mu.Unlock()
+ reconnect.Fire()
+ }
+
+ onClose := func() {
+ close(onCloseCalled)
+ prefaceTimer.Stop()
+ reconnect.Fire()
+ }
+
onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
-
- // TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
- ac.mu.Lock()
-
- prefaceMu.Lock()
- serverPrefaceReceived = true
- if clientPrefaceWrote {
- ac.successfulHandshake = true
- }
- prefaceMu.Unlock()
-
- ac.mu.Unlock()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1143,13 +1187,6 @@
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
- prefaceMu.Lock()
- clientPrefaceWrote = true
- if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
- ac.successfulHandshake = true
- }
- prefaceMu.Unlock()
-
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
@@ -1160,8 +1197,7 @@
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
- close(allowedToReset)
- return nil
+ return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
@@ -1186,70 +1222,32 @@
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
- // We don't want to reset during this close because we prefer to kick out of this function and let the loop
- // in resetTransport take care of reconnecting.
- close(skipReset)
-
- return errConnClosing
+ return nil, errConnClosing
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-
- // We don't want to reset during this close because we prefer to kick out of this function and let the loop
- // in resetTransport take care of reconnecting.
- close(skipReset)
-
- return err
+ return nil, err
}
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- close(skipReset)
newTr.Close()
- return nil
+ return nil, errConnClosing
}
- ac.transport = newTr
ac.mu.Unlock()
- healthCheckConfig := ac.cc.healthCheckConfig()
- // LB channel health checking is only enabled when all the four requirements below are met:
- // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
- // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
- // 3. a service config with non-empty healthCheckConfig field is provided,
- // 4. the current load balancer allows it.
- if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
- if internal.HealthCheckFunc != nil {
- go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
- close(allowedToReset)
- return nil
- }
- // TODO: add a link to the health check doc in the error message.
- grpclog.Error("the client side LB channel health check function has not been set.")
- }
-
- // No LB channel health check case
+ // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
-
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
-
- // unblock onGoAway/onClose callback.
- close(skipReset)
- return errConnClosing
+ newTr.Close()
+ return nil, errConnClosing
}
-
- ac.updateConnectivityState(connectivity.Ready)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- ac.curAddr = addr
-
ac.mu.Unlock()
- // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
- // goroutine failing races with all the code in this method that sets the connection to "ready".
- close(allowedToReset)
- return nil
+ return newTr, nil
}
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
@@ -1269,19 +1267,12 @@
firstReady = false
ac.curAddr = addr
}
- if ac.state != connectivity.Ready {
- ac.updateConnectivityState(connectivity.Ready)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
+ ac.updateConnectivityState(connectivity.Ready)
} else {
- if ac.state != connectivity.TransientFailure {
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
+ ac.updateConnectivityState(connectivity.TransientFailure)
}
}
-
- err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
+ err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
@@ -1297,55 +1288,6 @@
}
}
-// nextAddr increments the addrIdx if there are more addresses to try. If
-// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
-// increment the backoffIdx.
-//
-// nextAddr must be called without ac.mu being held.
-func (ac *addrConn) nextAddr() error {
- ac.mu.Lock()
-
- // If a handshake has been observed, we want the next usage to start at
- // index 0 immediately.
- if ac.successfulHandshake {
- ac.successfulHandshake = false
- ac.backoffDeadline = time.Time{}
- ac.connectDeadline = time.Time{}
- ac.addrIdx = 0
- ac.backoffIdx = 0
- ac.mu.Unlock()
- return nil
- }
-
- if ac.addrIdx < len(ac.addrs)-1 {
- ac.addrIdx++
- ac.mu.Unlock()
- return nil
- }
-
- ac.addrIdx = 0
- ac.backoffIdx++
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return errConnClosing
- }
- ac.cc.resolveNow(resolver.ResolveNowOption{})
- backoffDeadline := ac.backoffDeadline
- b := ac.resetBackoff
- ac.mu.Unlock()
- timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
- select {
- case <-timer.C:
- case <-b:
- timer.Stop()
- case <-ac.ctx.Done():
- timer.Stop()
- return ac.ctx.Err()
- }
- return nil
-}
-
func (ac *addrConn) resetConnectBackoff() {
ac.mu.Lock()
close(ac.resetBackoff)
@@ -1394,7 +1336,6 @@
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index fe00a25..f286462 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -60,6 +60,7 @@
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
+ healthCheckFunc internal.HealthChecker
}
// DialOption configures how we set up the connection.
@@ -94,10 +95,8 @@
// WithWaitForHandshake blocks until the initial settings frame is received from
// the server before assigning RPCs to the connection.
//
-// Deprecated: this will become the default behavior in the 1.17 release, and
-// will be removed after the 1.18 release. To override the default behavior in
-// the 1.17 release, either use this dial option or set the environment
-// variable GRPC_GO_READY_BEFORE_HANDSHAKE=on.
+// Deprecated: this is the default behavior, and this option will be removed
+// after the 1.18 release.
func WithWaitForHandshake() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.reqHandshake = envconfig.RequireHandshakeOn
@@ -338,6 +337,7 @@
func init() {
internal.WithContextDialer = withContextDialer
internal.WithResolverBuilder = withResolverBuilder
+ internal.WithHealthCheckFunc = withHealthCheckFunc
}
// WithDialer returns a DialOption that specifies a function to use for dialing
@@ -468,10 +468,22 @@
o.disableHealthCheck = true
})
}
+
+// withHealthCheckFunc replaces the default health check function with the provided one. It makes
+// tests easier to change the health check function.
+//
+// For testing purpose only.
+func withHealthCheckFunc(f internal.HealthChecker) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.healthCheckFunc = f
+ })
+}
+
func defaultDialOptions() dialOptions {
return dialOptions{
- disableRetry: !envconfig.Retry,
- reqHandshake: envconfig.RequireHandshake,
+ disableRetry: !envconfig.Retry,
+ reqHandshake: envconfig.RequireHandshake,
+ healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index 3021a31..041520d 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -40,7 +40,7 @@
db dbWrapper
idGen idGenerator
// EntryPerPage defines the number of channelz entries to be shown on a web page.
- EntryPerPage = 50
+ EntryPerPage = int64(50)
curState int32
maxTraceEntry = defaultMaxTraceEntry
)
@@ -113,20 +113,20 @@
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
-// in the result. The returned slice is up to a length of EntryPerPage, and is
-// sorted in ascending id order.
-func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
- return db.get().GetTopChannels(id)
+// in the result. The returned slice is up to a length of the arg maxResults or
+// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
+func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
+ return db.get().GetTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
// boolean indicating whether there's more servers to be queried for.
//
// The arg id specifies that only server with id at or above it will be included
-// in the result. The returned slice is up to a length of EntryPerPage, and is
-// sorted in ascending id order.
-func GetServers(id int64) ([]*ServerMetric, bool) {
- return db.get().GetServers(id)
+// in the result. The returned slice is up to a length of the arg maxResults or
+// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
+func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
+ return db.get().GetServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
@@ -134,10 +134,10 @@
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
-// included in the result. The returned slice is up to a length of EntryPerPage,
-// and is sorted in ascending id order.
-func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
- return db.get().GetServerSockets(id, startID)
+// included in the result. The returned slice is up to a length of the arg maxResults
+// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
+func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
+ return db.get().GetServerSockets(id, startID, maxResults)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
@@ -155,6 +155,11 @@
return db.get().GetSocket(id)
}
+// GetServer returns the ServerMetric for the server (identified by id).
+func GetServer(id int64) *ServerMetric {
+ return db.get().GetServer(id)
+}
+
// RegisterChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
@@ -447,29 +452,32 @@
return n
}
-func min(a, b int) int {
+func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
-func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
+func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
c.mu.RLock()
- l := len(c.topLevelChannels)
+ l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
- cns := make([]*channel, 0, min(l, EntryPerPage))
+ cns := make([]*channel, 0, min(l, maxResults))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
+ count := int64(0)
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if cn, ok := c.channels[v]; ok {
@@ -499,21 +507,24 @@
return t, end
}
-func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
+func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
c.mu.RLock()
- l := len(c.servers)
+ l := int64(len(c.servers))
ids := make([]int64, 0, l)
- ss := make([]*server, 0, min(l, EntryPerPage))
+ ss := make([]*server, 0, min(l, maxResults))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
+ count := int64(0)
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if svr, ok := c.servers[v]; ok {
@@ -541,7 +552,10 @@
return s, end
}
-func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
+func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
var svr *server
var ok bool
c.mu.RLock()
@@ -551,18 +565,18 @@
return nil, true
}
svrskts := svr.sockets
- l := len(svrskts)
+ l := int64(len(svrskts))
ids := make([]int64, 0, l)
- sks := make([]*normalSocket, 0, min(l, EntryPerPage))
+ sks := make([]*normalSocket, 0, min(l, maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
- count := 0
+ count := int64(0)
var end bool
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if ns, ok := c.normalSockets[v]; ok {
@@ -655,6 +669,23 @@
return nil
}
+func (c *channelMap) GetServer(id int64) *ServerMetric {
+ sm := &ServerMetric{}
+ var svr *server
+ var ok bool
+ c.mu.RLock()
+ if svr, ok = c.servers[id]; !ok {
+ c.mu.RUnlock()
+ return nil
+ }
+ sm.ListenSockets = copyMap(svr.listenSockets)
+ c.mu.RUnlock()
+ sm.ID = svr.id
+ sm.RefName = svr.refName
+ sm.ServerData = svr.s.ChannelzMetric()
+ return sm
+}
+
type idGenerator struct {
id int64
}
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index a3e02b6..d2193b3 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -34,7 +34,7 @@
type RequireHandshakeSetting int
const (
- // RequireHandshakeHybrid (default, deprecated) indicates to wait for
+ // RequireHandshakeHybrid (deprecated, formerly the default) indicates to not wait for
// handshake before considering a connection ready, but wait before
// considering successful.
RequireHandshakeHybrid RequireHandshakeSetting = iota
@@ -59,6 +59,10 @@
func init() {
switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
case "on":
+ // Go switch cases never fall through implicitly: without this statement
+ // "on" would be a no-op and RequireHandshake would keep its prior value.
+ fallthrough
+ default:
RequireHandshake = RequireHandshakeOn
case "off":
RequireHandshake = RequireHandshakeOff
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index f8932b1..eaa54d4 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -23,14 +23,21 @@
import "context"
var (
- // WithContextDialer is exported by clientconn.go
+ // WithContextDialer is exported by dialoptions.go
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
- // WithResolverBuilder is exported by clientconn.go
+ // WithResolverBuilder is exported by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
+ // WithHealthCheckFunc is not exported by dialoptions.go
+ WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
- HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+ HealthCheckFunc HealthChecker
+ // BalancerUnregister is exported by package balancer to unregister a balancer.
+ BalancerUnregister func(name string)
)
+// HealthChecker defines the signature of the client-side LB channel health checking function.
+type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
CredsBundleModeFallback = "fallback"
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 39208b1..babcaee 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -91,10 +91,10 @@
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
- // onSuccess is a callback that client transport calls upon
+ // onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
- onSuccess func()
+ onPrefaceReceipt func()
maxConcurrentStreams uint32
streamQuota int64
@@ -145,7 +145,7 @@
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -240,7 +240,7 @@
kp: kp,
statsHandler: opts.StatsHandler,
initialWindowSize: initialWindowSize,
- onSuccess: onSuccess,
+ onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
@@ -362,6 +362,9 @@
ctx: s.ctx,
ctxDone: s.ctx.Done(),
recv: s.buf,
+ closeStream: func(err error) {
+ t.CloseStream(s, err)
+ },
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -780,7 +783,7 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- go t.onClose()
+ t.onClose()
return err
}
@@ -1210,7 +1213,7 @@
t.Close() // this kicks off resetTransport, so must be last before return
return
}
- t.onSuccess()
+ t.onPrefaceReceipt()
t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 4d7e890..2580aa7 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -110,15 +110,15 @@
return b.c
}
-//
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
- ctx context.Context
- ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
- recv *recvBuffer
- last []byte // Stores the remaining data in the previous calls.
- err error
+ closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
+ ctx context.Context
+ ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
+ recv *recvBuffer
+ last []byte // Stores the remaining data in the previous calls.
+ err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -128,31 +128,53 @@
if r.err != nil {
return 0, r.err
}
- n, r.err = r.read(p)
- return n, r.err
-}
-
-func (r *recvBufferReader) read(p []byte) (n int, err error) {
if r.last != nil && len(r.last) > 0 {
// Read remaining data left in last call.
copied := copy(p, r.last)
r.last = r.last[copied:]
return copied, nil
}
+ if r.closeStream != nil {
+ n, r.err = r.readClient(p)
+ } else {
+ n, r.err = r.read(p)
+ }
+ return n, r.err
+}
+
+func (r *recvBufferReader) read(p []byte) (n int, err error) {
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
- r.recv.load()
- if m.err != nil {
- return 0, m.err
- }
- copied := copy(p, m.data)
- r.last = m.data[copied:]
- return copied, nil
+ return r.readAdditional(m, p)
}
}
+func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
+ // If the context is canceled, close the stream with nil trailer metadata.
+ // closeStream writes its error parameter to r.recv as a recvMsg.
+ // r.readAdditional acts on that message and returns the necessary error.
+ select {
+ case <-r.ctxDone:
+ r.closeStream(ContextErr(r.ctx.Err()))
+ m := <-r.recv.get()
+ return r.readAdditional(m, p)
+ case m := <-r.recv.get():
+ return r.readAdditional(m, p)
+ }
+}
+
+func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
+ r.recv.load()
+ if m.err != nil {
+ return 0, m.err
+ }
+ copied := copy(p, m.data)
+ r.last = m.data[copied:]
+ return copied, nil
+}
+
type streamState uint32
const (
@@ -511,8 +533,8 @@
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
- return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
+func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
+ return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
}
// Options provides additional hints and information for message
diff --git a/vendor/google.golang.org/grpc/keepalive/keepalive.go b/vendor/google.golang.org/grpc/keepalive/keepalive.go
index 78eea1f..899e72d 100644
--- a/vendor/google.golang.org/grpc/keepalive/keepalive.go
+++ b/vendor/google.golang.org/grpc/keepalive/keepalive.go
@@ -57,7 +57,7 @@
// random jitter of +/-10% will be added to MaxConnectionAge to spread out
// connection storms.
MaxConnectionAge time.Duration // The current default value is infinity.
- // MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after
+ // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after
// which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 9d76025..50991ea 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -39,7 +39,7 @@
}
// split2 returns the values from strings.SplitN(s, sep, 2).
-// If sep is not found, it returns ("", s, false) instead.
+// If sep is not found, it returns ("", "", false) instead.
func split2(s, sep string) (string, string, bool) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 86f00e5..8d0d3dc 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -253,8 +253,8 @@
}
}
-// FailFast configures the action to take when an RPC is attempted on broken
-// connections or unreachable servers. If failFast is true, the RPC will fail
+// WaitForReady configures the action to take when an RPC is attempted on broken
+// connections or unreachable servers. If waitForReady is false, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will
// retry the call if it fails due to a transient error. gRPC will not retry if
@@ -262,7 +262,14 @@
// the data. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
//
-// By default, RPCs are "Fail Fast".
+// By default, RPCs don't "wait for ready".
+func WaitForReady(waitForReady bool) CallOption {
+ return FailFastCallOption{FailFast: !waitForReady}
+}
+
+// FailFast is the opposite of WaitForReady.
+//
+// Deprecated: use WaitForReady.
func FailFast(failFast bool) CallOption {
return FailFastCallOption{FailFast: failFast}
}
@@ -678,23 +685,17 @@
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
//
-// Deprecated: use status.FromError and Code method instead.
+// Deprecated: use status.Code instead.
func Code(err error) codes.Code {
- if s, ok := status.FromError(err); ok {
- return s.Code()
- }
- return codes.Unknown
+ return status.Code(err)
}
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
//
-// Deprecated: use status.FromError and Message method instead.
+// Deprecated: use status.Convert and Message method instead.
func ErrorDesc(err error) string {
- if s, ok := status.FromError(err); ok {
- return s.Message()
- }
- return err.Error()
+ return status.Convert(err).Message()
}
// Errorf returns an error containing an error code and a description;
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 0c266d6..d06279a 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -462,10 +462,7 @@
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
- if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil {
- // Context error; stop now.
- return toErr
- } else if !to {
+ if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
return err
}
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 260f27c..45eace5 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.17.0"
+const Version = "1.18.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 94d3d54..94a5064 100755
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -76,6 +76,10 @@
# - Ensure all source files contain a copyright message.
git ls-files "*.go" | xargs grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" 2>&1 | fail_on_output
+# - Make sure all tests in grpc and grpc/test use leakcheck via Teardown.
+(! grep 'func Test[^(]' *_test.go)
+(! grep 'func Test[^(]' test/*.go)
+
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand')
@@ -120,7 +124,7 @@
### END HACK HACK HACK
# TODO(menghanl): fix errors in transport_test.
-staticcheck -ignore '
+staticcheck -go 1.9 -ignore '
balancer.go:SA1019
balancer_test.go:SA1019
clientconn_test.go:SA1019
@@ -132,5 +136,6 @@
stats/stats_test.go:SA1019
test/channelz_test.go:SA1019
test/end2end_test.go:SA1019
+test/healthcheck_test.go:SA1019
' ./...
misspell -error .