gRPC migration
Change-Id: Ib390f6dde0d5a8d6db12ccd7da41135570ad1354
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index 0044789..45baa2a 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -20,7 +20,6 @@
import (
"context"
- "fmt"
"io"
"sync"
@@ -32,78 +31,49 @@
"google.golang.org/grpc/status"
)
-// v2PickerWrapper wraps a balancer.Picker while providing the
-// balancer.V2Picker API. It requires a pickerWrapper to generate errors
-// including the latest connectionError. To be deleted when balancer.Picker is
-// updated to the balancer.V2Picker API.
-type v2PickerWrapper struct {
- picker balancer.Picker
- connErr *connErr
-}
-
-func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- sc, done, err := v.picker.Pick(info.Ctx, info)
- if err != nil {
- if err == balancer.ErrTransientFailure {
- return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
- }
- return balancer.PickResult{}, err
- }
- return balancer.PickResult{SubConn: sc, Done: done}, nil
-}
-
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
- picker balancer.V2Picker
+ picker balancer.Picker
- // The latest connection error. TODO: remove when V1 picker is deprecated;
- // balancer should be responsible for providing the error.
- *connErr
-}
-
-type connErr struct {
- mu sync.Mutex
- err error
-}
-
-func (c *connErr) updateConnectionError(err error) {
- c.mu.Lock()
- c.err = err
- c.mu.Unlock()
-}
-
-func (c *connErr) connectionError() error {
- c.mu.Lock()
- err := c.err
- c.mu.Unlock()
- return err
+ // The latest connection happened.
+ connErrMu sync.Mutex
+ connErr error
}
func newPickerWrapper() *pickerWrapper {
- return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
+ bp := &pickerWrapper{blockingCh: make(chan struct{})}
+ return bp
+}
+
+func (bp *pickerWrapper) updateConnectionError(err error) {
+ bp.connErrMu.Lock()
+ bp.connErr = err
+ bp.connErrMu.Unlock()
+}
+
+func (bp *pickerWrapper) connectionError() error {
+ bp.connErrMu.Lock()
+ err := bp.connErr
+ bp.connErrMu.Unlock()
+ return err
}
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
-func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
- pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
-}
-
-// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
-func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
- pw.mu.Lock()
- if pw.done {
- pw.mu.Unlock()
+func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
+ bp.mu.Lock()
+ if bp.done {
+ bp.mu.Unlock()
return
}
- pw.picker = p
- // pw.blockingCh should never be nil.
- close(pw.blockingCh)
- pw.blockingCh = make(chan struct{})
- pw.mu.Unlock()
+ bp.picker = p
+ // bp.blockingCh should never be nil.
+ close(bp.blockingCh)
+ bp.blockingCh = make(chan struct{})
+ bp.mu.Unlock()
}
func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
@@ -130,85 +100,83 @@
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
-func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
var ch chan struct{}
- var lastPickErr error
for {
- pw.mu.Lock()
- if pw.done {
- pw.mu.Unlock()
+ bp.mu.Lock()
+ if bp.done {
+ bp.mu.Unlock()
return nil, nil, ErrClientConnClosing
}
- if pw.picker == nil {
- ch = pw.blockingCh
+ if bp.picker == nil {
+ ch = bp.blockingCh
}
- if ch == pw.blockingCh {
+ if ch == bp.blockingCh {
// This could happen when either:
- // - pw.picker is nil (the previous if condition), or
+ // - bp.picker is nil (the previous if condition), or
// - has called pick on the current picker.
- pw.mu.Unlock()
+ bp.mu.Unlock()
select {
case <-ctx.Done():
- var errStr string
- if lastPickErr != nil {
- errStr = "latest balancer error: " + lastPickErr.Error()
- } else if connectionErr := pw.connectionError(); connectionErr != nil {
- errStr = "latest connection error: " + connectionErr.Error()
- } else {
- errStr = ctx.Err().Error()
+ if connectionErr := bp.connectionError(); connectionErr != nil {
+ switch ctx.Err() {
+ case context.DeadlineExceeded:
+ return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr)
+ case context.Canceled:
+ return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr)
+ }
}
- switch ctx.Err() {
- case context.DeadlineExceeded:
- return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
- case context.Canceled:
- return nil, nil, status.Error(codes.Canceled, errStr)
- }
+ return nil, nil, ctx.Err()
case <-ch:
}
continue
}
- ch = pw.blockingCh
- p := pw.picker
- pw.mu.Unlock()
+ ch = bp.blockingCh
+ p := bp.picker
+ bp.mu.Unlock()
- pickResult, err := p.Pick(info)
+ subConn, done, err := p.Pick(ctx, opts)
if err != nil {
- if err == balancer.ErrNoSubConnAvailable {
+ switch err {
+ case balancer.ErrNoSubConnAvailable:
continue
- }
- if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
+ case balancer.ErrTransientFailure:
if !failfast {
- lastPickErr = err
continue
}
- return nil, nil, status.Error(codes.Unavailable, err.Error())
+ return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
+ case context.DeadlineExceeded:
+ return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
+ case context.Canceled:
+ return nil, nil, status.Error(codes.Canceled, err.Error())
+ default:
+ if _, ok := status.FromError(err); ok {
+ return nil, nil, err
+ }
+ // err is some other error.
+ return nil, nil, status.Error(codes.Unknown, err.Error())
}
- if _, ok := status.FromError(err); ok {
- return nil, nil, err
- }
- // err is some other error.
- return nil, nil, status.Error(codes.Unknown, err.Error())
}
- acw, ok := pickResult.SubConn.(*acBalancerWrapper)
+ acw, ok := subConn.(*acBalancerWrapper)
if !ok {
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if channelz.IsOn() {
- return t, doneChannelzWrapper(acw, pickResult.Done), nil
+ return t, doneChannelzWrapper(acw, done), nil
}
- return t, pickResult.Done, nil
+ return t, done, nil
}
- if pickResult.Done != nil {
+ if done != nil {
// Calling done with nil error, no bytes sent and no bytes received.
// DoneInfo with default value works.
- pickResult.Done(balancer.DoneInfo{})
+ done(balancer.DoneInfo{})
}
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
@@ -218,12 +186,12 @@
}
}
-func (pw *pickerWrapper) close() {
- pw.mu.Lock()
- defer pw.mu.Unlock()
- if pw.done {
+func (bp *pickerWrapper) close() {
+ bp.mu.Lock()
+ defer bp.mu.Unlock()
+ if bp.done {
return
}
- pw.done = true
- close(pw.blockingCh)
+ bp.done = true
+ close(bp.blockingCh)
}