[VOL-5486] Fix deprecated versions
Change-Id: If0b888d6c2f33b2f415c8b03b08dc994bb3df3f4
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 917c242..b126401 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -27,8 +27,11 @@
"net"
"strings"
+ "google.golang.org/grpc/channelz"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
+ estats "google.golang.org/grpc/experimental/stats"
+ "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
@@ -38,6 +41,8 @@
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder)
+
+ logger = grpclog.Component("balancer")
)
// Register registers the balancer builder to the balancer map. b.Name
@@ -50,7 +55,14 @@
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
- m[strings.ToLower(b.Name())] = b
+ name := strings.ToLower(b.Name())
+ if name != b.Name() {
+ // TODO: Skip the use of strings.ToLower() to index the map after v1.59
+ // is released to switch to case sensitive balancer registry. Also,
+ // remove this warning and update the docstrings for Register and Get.
+ logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())
+ }
+ m[name] = b
}
// unregisterForTesting deletes the balancer with the given name from the
@@ -63,58 +75,52 @@
func init() {
internal.BalancerUnregister = unregisterForTesting
+ internal.ConnectedAddress = connectedAddress
+ internal.SetConnectedAddress = setConnectedAddress
}
// Get returns the resolver builder registered with the given name.
// Note that the compare is done in a case-insensitive fashion.
// If no builder is register with the name, nil will be returned.
func Get(name string) Builder {
+ if strings.ToLower(name) != name {
+ // TODO: Skip the use of strings.ToLower() to index the map after v1.59
+ // is released to switch to case sensitive balancer registry. Also,
+ // remove this warning and update the docstrings for Register and Get.
+ logger.Warningf("Balancer retrieved for name %q. grpc-go will be switching to case sensitive balancer registries soon", name)
+ }
if b, ok := m[strings.ToLower(name)]; ok {
return b
}
return nil
}
-// SubConn represents a gRPC sub connection.
-// Each sub connection contains a list of addresses. gRPC will
-// try to connect to them (in sequence), and stop trying the
-// remainder once one connection is successful.
-//
-// The reconnect backoff will be applied on the list, not a single address.
-// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
-//
-// All SubConns start in IDLE, and will not try to connect. To trigger
-// the connecting, Balancers must call Connect.
-// When the connection encounters an error, it will reconnect immediately.
-// When the connection becomes IDLE, it will not reconnect unless Connect is
-// called.
-//
-// This interface is to be implemented by gRPC. Users should not need a
-// brand new implementation of this interface. For the situations like
-// testing, the new implementation should embed this interface. This allows
-// gRPC to add new methods to this interface.
-type SubConn interface {
- // UpdateAddresses updates the addresses used in this SubConn.
- // gRPC checks if currently-connected address is still in the new list.
- // If it's in the list, the connection will be kept.
- // If it's not in the list, the connection will gracefully closed, and
- // a new connection will be created.
- //
- // This will trigger a state transition for the SubConn.
- UpdateAddresses([]resolver.Address)
- // Connect starts the connecting for this SubConn.
- Connect()
-}
-
// NewSubConnOptions contains options to create new SubConn.
type NewSubConnOptions struct {
// CredsBundle is the credentials bundle that will be used in the created
// SubConn. If it's nil, the original creds from grpc DialOptions will be
// used.
+ //
+ // Deprecated: Use the Attributes field in resolver.Address to pass
+ // arbitrary data to the credential handshaker.
CredsBundle credentials.Bundle
// HealthCheckEnabled indicates whether health check service should be
// enabled on this SubConn
HealthCheckEnabled bool
+ // StateListener is called when the state of the subconn changes. If nil,
+ // Balancer.UpdateSubConnState will be called instead. Will never be
+ // invoked until after Connect() is called on the SubConn created with
+ // these options.
+ StateListener func(SubConnState)
+}
+
+// State contains the balancer's state relevant to the gRPC ClientConn.
+type State struct {
+ // State contains the connectivity state of the balancer, which is used to
+ // determine the state of the ClientConn.
+ ConnectivityState connectivity.State
+ // Picker is used to choose connections (SubConns) for RPCs.
+ Picker Picker
}
// ClientConn represents a gRPC ClientConn.
@@ -123,48 +129,92 @@
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
+//
+// NOTICE: This interface is intended to be implemented by gRPC, or intercepted
+// by custom load balancing polices. Users should not need their own complete
+// implementation of this interface -- they should always delegate to a
+// ClientConn passed to Builder.Build() by embedding it in their
+// implementations. An embedded ClientConn must never be nil, or runtime panics
+// will occur.
type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
// Behaviors of the SubConn can be controlled by options.
+ //
+ // Deprecated: please be aware that in a future version, SubConns will only
+ // support one address per SubConn.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
- RemoveSubConn(SubConn)
-
- // UpdateBalancerState is called by balancer to notify gRPC that some internal
- // state in balancer has changed.
//
- // gRPC will update the connectivity state of the ClientConn, and will call pick
- // on the new picker to pick new SubConn.
- UpdateBalancerState(s connectivity.State, p Picker)
+ // Deprecated: use SubConn.Shutdown instead.
+ RemoveSubConn(SubConn)
+ // UpdateAddresses updates the addresses used in the passed in SubConn.
+ // gRPC checks if the currently connected address is still in the new list.
+ // If so, the connection will be kept. Else, the connection will be
+ // gracefully closed, and a new connection will be created.
+ //
+ // This may trigger a state transition for the SubConn.
+ //
+ // Deprecated: this method will be removed. Create new SubConns for new
+ // addresses instead.
+ UpdateAddresses(SubConn, []resolver.Address)
+
+ // UpdateState notifies gRPC that the balancer's internal state has
+ // changed.
+ //
+ // gRPC will update the connectivity state of the ClientConn, and will call
+ // Pick on the new Picker to pick new SubConns.
+ UpdateState(State)
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
- ResolveNow(resolver.ResolveNowOption)
+ ResolveNow(resolver.ResolveNowOptions)
// Target returns the dial target for this ClientConn.
//
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string
+
+ // MetricsRecorder provides the metrics recorder that balancers can use to
+ // record metrics. Balancer implementations which do not register metrics on
+ // metrics registry and record on them can ignore this method. The returned
+ // MetricsRecorder is guaranteed to never be nil.
+ MetricsRecorder() estats.MetricsRecorder
+
+ // EnforceClientConnEmbedding is included to force implementers to embed
+ // another implementation of this interface, allowing gRPC to add methods
+ // without breaking users.
+ internal.EnforceClientConnEmbedding
}
// BuildOptions contains additional information for Build.
type BuildOptions struct {
- // DialCreds is the transport credential the Balancer implementation can
- // use to dial to a remote load balancer server. The Balancer implementations
- // can ignore this if it does not need to talk to another party securely.
+ // DialCreds is the transport credentials to use when communicating with a
+ // remote load balancer server. Balancer implementations which do not
+ // communicate with a remote load balancer server can ignore this field.
DialCreds credentials.TransportCredentials
- // CredsBundle is the credentials bundle that the Balancer can use.
+ // CredsBundle is the credentials bundle to use when communicating with a
+ // remote load balancer server. Balancer implementations which do not
+ // communicate with a remote load balancer server can ignore this field.
CredsBundle credentials.Bundle
- // Dialer is the custom dialer the Balancer implementation can use to dial
- // to a remote load balancer server. The Balancer implementations
- // can ignore this if it doesn't need to talk to remote balancer.
+ // Dialer is the custom dialer to use when communicating with a remote load
+ // balancer server. Balancer implementations which do not communicate with a
+ // remote load balancer server can ignore this field.
Dialer func(context.Context, string) (net.Conn, error)
- // ChannelzParentID is the entity parent's channelz unique identification number.
- ChannelzParentID int64
- // Target contains the parsed address info of the dial target. It is the same resolver.Target as
- // passed to the resolver.
- // See the documentation for the resolver.Target type for details about what it contains.
+ // Authority is the server name to use as part of the authentication
+ // handshake when communicating with a remote load balancer server. Balancer
+ // implementations which do not communicate with a remote load balancer
+ // server can ignore this field.
+ Authority string
+ // ChannelzParent is the parent ClientConn's channelz channel.
+ ChannelzParent channelz.Identifier
+ // CustomUserAgent is the custom user agent set on the parent ClientConn.
+ // The balancer should set the same custom user agent if it creates a
+ // ClientConn.
+ CustomUserAgent string
+ // Target contains the parsed address info of the dial target. It is the
+ // same resolver.Target as passed to the resolver. See the documentation for
+ // the resolver.Target type for details about what it contains.
Target resolver.Target
}
@@ -185,11 +235,14 @@
ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
}
-// PickOptions contains addition information for the Pick operation.
-type PickOptions struct {
+// PickInfo contains additional information for the Pick operation.
+type PickInfo struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
+ // Ctx is the RPC's context, and may contain relevant RPC-level information
+ // like the outgoing header metadata.
+ Ctx context.Context
}
// DoneInfo contains additional information for done.
@@ -205,56 +258,79 @@
// ServerLoad is the load received from server. It's usually sent as part of
// trailing metadata.
//
- // The only supported type now is *orca_v1.LoadReport.
- ServerLoad interface{}
+ // The only supported type now is *orca_v3.LoadReport.
+ ServerLoad any
}
var (
// ErrNoSubConnAvailable indicates no SubConn is available for pick().
- // gRPC will block the RPC until a new picker is available via UpdateBalancerState().
+ // gRPC will block the RPC until a new picker is available via UpdateState().
ErrNoSubConnAvailable = errors.New("no SubConn is available")
// ErrTransientFailure indicates all SubConns are in TransientFailure.
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
+ //
+ // Deprecated: return an appropriate error based on the last resolution or
+ // connection attempt instead. The behavior is the same for any non-gRPC
+ // status error.
ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
)
+// PickResult contains information related to a connection chosen for an RPC.
+type PickResult struct {
+ // SubConn is the connection to use for this pick, if its state is Ready.
+ // If the state is not Ready, gRPC will block the RPC until a new Picker is
+ // provided by the balancer (using ClientConn.UpdateState). The SubConn
+ // must be one returned by ClientConn.NewSubConn.
+ SubConn SubConn
+
+ // Done is called when the RPC is completed. If the SubConn is not ready,
+ // this will be called with a nil parameter. If the SubConn is not a valid
+ // type, Done may not be called. May be nil if the balancer does not wish
+ // to be notified when the RPC completes.
+ Done func(DoneInfo)
+
+ // Metadata provides a way for LB policies to inject arbitrary per-call
+ // metadata. Any metadata returned here will be merged with existing
+ // metadata added by the client application.
+ //
+ // LB policies with child policies are responsible for propagating metadata
+ // injected by their children to the ClientConn, as part of Pick().
+ Metadata metadata.MD
+}
+
+// TransientFailureError returns e. It exists for backward compatibility and
+// will be deleted soon.
+//
+// Deprecated: no longer necessary, picker errors are treated this way by
+// default.
+func TransientFailureError(e error) error { return e }
+
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
-// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
+// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
- // Pick returns the SubConn to be used to send the RPC.
- // The returned SubConn must be one returned by NewSubConn().
+ // Pick returns the connection to use for this RPC and related information.
//
- // This functions is expected to return:
- // - a SubConn that is known to be READY;
- // - ErrNoSubConnAvailable if no SubConn is available, but progress is being
- // made (for example, some SubConn is in CONNECTING mode);
- // - other errors if no active connecting is happening (for example, all SubConn
- // are in TRANSIENT_FAILURE mode).
+ // Pick should not block. If the balancer needs to do I/O or any blocking
+ // or time-consuming work to service this call, it should return
+ // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
+ // the Picker is updated (using ClientConn.UpdateState).
//
- // If a SubConn is returned:
- // - If it is READY, gRPC will send the RPC on it;
- // - If it is not ready, or becomes not ready after it's returned, gRPC will
- // block until UpdateBalancerState() is called and will call pick on the
- // new picker. The done function returned from Pick(), if not nil, will be
- // called with nil error, no bytes sent and no bytes received.
+ // If an error is returned:
//
- // If the returned error is not nil:
- // - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
- // - If the error is ErrTransientFailure:
- // - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
- // is called to pick again;
- // - Otherwise, RPC will fail with unavailable error.
- // - Else (error is other non-nil error):
- // - The RPC will fail with unavailable error.
+ // - If the error is ErrNoSubConnAvailable, gRPC will block until a new
+ // Picker is provided by the balancer (using ClientConn.UpdateState).
//
- // The returned done() function will be called once the rpc has finished,
- // with the final status of that RPC. If the SubConn returned is not a
- // valid SubConn type, done may not be called. done may be nil if balancer
- // doesn't care about the RPC status.
- Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
+ // - If the error is a status error (implemented by the grpc/status
+ // package), gRPC will terminate the RPC with the code and message
+ // provided.
+ //
+ // - For all other errors, wait for ready RPCs will wait, but non-wait for
+ // ready RPCs will be terminated with this error's Error() string and
+ // status code Unavailable.
+ Pick(info PickInfo) (PickResult, error)
}
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
@@ -262,38 +338,46 @@
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
-// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
-// to be called synchronously from the same goroutine.
-// There's no guarantee on picker.Pick, it may be called anytime.
+// UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are
+// guaranteed to be called synchronously from the same goroutine. There's no
+// guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
- // HandleSubConnStateChange is called by gRPC when the connectivity state
- // of sc has changed.
- // Balancer is expected to aggregate all the state of SubConn and report
- // that back to gRPC.
- // Balancer should also generate and update Pickers when its internal state has
- // been changed by the new state.
+ // UpdateClientConnState is called by gRPC when the state of the ClientConn
+ // changes. If the error returned is ErrBadResolverState, the ClientConn
+ // will begin calling ResolveNow on the active name resolver with
+ // exponential backoff until a subsequent call to UpdateClientConnState
+ // returns a nil error. Any other errors are currently ignored.
+ UpdateClientConnState(ClientConnState) error
+ // ResolverError is called by gRPC when the name resolver reports an error.
+ ResolverError(error)
+ // UpdateSubConnState is called by gRPC when the state of a SubConn
+ // changes.
//
- // Deprecated: if V2Balancer is implemented by the Balancer,
- // UpdateSubConnState will be called instead.
- HandleSubConnStateChange(sc SubConn, state connectivity.State)
- // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
- // balancers.
- // Balancer can create new SubConn or remove SubConn with the addresses.
- // An empty address slice and a non-nil error will be passed if the resolver returns
- // non-nil error to gRPC.
- //
- // Deprecated: if V2Balancer is implemented by the Balancer,
- // UpdateClientConnState will be called instead.
- HandleResolvedAddrs([]resolver.Address, error)
- // Close closes the balancer. The balancer is not required to call
- // ClientConn.RemoveSubConn for its existing SubConns.
+ // Deprecated: Use NewSubConnOptions.StateListener when creating the
+ // SubConn instead.
+ UpdateSubConnState(SubConn, SubConnState)
+ // Close closes the balancer. The balancer is not currently required to
+ // call SubConn.Shutdown for its existing SubConns; however, this will be
+ // required in a future release, so it is recommended.
Close()
+ // ExitIdle instructs the LB policy to reconnect to backends / exit the
+ // IDLE state, if appropriate and possible. Note that SubConns that enter
+ // the IDLE state will not reconnect until SubConn.Connect is called.
+ ExitIdle()
}
-// SubConnState describes the state of a SubConn.
-type SubConnState struct {
- ConnectivityState connectivity.State
- // TODO: add last connection error
+// ExitIdler is an optional interface for balancers to implement. If
+// implemented, ExitIdle will be called when ClientConn.Connect is called, if
+// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
+// all SubConns to connect.
+//
+// Deprecated: All balancers must implement this interface. This interface will
+// be removed in a future release.
+type ExitIdler interface {
+ // ExitIdle instructs the LB policy to reconnect to backends / exit the
+ // IDLE state, if appropriate and possible. Note that SubConns that enter
+ // the IDLE state will not reconnect until SubConn.Connect is called.
+ ExitIdle()
}
// ClientConnState describes the state of a ClientConn relevant to the
@@ -308,66 +392,3 @@
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")
-
-// V2Balancer is defined for documentation purposes. If a Balancer also
-// implements V2Balancer, its UpdateClientConnState method will be called
-// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
-// instead of HandleSubConnStateChange.
-type V2Balancer interface {
- // UpdateClientConnState is called by gRPC when the state of the ClientConn
- // changes. If the error returned is ErrBadResolverState, the ClientConn
- // will begin calling ResolveNow on the active name resolver with
- // exponential backoff until a subsequent call to UpdateClientConnState
- // returns a nil error. Any other errors are currently ignored.
- UpdateClientConnState(ClientConnState) error
- // ResolverError is called by gRPC when the name resolver reports an error.
- ResolverError(error)
- // UpdateSubConnState is called by gRPC when the state of a SubConn
- // changes.
- UpdateSubConnState(SubConn, SubConnState)
- // Close closes the balancer. The balancer is not required to call
- // ClientConn.RemoveSubConn for its existing SubConns.
- Close()
-}
-
-// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
-// and returns one aggregated connectivity state.
-//
-// It's not thread safe.
-type ConnectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-// RecordTransition records state change happening in subConn and based on that
-// it evaluates what aggregated state should be.
-//
-// - If at least one SubConn in Ready, the aggregated state is Ready;
-// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
-// - Else the aggregated state is TransientFailure.
-//
-// Idle and Shutdown are not considered.
-func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
- }
- }
-
- // Evaluate.
- if cse.numReady > 0 {
- return connectivity.Ready
- }
- if cse.numConnecting > 0 {
- return connectivity.Connecting
- }
- return connectivity.TransientFailure
-}
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 1a5c1aa..4d57687 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -19,7 +19,8 @@
package base
import (
- "context"
+ "errors"
+ "fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@@ -27,34 +28,36 @@
"google.golang.org/grpc/resolver"
)
+var logger = grpclog.Component("balancer")
+
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
config Config
}
-func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
- return &baseBalancer{
+func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
+ bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
- subConns: make(map[resolver.Address]balancer.SubConn),
+ subConns: resolver.NewAddressMapV2[balancer.SubConn](),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
- // Initialize picker to a picker that always return
- // ErrNoSubConnAvailable, because when state of a SubConn changes, we
- // may call UpdateBalancerState with this picker.
- picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
- config: bb.config,
+ config: bb.config,
+ state: connectivity.Connecting,
}
+ // Initialize picker to a picker that always returns
+ // ErrNoSubConnAvailable, because when state of a SubConn changes, we
+ // may call UpdateState with this picker.
+ bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
+ return bal
}
func (bb *baseBuilder) Name() string {
return bb.name
}
-var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
-
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
@@ -62,87 +65,145 @@
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
- subConns map[resolver.Address]balancer.SubConn
+ subConns *resolver.AddressMapV2[balancer.SubConn]
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
+
+ resolverErr error // the last error reported by the resolver; cleared on successful resolution
+ connErr error // the last connection error; cleared upon leaving TransientFailure
}
-func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
- panic("not implemented")
-}
+func (b *baseBalancer) ResolverError(err error) {
+ b.resolverErr = err
+ if b.subConns.Len() == 0 {
+ b.state = connectivity.TransientFailure
+ }
-func (b *baseBalancer) ResolverError(error) {
- // Ignore
+ if b.state != connectivity.TransientFailure {
+ // The picker will not change since the balancer does not currently
+ // report an error.
+ return
+ }
+ b.regeneratePicker()
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: b.state,
+ Picker: b.picker,
+ })
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
- // TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
- if grpclog.V(2) {
- grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
+ if logger.V(2) {
+ logger.Info("base.baseBalancer: got new ClientConn state: ", s)
}
+ // Successful resolution; clear resolver error and ensure we return nil.
+ b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
- addrsSet := make(map[resolver.Address]struct{})
+ addrsSet := resolver.NewAddressMapV2[any]()
for _, a := range s.ResolverState.Addresses {
- addrsSet[a] = struct{}{}
- if _, ok := b.subConns[a]; !ok {
+ addrsSet.Set(a, nil)
+ if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
- sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
+ var sc balancer.SubConn
+ opts := balancer.NewSubConnOptions{
+ HealthCheckEnabled: b.config.HealthCheck,
+ StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
+ }
+ sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
if err != nil {
- grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
+ logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
- b.subConns[a] = sc
+ b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
+ b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
}
}
- for a, sc := range b.subConns {
+ for _, a := range b.subConns.Keys() {
+ sc, _ := b.subConns.Get(a)
// a was removed by resolver.
- if _, ok := addrsSet[a]; !ok {
- b.cc.RemoveSubConn(sc)
- delete(b.subConns, a)
+ if _, ok := addrsSet.Get(a); !ok {
+ sc.Shutdown()
+ b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
- // The entry will be deleted in HandleSubConnStateChange.
+ // The entry will be deleted in updateSubConnState.
}
}
+ // If resolver state contains no addresses, return an error so ClientConn
+ // will trigger re-resolve. Also records this as a resolver error, so when
+ // the overall state turns transient failure, the error message will have
+ // the zero address information.
+ if len(s.ResolverState.Addresses) == 0 {
+ b.ResolverError(errors.New("produced zero addresses"))
+ return balancer.ErrBadResolverState
+ }
+
+ b.regeneratePicker()
+ b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil
}
+// mergeErrors builds an error from the last connection error and the last
+// resolver error. Must only be called if b.state is TransientFailure.
+func (b *baseBalancer) mergeErrors() error {
+ // connErr must always be non-nil unless there are no SubConns, in which
+ // case resolverErr must be non-nil.
+ if b.connErr == nil {
+ return fmt.Errorf("last resolver error: %v", b.resolverErr)
+ }
+ if b.resolverErr == nil {
+ return fmt.Errorf("last connection error: %v", b.connErr)
+ }
+ return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
+}
+
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
-// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
-// - built by the pickerBuilder with all READY SubConns otherwise.
+// - errPicker if the balancer is in TransientFailure,
+// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
- b.picker = NewErrPicker(balancer.ErrTransientFailure)
+ b.picker = NewErrPicker(b.mergeErrors())
return
}
- readySCs := make(map[resolver.Address]balancer.SubConn)
+ readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
- for addr, sc := range b.subConns {
+ for _, addr := range b.subConns.Keys() {
+ sc, _ := b.subConns.Get(addr)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
- readySCs[addr] = sc
+ readySCs[sc] = SubConnInfo{Address: addr}
}
}
- b.picker = b.pickerBuilder.Build(readySCs)
+ b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
-func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- panic("not implemented")
-}
-
+// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+ logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
+}
+
+func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
- if grpclog.V(2) {
- grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
+ if logger.V(2) {
+ logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
}
oldS, ok := b.scStates[sc]
if !ok {
- if grpclog.V(2) {
- grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ if logger.V(2) {
+ logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ }
+ return
+ }
+ if oldS == connectivity.TransientFailure &&
+ (s == connectivity.Connecting || s == connectivity.Idle) {
+ // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
+ // CONNECTING transitions to prevent the aggregated state from being
+ // always CONNECTING when many backends exist but are all down.
+ if s == connectivity.Idle {
+ sc.Connect()
}
return
}
@@ -151,41 +212,51 @@
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
- // When an address was removed by resolver, b called RemoveSubConn but
- // kept the sc's state in scStates. Remove state for this sc here.
+ // When an address was removed by resolver, b called Shutdown but kept
+ // the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
+ case connectivity.TransientFailure:
+ // Save error to be reported via picker.
+ b.connErr = state.ConnectionError
}
- oldAggrState := b.state
b.state = b.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
- // - this sc became ready from not-ready
- // - this sc became not-ready from ready
- // - the aggregated state of balancer became TransientFailure from non-TransientFailure
- // - the aggregated state of balancer became non-TransientFailure from TransientFailure
+ // - this sc entered or left ready
+ // - the aggregated state of balancer is TransientFailure
+ // (may need to update error message)
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
- (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
+ b.state == connectivity.TransientFailure {
b.regeneratePicker()
}
-
- b.cc.UpdateBalancerState(b.state, b.picker)
+ b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
// Close is a nop because base balancer doesn't have internal state to clean up,
-// and it doesn't need to call RemoveSubConn for the SubConns.
+// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}
-// NewErrPicker returns a picker that always returns err on Pick().
+// ExitIdle is a nop because the base balancer attempts to stay connected to
+// all SubConns at all times.
+func (b *baseBalancer) ExitIdle() {
+}
+
+// NewErrPicker returns a Picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
}
+// NewErrPickerV2 is temporarily defined for backward compatibility reasons.
+//
+// Deprecated: use NewErrPicker instead.
+var NewErrPickerV2 = NewErrPicker
+
type errPicker struct {
err error // Pick() always returns this err.
}
-func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- return nil, nil, p.err
+func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ return balancer.PickResult{}, p.err
}
diff --git a/vendor/google.golang.org/grpc/balancer/base/base.go b/vendor/google.golang.org/grpc/balancer/base/base.go
index 34b1f29..e31d76e 100644
--- a/vendor/google.golang.org/grpc/balancer/base/base.go
+++ b/vendor/google.golang.org/grpc/balancer/base/base.go
@@ -37,15 +37,22 @@
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
- // Build takes a slice of ready SubConns, and returns a picker that will be
- // used by gRPC to pick a SubConn.
- Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
+ // Build returns a picker that will be used by gRPC to pick a SubConn.
+ Build(info PickerBuildInfo) balancer.Picker
}
-// NewBalancerBuilder returns a balancer builder. The balancers
-// built by this builder will use the picker builder to build pickers.
-func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
- return NewBalancerBuilderWithConfig(name, pb, Config{})
+// PickerBuildInfo contains information needed by the picker builder to
+// construct a picker.
+type PickerBuildInfo struct {
+ // ReadySCs is a map from all ready SubConns to the Addresses used to
+ // create them.
+ ReadySCs map[balancer.SubConn]SubConnInfo
+}
+
+// SubConnInfo contains information about a SubConn created by the base
+// balancer.
+type SubConnInfo struct {
+ Address resolver.Address // the address used to create this SubConn
}
// Config contains the config info about the base balancer builder.
@@ -54,8 +61,8 @@
HealthCheck bool
}
-// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
-func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
+// NewBalancerBuilder returns a base balancer builder configured by the provided config.
+func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
diff --git a/vendor/google.golang.org/grpc/balancer/conn_state_evaluator.go b/vendor/google.golang.org/grpc/balancer/conn_state_evaluator.go
new file mode 100644
index 0000000..c334135
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/conn_state_evaluator.go
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import "google.golang.org/grpc/connectivity"
+
+// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
+// and returns one aggregated connectivity state.
+//
+// It's not thread safe.
+type ConnectivityStateEvaluator struct {
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transient failure state.
+ numIdle uint64 // Number of addrConns in idle state.
+}
+
+// RecordTransition records state change happening in subConn and based on that
+// it evaluates what aggregated state should be.
+//
+// - If at least one SubConn in Ready, the aggregated state is Ready;
+// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
+// - Else if at least one SubConn is Idle, the aggregated state is Idle;
+// - Else if at least one SubConn is TransientFailure (or there are no SubConns), the aggregated state is Transient Failure.
+//
+// Shutdown is not considered.
+func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ case connectivity.Idle:
+ cse.numIdle += updateVal
+ }
+ }
+ return cse.CurrentState()
+}
+
+// CurrentState returns the current aggregate conn state by evaluating the counters
+func (cse *ConnectivityStateEvaluator) CurrentState() connectivity.State {
+ // Evaluate.
+ if cse.numReady > 0 {
+ return connectivity.Ready
+ }
+ if cse.numConnecting > 0 {
+ return connectivity.Connecting
+ }
+ if cse.numIdle > 0 {
+ return connectivity.Idle
+ }
+ return connectivity.TransientFailure
+}
diff --git a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go
new file mode 100644
index 0000000..360db08
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go
@@ -0,0 +1,389 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package endpointsharding implements a load balancing policy that manages
+// homogeneous child policies each owning a single endpoint.
+//
+// # Experimental
+//
+// Notice: This package is EXPERIMENTAL and may be changed or removed in a
+// later release.
+package endpointsharding
+
+import (
+ "errors"
+ rand "math/rand/v2"
+ "sync"
+ "sync/atomic"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/base"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/resolver"
+)
+
+var randIntN = rand.IntN
+
+// ChildState is the balancer state of a child along with the endpoint which
+// identifies the child balancer.
+type ChildState struct {
+ Endpoint resolver.Endpoint
+ State balancer.State
+
+ // Balancer exposes only the ExitIdler interface of the child LB policy.
+ // Other methods of the child policy are called only by endpointsharding.
+ Balancer ExitIdler
+}
+
+// ExitIdler provides access to only the ExitIdle method of the child balancer.
+type ExitIdler interface {
+ // ExitIdle instructs the LB policy to reconnect to backends / exit the
+ // IDLE state, if appropriate and possible. Note that SubConns that enter
+ // the IDLE state will not reconnect until SubConn.Connect is called.
+ ExitIdle()
+}
+
+// Options are the options to configure the behaviour of the
+// endpointsharding balancer.
+type Options struct {
+ // DisableAutoReconnect allows the balancer to keep child balancer in the
+ // IDLE state until they are explicitly triggered to exit using the
+ // ChildState obtained from the endpointsharding picker. When set to false,
+ // the endpointsharding balancer will automatically call ExitIdle on child
+ // connections that report IDLE.
+ DisableAutoReconnect bool
+}
+
+// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
+// type as the balancer.Builder.Build method.
+type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
+
+// NewBalancer returns a load balancing policy that manages homogeneous child
+// policies each owning a single endpoint. The endpointsharding balancer
+// forwards the LoadBalancingConfig in ClientConn state updates to its children.
+func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
+ es := &endpointSharding{
+ cc: cc,
+ bOpts: opts,
+ esOpts: esOpts,
+ childBuilder: childBuilder,
+ }
+ es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
+ return es
+}
+
+// endpointSharding is a balancer that wraps child balancers. It creates a child
+// balancer with child config for every unique Endpoint received. It updates the
+// child states on any update from parent or child.
+type endpointSharding struct {
+ cc balancer.ClientConn
+ bOpts balancer.BuildOptions
+ esOpts Options
+ childBuilder ChildBuilderFunc
+
+ // childMu synchronizes calls to any single child. It must be held for all
+ // calls into a child. To avoid deadlocks, do not acquire childMu while
+ // holding mu.
+ childMu sync.Mutex
+ children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]
+
+ // inhibitChildUpdates is set during UpdateClientConnState/ResolverError
+ // calls (calls to children will each produce an update, only want one
+ // update).
+ inhibitChildUpdates atomic.Bool
+
+ // mu synchronizes access to the state stored in balancerWrappers in the
+ // children field. mu must not be held during calls into a child since
+ // synchronous calls back from the child may require taking mu, causing a
+ // deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
+ mu sync.Mutex
+}
+
+// rotateEndpoints returns a slice of all the input endpoints rotated a random
+// amount.
+func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
+ les := len(es)
+ if les == 0 {
+ return es
+ }
+ r := randIntN(les)
+ // Make a copy to avoid mutating data beyond the end of es.
+ ret := make([]resolver.Endpoint, les)
+ copy(ret, es[r:])
+ copy(ret[les-r:], es[:r])
+ return ret
+}
+
+// UpdateClientConnState creates a child for new endpoints and deletes children
+// for endpoints that are no longer present. It also updates all the children,
+// and sends a single synchronous update of the childrens' aggregated state at
+// the end of the UpdateClientConnState operation. If any endpoint has no
+// addresses it will ignore that endpoint. Otherwise, returns first error found
+// from a child, but fully processes the new update.
+func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
+ es.childMu.Lock()
+ defer es.childMu.Unlock()
+
+ es.inhibitChildUpdates.Store(true)
+ defer func() {
+ es.inhibitChildUpdates.Store(false)
+ es.updateState()
+ }()
+ var ret error
+
+ children := es.children.Load()
+ newChildren := resolver.NewEndpointMap[*balancerWrapper]()
+
+ // Update/Create new children.
+ for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
+ if _, ok := newChildren.Get(endpoint); ok {
+ // Endpoint child was already created, continue to avoid duplicate
+ // update.
+ continue
+ }
+ childBalancer, ok := children.Get(endpoint)
+ if ok {
+ // Endpoint attributes may have changed, update the stored endpoint.
+ es.mu.Lock()
+ childBalancer.childState.Endpoint = endpoint
+ es.mu.Unlock()
+ } else {
+ childBalancer = &balancerWrapper{
+ childState: ChildState{Endpoint: endpoint},
+ ClientConn: es.cc,
+ es: es,
+ }
+ childBalancer.childState.Balancer = childBalancer
+ childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
+ }
+ newChildren.Set(endpoint, childBalancer)
+ if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
+ BalancerConfig: state.BalancerConfig,
+ ResolverState: resolver.State{
+ Endpoints: []resolver.Endpoint{endpoint},
+ Attributes: state.ResolverState.Attributes,
+ },
+ }); err != nil && ret == nil {
+ // Return first error found, and always commit full processing of
+ // updating children. If desired to process more specific errors
+ // across all endpoints, caller should make these specific
+ // validations, this is a current limitation for simplicity sake.
+ ret = err
+ }
+ }
+ // Delete old children that are no longer present.
+ for _, e := range children.Keys() {
+ child, _ := children.Get(e)
+ if _, ok := newChildren.Get(e); !ok {
+ child.closeLocked()
+ }
+ }
+ es.children.Store(newChildren)
+ if newChildren.Len() == 0 {
+ return balancer.ErrBadResolverState
+ }
+ return ret
+}
+
+// ResolverError forwards the resolver error to all of the endpointSharding's
+// children and sends a single synchronous update of the childStates at the end
+// of the ResolverError operation.
+func (es *endpointSharding) ResolverError(err error) {
+ es.childMu.Lock()
+ defer es.childMu.Unlock()
+ es.inhibitChildUpdates.Store(true)
+ defer func() {
+ es.inhibitChildUpdates.Store(false)
+ es.updateState()
+ }()
+ children := es.children.Load()
+ for _, child := range children.Values() {
+ child.resolverErrorLocked(err)
+ }
+}
+
+func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
+ // UpdateSubConnState is deprecated.
+}
+
+func (es *endpointSharding) Close() {
+ es.childMu.Lock()
+ defer es.childMu.Unlock()
+ children := es.children.Load()
+ for _, child := range children.Values() {
+ child.closeLocked()
+ }
+}
+
+func (es *endpointSharding) ExitIdle() {
+ es.childMu.Lock()
+ defer es.childMu.Unlock()
+ for _, bw := range es.children.Load().Values() {
+ if !bw.isClosed {
+ bw.child.ExitIdle()
+ }
+ }
+}
+
+// updateState updates this component's state. It sends the aggregated state,
+// and a picker with round robin behavior with all the child states present if
+// needed.
+func (es *endpointSharding) updateState() {
+ if es.inhibitChildUpdates.Load() {
+ return
+ }
+ var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
+
+ es.mu.Lock()
+ defer es.mu.Unlock()
+
+ children := es.children.Load()
+ childStates := make([]ChildState, 0, children.Len())
+
+ for _, child := range children.Values() {
+ childState := child.childState
+ childStates = append(childStates, childState)
+ childPicker := childState.State.Picker
+ switch childState.State.ConnectivityState {
+ case connectivity.Ready:
+ readyPickers = append(readyPickers, childPicker)
+ case connectivity.Connecting:
+ connectingPickers = append(connectingPickers, childPicker)
+ case connectivity.Idle:
+ idlePickers = append(idlePickers, childPicker)
+ case connectivity.TransientFailure:
+ transientFailurePickers = append(transientFailurePickers, childPicker)
+ // connectivity.Shutdown shouldn't appear.
+ }
+ }
+
+ // Construct the round robin picker based off the aggregated state. Whatever
+ // the aggregated state, use the pickers present that are currently in that
+ // state only.
+ var aggState connectivity.State
+ var pickers []balancer.Picker
+ if len(readyPickers) >= 1 {
+ aggState = connectivity.Ready
+ pickers = readyPickers
+ } else if len(connectingPickers) >= 1 {
+ aggState = connectivity.Connecting
+ pickers = connectingPickers
+ } else if len(idlePickers) >= 1 {
+ aggState = connectivity.Idle
+ pickers = idlePickers
+ } else if len(transientFailurePickers) >= 1 {
+ aggState = connectivity.TransientFailure
+ pickers = transientFailurePickers
+ } else {
+ aggState = connectivity.TransientFailure
+ pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
+ } // No children (resolver error before valid update).
+ p := &pickerWithChildStates{
+ pickers: pickers,
+ childStates: childStates,
+ next: uint32(randIntN(len(pickers))),
+ }
+ es.cc.UpdateState(balancer.State{
+ ConnectivityState: aggState,
+ Picker: p,
+ })
+}
+
+// pickerWithChildStates delegates to the pickers it holds in a round robin
+// fashion. It also contains the childStates of all the endpointSharding's
+// children.
+type pickerWithChildStates struct {
+ pickers []balancer.Picker
+ childStates []ChildState
+ next uint32
+}
+
+func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+ nextIndex := atomic.AddUint32(&p.next, 1)
+ picker := p.pickers[nextIndex%uint32(len(p.pickers))]
+ return picker.Pick(info)
+}
+
+// ChildStatesFromPicker returns the state of all the children managed by the
+// endpoint sharding balancer that created this picker.
+func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
+ p, ok := picker.(*pickerWithChildStates)
+ if !ok {
+ return nil
+ }
+ return p.childStates
+}
+
+// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
+// endpoint, and persists recent child balancer state.
+type balancerWrapper struct {
+ // The following fields are initialized at build time and read-only after
+ // that and therefore do not need to be guarded by a mutex.
+
+ // child contains the wrapped balancer. Access its methods only through
+ // methods on balancerWrapper to ensure proper synchronization
+ child balancer.Balancer
+ balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
+
+ es *endpointSharding
+
+ // Access to the following fields is guarded by es.mu.
+
+ childState ChildState
+ isClosed bool
+}
+
+func (bw *balancerWrapper) UpdateState(state balancer.State) {
+ bw.es.mu.Lock()
+ bw.childState.State = state
+ bw.es.mu.Unlock()
+ if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
+ bw.ExitIdle()
+ }
+ bw.es.updateState()
+}
+
+// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
+// avoid deadlocks due to synchronous balancer state updates.
+func (bw *balancerWrapper) ExitIdle() {
+ go func() {
+ bw.es.childMu.Lock()
+ if !bw.isClosed {
+ bw.child.ExitIdle()
+ }
+ bw.es.childMu.Unlock()
+ }()
+}
+
+// updateClientConnStateLocked delivers the ClientConnState to the child
+// balancer. Callers must hold the child mutex of the parent endpointsharding
+// balancer.
+func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
+ return bw.child.UpdateClientConnState(ccs)
+}
+
+// closeLocked closes the child balancer. Callers must hold the child mutext of
+// the parent endpointsharding balancer.
+func (bw *balancerWrapper) closeLocked() {
+ bw.child.Close()
+ bw.isClosed = true
+}
+
+func (bw *balancerWrapper) resolverErrorLocked(err error) {
+ bw.child.ResolverError(err)
+}
diff --git a/vendor/google.golang.org/grpc/balancer/grpclb/state/state.go b/vendor/google.golang.org/grpc/balancer/grpclb/state/state.go
new file mode 100644
index 0000000..4ecfa1c
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/grpclb/state/state.go
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package state declares grpclb types to be set by resolvers wishing to pass
+// information to grpclb via resolver.State Attributes.
+package state
+
+import (
+ "google.golang.org/grpc/resolver"
+)
+
+// keyType is the key to use for storing State in Attributes.
+type keyType string
+
+const key = keyType("grpc.grpclb.state")
+
+// State contains gRPCLB-relevant data passed from the name resolver.
+type State struct {
+ // BalancerAddresses contains the remote load balancer address(es). If
+ // set, overrides any resolver-provided addresses with Type of GRPCLB.
+ BalancerAddresses []resolver.Address
+}
+
+// Set returns a copy of the provided state with attributes containing s. s's
+// data should not be mutated after calling Set.
+func Set(state resolver.State, s *State) resolver.State {
+ state.Attributes = state.Attributes.WithValue(key, s)
+ return state
+}
+
+// Get returns the grpclb State in the resolver.State, or nil if not present.
+// The returned data should not be mutated.
+func Get(state resolver.State) *State {
+ s, _ := state.Attributes.Value(key).(*State)
+ return s
+}
diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go b/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go
new file mode 100644
index 0000000..7d66cb4
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package internal contains code internal to the pickfirst package.
+package internal
+
+import (
+ rand "math/rand/v2"
+ "time"
+)
+
+var (
+ // RandShuffle pseudo-randomizes the order of addresses.
+ RandShuffle = rand.Shuffle
+ // TimeAfterFunc allows mocking the timer for testing connection delay
+ // related functionality.
+ TimeAfterFunc = func(d time.Duration, f func()) func() {
+ timer := time.AfterFunc(d, f)
+ return func() { timer.Stop() }
+ }
+)
diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go
new file mode 100644
index 0000000..b15c10e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go
@@ -0,0 +1,291 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package pickfirst contains the pick_first load balancing policy.
+package pickfirst
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ rand "math/rand/v2"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/pickfirst/internal"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/envconfig"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
+ "google.golang.org/grpc/internal/pretty"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+
+ _ "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" // For automatically registering the new pickfirst if required.
+)
+
+func init() {
+ if envconfig.NewPickFirstEnabled {
+ return
+ }
+ balancer.Register(pickfirstBuilder{})
+}
+
+var logger = grpclog.Component("pick-first-lb")
+
+const (
+ // Name is the name of the pick_first balancer.
+ Name = "pick_first"
+ logPrefix = "[pick-first-lb %p] "
+)
+
+type pickfirstBuilder struct{}
+
+func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
+ b := &pickfirstBalancer{cc: cc}
+ b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
+ return b
+}
+
+func (pickfirstBuilder) Name() string {
+ return Name
+}
+
+type pfConfig struct {
+ serviceconfig.LoadBalancingConfig `json:"-"`
+
+ // If set to true, instructs the LB policy to shuffle the order of the list
+ // of endpoints received from the name resolver before attempting to
+ // connect to them.
+ ShuffleAddressList bool `json:"shuffleAddressList"`
+}
+
+func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+ var cfg pfConfig
+ if err := json.Unmarshal(js, &cfg); err != nil {
+ return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
+ }
+ return cfg, nil
+}
+
+type pickfirstBalancer struct {
+ logger *internalgrpclog.PrefixLogger
+ state connectivity.State
+ cc balancer.ClientConn
+ subConn balancer.SubConn
+}
+
+func (b *pickfirstBalancer) ResolverError(err error) {
+ if b.logger.V(2) {
+ b.logger.Infof("Received error from the name resolver: %v", err)
+ }
+ if b.subConn == nil {
+ b.state = connectivity.TransientFailure
+ }
+
+ if b.state != connectivity.TransientFailure {
+ // The picker will not change since the balancer does not currently
+ // report an error.
+ return
+ }
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
+ })
+}
+
+// Shuffler is an interface for shuffling an address list.
+type Shuffler interface {
+ ShuffleAddressListForTesting(n int, swap func(i, j int))
+}
+
+// ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n
+// is the number of elements. swap swaps the elements with indexes i and j.
+func ShuffleAddressListForTesting(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }
+
+func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+ if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
+ // The resolver reported an empty address list. Treat it like an error by
+ // calling b.ResolverError.
+ if b.subConn != nil {
+ // Shut down the old subConn. All addresses were removed, so it is
+ // no longer valid.
+ b.subConn.Shutdown()
+ b.subConn = nil
+ }
+ b.ResolverError(errors.New("produced zero addresses"))
+ return balancer.ErrBadResolverState
+ }
+ // We don't have to guard this block with the env var because ParseConfig
+ // already does so.
+ cfg, ok := state.BalancerConfig.(pfConfig)
+ if state.BalancerConfig != nil && !ok {
+ return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
+ }
+
+ if b.logger.V(2) {
+ b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
+ }
+
+ var addrs []resolver.Address
+ if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
+ // Perform the optional shuffling described in gRFC A62. The shuffling will
+ // change the order of endpoints but not touch the order of the addresses
+ // within each endpoint. - A61
+ if cfg.ShuffleAddressList {
+ endpoints = append([]resolver.Endpoint{}, endpoints...)
+ internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
+ }
+
+ // "Flatten the list by concatenating the ordered list of addresses for each
+ // of the endpoints, in order." - A61
+ for _, endpoint := range endpoints {
+ // "In the flattened list, interleave addresses from the two address
+ // families, as per RFC-8304 section 4." - A61
+ // TODO: support the above language.
+ addrs = append(addrs, endpoint.Addresses...)
+ }
+ } else {
+ // Endpoints not set, process addresses until we migrate resolver
+ // emissions fully to Endpoints. The top channel does wrap emitted
+ // addresses with endpoints, however some balancers such as weighted
+ // target do not forward the corresponding correct endpoints down/split
+ // endpoints properly. Once all balancers correctly forward endpoints
+ // down, can delete this else conditional.
+ addrs = state.ResolverState.Addresses
+ if cfg.ShuffleAddressList {
+ addrs = append([]resolver.Address{}, addrs...)
+ internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
+ }
+ }
+
+ if b.subConn != nil {
+ b.cc.UpdateAddresses(b.subConn, addrs)
+ return nil
+ }
+
+ var subConn balancer.SubConn
+ subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
+ StateListener: func(state balancer.SubConnState) {
+ b.updateSubConnState(subConn, state)
+ },
+ })
+ if err != nil {
+ if b.logger.V(2) {
+ b.logger.Infof("Failed to create new SubConn: %v", err)
+ }
+ b.state = connectivity.TransientFailure
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
+ })
+ return balancer.ErrBadResolverState
+ }
+ b.subConn = subConn
+ b.state = connectivity.Idle
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ b.subConn.Connect()
+ return nil
+}
+
+// UpdateSubConnState is unused as a StateListener is always registered when
+// creating SubConns.
+func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
+ b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
+}
+
+func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
+ if b.logger.V(2) {
+ b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
+ }
+ if b.subConn != subConn {
+ if b.logger.V(2) {
+ b.logger.Infof("Ignored state change because subConn is not recognized")
+ }
+ return
+ }
+ if state.ConnectivityState == connectivity.Shutdown {
+ b.subConn = nil
+ return
+ }
+
+ switch state.ConnectivityState {
+ case connectivity.Ready:
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
+ })
+ case connectivity.Connecting:
+ if b.state == connectivity.TransientFailure {
+ // We stay in TransientFailure until we are Ready. See A62.
+ return
+ }
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ case connectivity.Idle:
+ if b.state == connectivity.TransientFailure {
+ // We stay in TransientFailure until we are Ready. Also kick the
+ // subConn out of Idle into Connecting. See A62.
+ b.subConn.Connect()
+ return
+ }
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &idlePicker{subConn: subConn},
+ })
+ case connectivity.TransientFailure:
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{err: state.ConnectionError},
+ })
+ }
+ b.state = state.ConnectivityState
+}
+
+func (b *pickfirstBalancer) Close() {
+}
+
+func (b *pickfirstBalancer) ExitIdle() {
+ if b.subConn != nil && b.state == connectivity.Idle {
+ b.subConn.Connect()
+ }
+}
+
+type picker struct {
+ result balancer.PickResult
+ err error
+}
+
+func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ return p.result, p.err
+}
+
+// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
+// CONNECTING when Pick is called.
+type idlePicker struct {
+ subConn balancer.SubConn
+}
+
+func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ i.subConn.Connect()
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+}
diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
new file mode 100644
index 0000000..9ffdd28
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
@@ -0,0 +1,913 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package pickfirstleaf contains the pick_first load balancing policy which
+// will be the universal leaf policy after dualstack changes are implemented.
+//
+// # Experimental
+//
+// Notice: This package is EXPERIMENTAL and may be changed or removed in a
+// later release.
+package pickfirstleaf
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "net/netip"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/pickfirst/internal"
+ "google.golang.org/grpc/connectivity"
+ expstats "google.golang.org/grpc/experimental/stats"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/envconfig"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
+ "google.golang.org/grpc/internal/pretty"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+)
+
+func init() {
+ if envconfig.NewPickFirstEnabled {
+ // Register as the default pick_first balancer.
+ Name = "pick_first"
+ }
+ balancer.Register(pickfirstBuilder{})
+}
+
+// enableHealthListenerKeyType is a unique key type used in resolver
+// attributes to indicate whether the health listener usage is enabled.
+type enableHealthListenerKeyType struct{}
+
+var (
+ logger = grpclog.Component("pick-first-leaf-lb")
+ // Name is the name of the pick_first_leaf balancer.
+ // It is changed to "pick_first" in init() if this balancer is to be
+ // registered as the default pickfirst.
+ Name = "pick_first_leaf"
+ disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+ Name: "grpc.lb.pick_first.disconnections",
+ Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
+ Unit: "{disconnection}",
+ Labels: []string{"grpc.target"},
+ Default: false,
+ })
+ connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+ Name: "grpc.lb.pick_first.connection_attempts_succeeded",
+ Description: "EXPERIMENTAL. Number of successful connection attempts.",
+ Unit: "{attempt}",
+ Labels: []string{"grpc.target"},
+ Default: false,
+ })
+ connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+ Name: "grpc.lb.pick_first.connection_attempts_failed",
+ Description: "EXPERIMENTAL. Number of failed connection attempts.",
+ Unit: "{attempt}",
+ Labels: []string{"grpc.target"},
+ Default: false,
+ })
+)
+
+const (
+ // TODO: change to pick-first when this becomes the default pick_first policy.
+ logPrefix = "[pick-first-leaf-lb %p] "
+ // connectionDelayInterval is the time to wait for during the happy eyeballs
+ // pass before starting the next connection attempt.
+ connectionDelayInterval = 250 * time.Millisecond
+)
+
+type ipAddrFamily int
+
+const (
+ // ipAddrFamilyUnknown represents strings that can't be parsed as an IP
+ // address.
+ ipAddrFamilyUnknown ipAddrFamily = iota
+ ipAddrFamilyV4
+ ipAddrFamilyV6
+)
+
+type pickfirstBuilder struct{}
+
+func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
+ b := &pickfirstBalancer{
+ cc: cc,
+ target: bo.Target.String(),
+ metricsRecorder: cc.MetricsRecorder(),
+
+ subConns: resolver.NewAddressMapV2[*scData](),
+ state: connectivity.Connecting,
+ cancelConnectionTimer: func() {},
+ }
+ b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
+ return b
+}
+
+func (b pickfirstBuilder) Name() string {
+ return Name
+}
+
+func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+ var cfg pfConfig
+ if err := json.Unmarshal(js, &cfg); err != nil {
+ return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
+ }
+ return cfg, nil
+}
+
+// EnableHealthListener updates the state to configure pickfirst for using a
+// generic health listener.
+func EnableHealthListener(state resolver.State) resolver.State {
+ state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
+ return state
+}
+
+type pfConfig struct {
+ serviceconfig.LoadBalancingConfig `json:"-"`
+
+ // If set to true, instructs the LB policy to shuffle the order of the list
+ // of endpoints received from the name resolver before attempting to
+ // connect to them.
+ ShuffleAddressList bool `json:"shuffleAddressList"`
+}
+
+// scData keeps track of the current state of the subConn.
+// It is not safe for concurrent access.
+type scData struct {
+ // The following fields are initialized at build time and read-only after
+ // that.
+ subConn balancer.SubConn
+ addr resolver.Address
+
+ rawConnectivityState connectivity.State
+ // The effective connectivity state based on raw connectivity, health state
+ // and after following sticky TransientFailure behaviour defined in A62.
+ effectiveState connectivity.State
+ lastErr error
+ connectionFailedInFirstPass bool
+}
+
+func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
+ sd := &scData{
+ rawConnectivityState: connectivity.Idle,
+ effectiveState: connectivity.Idle,
+ addr: addr,
+ }
+ sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
+ StateListener: func(state balancer.SubConnState) {
+ b.updateSubConnState(sd, state)
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ sd.subConn = sc
+ return sd, nil
+}
+
+type pickfirstBalancer struct {
+ // The following fields are initialized at build time and read-only after
+ // that and therefore do not need to be guarded by a mutex.
+ logger *internalgrpclog.PrefixLogger
+ cc balancer.ClientConn
+ target string
+ metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil
+
+ // The mutex is used to ensure synchronization of updates triggered
+ // from the idle picker and the already serialized resolver,
+ // SubConn state updates.
+ mu sync.Mutex
+ // State reported to the channel based on SubConn states and resolver
+ // updates.
+ state connectivity.State
+ // scData for active subonns mapped by address.
+ subConns *resolver.AddressMapV2[*scData]
+ addressList addressList
+ firstPass bool
+ numTF int
+ cancelConnectionTimer func()
+ healthCheckingEnabled bool
+}
+
+// ResolverError is called by the ClientConn when the name resolver produces
+// an error or when pickfirst determined the resolver update to be invalid.
+func (b *pickfirstBalancer) ResolverError(err error) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ b.resolverErrorLocked(err)
+}
+
+func (b *pickfirstBalancer) resolverErrorLocked(err error) {
+ if b.logger.V(2) {
+ b.logger.Infof("Received error from the name resolver: %v", err)
+ }
+
+ // The picker will not change since the balancer does not currently
+ // report an error. If the balancer hasn't received a single good resolver
+ // update yet, transition to TRANSIENT_FAILURE.
+ if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
+ if b.logger.V(2) {
+ b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
+ }
+ return
+ }
+
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
+ })
+}
+
+func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ b.cancelConnectionTimer()
+ if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
+ // Cleanup state pertaining to the previous resolver state.
+ // Treat an empty address list like an error by calling b.ResolverError.
+ b.closeSubConnsLocked()
+ b.addressList.updateAddrs(nil)
+ b.resolverErrorLocked(errors.New("produced zero addresses"))
+ return balancer.ErrBadResolverState
+ }
+ b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
+ cfg, ok := state.BalancerConfig.(pfConfig)
+ if state.BalancerConfig != nil && !ok {
+ return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
+ }
+
+ if b.logger.V(2) {
+ b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
+ }
+
+ var newAddrs []resolver.Address
+ if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
+ // Perform the optional shuffling described in gRFC A62. The shuffling
+ // will change the order of endpoints but not touch the order of the
+ // addresses within each endpoint. - A61
+ if cfg.ShuffleAddressList {
+ endpoints = append([]resolver.Endpoint{}, endpoints...)
+ internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
+ }
+
+ // "Flatten the list by concatenating the ordered list of addresses for
+ // each of the endpoints, in order." - A61
+ for _, endpoint := range endpoints {
+ newAddrs = append(newAddrs, endpoint.Addresses...)
+ }
+ } else {
+ // Endpoints not set, process addresses until we migrate resolver
+ // emissions fully to Endpoints. The top channel does wrap emitted
+ // addresses with endpoints, however some balancers such as weighted
+ // target do not forward the corresponding correct endpoints down/split
+ // endpoints properly. Once all balancers correctly forward endpoints
+ // down, can delete this else conditional.
+ newAddrs = state.ResolverState.Addresses
+ if cfg.ShuffleAddressList {
+ newAddrs = append([]resolver.Address{}, newAddrs...)
+ internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
+ }
+ }
+
+ // If an address appears in multiple endpoints or in the same endpoint
+ // multiple times, we keep it only once. We will create only one SubConn
+ // for the address because an AddressMap is used to store SubConns.
+ // Not de-duplicating would result in attempting to connect to the same
+ // SubConn multiple times in the same pass. We don't want this.
+ newAddrs = deDupAddresses(newAddrs)
+ newAddrs = interleaveAddresses(newAddrs)
+
+ prevAddr := b.addressList.currentAddress()
+ prevSCData, found := b.subConns.Get(prevAddr)
+ prevAddrsCount := b.addressList.size()
+ isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
+ b.addressList.updateAddrs(newAddrs)
+
+ // If the previous ready SubConn exists in new address list,
+ // keep this connection and don't create new SubConns.
+ if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
+ return nil
+ }
+
+ b.reconcileSubConnsLocked(newAddrs)
+ // If it's the first resolver update or the balancer was already READY
+ // (but the new address list does not contain the ready SubConn) or
+ // CONNECTING, enter CONNECTING.
+ // We may be in TRANSIENT_FAILURE due to a previous empty address list,
+ // we should still enter CONNECTING because the sticky TF behaviour
+ // mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
+ // due to connectivity failures.
+ if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
+ // Start connection attempt at first address.
+ b.forceUpdateConcludedStateLocked(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ b.startFirstPassLocked()
+ } else if b.state == connectivity.TransientFailure {
+ // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
+ // we're READY. See A62.
+ b.startFirstPassLocked()
+ }
+ return nil
+}
+
+// UpdateSubConnState is unused as a StateListener is always registered when
+// creating SubConns.
+func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
+ b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
+}
+
+func (b *pickfirstBalancer) Close() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ b.closeSubConnsLocked()
+ b.cancelConnectionTimer()
+ b.state = connectivity.Shutdown
+}
+
+// ExitIdle moves the balancer out of idle state. It can be called concurrently
+// by the idlePicker and clientConn so access to variables should be
+// synchronized.
+func (b *pickfirstBalancer) ExitIdle() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.state == connectivity.Idle {
+ // Move the balancer into CONNECTING state immediately. This is done to
+ // avoid staying in IDLE if a resolver update arrives before the first
+ // SubConn reports CONNECTING.
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ b.startFirstPassLocked()
+ }
+}
+
+func (b *pickfirstBalancer) startFirstPassLocked() {
+ b.firstPass = true
+ b.numTF = 0
+ // Reset the connection attempt record for existing SubConns.
+ for _, sd := range b.subConns.Values() {
+ sd.connectionFailedInFirstPass = false
+ }
+ b.requestConnectionLocked()
+}
+
+func (b *pickfirstBalancer) closeSubConnsLocked() {
+ for _, sd := range b.subConns.Values() {
+ sd.subConn.Shutdown()
+ }
+ b.subConns = resolver.NewAddressMapV2[*scData]()
+}
+
+// deDupAddresses ensures that each address appears only once in the slice.
+func deDupAddresses(addrs []resolver.Address) []resolver.Address {
+ seenAddrs := resolver.NewAddressMapV2[*scData]()
+ retAddrs := []resolver.Address{}
+
+ for _, addr := range addrs {
+ if _, ok := seenAddrs.Get(addr); ok {
+ continue
+ }
+ retAddrs = append(retAddrs, addr)
+ }
+ return retAddrs
+}
+
+// interleaveAddresses interleaves addresses of both families (IPv4 and IPv6)
+// as per RFC-8305 section 4.
+// Whichever address family is first in the list is followed by an address of
+// the other address family; that is, if the first address in the list is IPv6,
+// then the first IPv4 address should be moved up in the list to be second in
+// the list. It doesn't support configuring "First Address Family Count", i.e.
+// there will always be a single member of the first address family at the
+// beginning of the interleaved list.
+// Addresses that are neither IPv4 nor IPv6 are treated as part of a third
+// "unknown" family for interleaving.
+// See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6
+func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
+ familyAddrsMap := map[ipAddrFamily][]resolver.Address{}
+ interleavingOrder := []ipAddrFamily{}
+ for _, addr := range addrs {
+ family := addressFamily(addr.Addr)
+ if _, found := familyAddrsMap[family]; !found {
+ interleavingOrder = append(interleavingOrder, family)
+ }
+ familyAddrsMap[family] = append(familyAddrsMap[family], addr)
+ }
+
+ interleavedAddrs := make([]resolver.Address, 0, len(addrs))
+
+ for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) {
+ // Some IP types may have fewer addresses than others, so we look for
+ // the next type that has a remaining member to add to the interleaved
+ // list.
+ family := interleavingOrder[curFamilyIdx]
+ remainingMembers := familyAddrsMap[family]
+ if len(remainingMembers) > 0 {
+ interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
+ familyAddrsMap[family] = remainingMembers[1:]
+ }
+ }
+
+ return interleavedAddrs
+}
+
+// addressFamily returns the ipAddrFamily after parsing the address string.
+// If the address isn't of the format "ip-address:port", it returns
+// ipAddrFamilyUnknown. The address may be valid even if it's not an IP when
+// using a resolver like passthrough where the address may be a hostname in
+// some format that the dialer can resolve.
+func addressFamily(address string) ipAddrFamily {
+ // Parse the IP after removing the port.
+ host, _, err := net.SplitHostPort(address)
+ if err != nil {
+ return ipAddrFamilyUnknown
+ }
+ ip, err := netip.ParseAddr(host)
+ if err != nil {
+ return ipAddrFamilyUnknown
+ }
+ switch {
+ case ip.Is4() || ip.Is4In6():
+ return ipAddrFamilyV4
+ case ip.Is6():
+ return ipAddrFamilyV6
+ default:
+ return ipAddrFamilyUnknown
+ }
+}
+
+// reconcileSubConnsLocked updates the active subchannels based on a new address
+// list from the resolver. It does this by:
+// - closing subchannels: any existing subchannels associated with addresses
+// that are no longer in the updated list are shut down.
+// - removing subchannels: entries for these closed subchannels are removed
+// from the subchannel map.
+//
+// This ensures that the subchannel map accurately reflects the current set of
+// addresses received from the name resolver.
+func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
+ newAddrsMap := resolver.NewAddressMapV2[bool]()
+ for _, addr := range newAddrs {
+ newAddrsMap.Set(addr, true)
+ }
+
+ for _, oldAddr := range b.subConns.Keys() {
+ if _, ok := newAddrsMap.Get(oldAddr); ok {
+ continue
+ }
+ val, _ := b.subConns.Get(oldAddr)
+ val.subConn.Shutdown()
+ b.subConns.Delete(oldAddr)
+ }
+}
+
+// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
+// becomes ready, which means that all other subConn must be shutdown.
+func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
+ b.cancelConnectionTimer()
+ for _, sd := range b.subConns.Values() {
+ if sd.subConn != selected.subConn {
+ sd.subConn.Shutdown()
+ }
+ }
+ b.subConns = resolver.NewAddressMapV2[*scData]()
+ b.subConns.Set(selected.addr, selected)
+}
+
+// requestConnectionLocked starts connecting on the subchannel corresponding to
+// the current address. If no subchannel exists, one is created. If the current
+// subchannel is in TransientFailure, a connection to the next address is
+// attempted until a subchannel is found.
+func (b *pickfirstBalancer) requestConnectionLocked() {
+ if !b.addressList.isValid() {
+ return
+ }
+ var lastErr error
+ for valid := true; valid; valid = b.addressList.increment() {
+ curAddr := b.addressList.currentAddress()
+ sd, ok := b.subConns.Get(curAddr)
+ if !ok {
+ var err error
+ // We want to assign the new scData to sd from the outer scope,
+ // hence we can't use := below.
+ sd, err = b.newSCData(curAddr)
+ if err != nil {
+ // This should never happen, unless the clientConn is being shut
+ // down.
+ if b.logger.V(2) {
+ b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
+ }
+ // Do nothing, the LB policy will be closed soon.
+ return
+ }
+ b.subConns.Set(curAddr, sd)
+ }
+
+ switch sd.rawConnectivityState {
+ case connectivity.Idle:
+ sd.subConn.Connect()
+ b.scheduleNextConnectionLocked()
+ return
+ case connectivity.TransientFailure:
+ // The SubConn is being re-used and failed during a previous pass
+ // over the addressList. It has not completed backoff yet.
+ // Mark it as having failed and try the next address.
+ sd.connectionFailedInFirstPass = true
+ lastErr = sd.lastErr
+ continue
+ case connectivity.Connecting:
+ // Wait for the connection attempt to complete or the timer to fire
+ // before attempting the next address.
+ b.scheduleNextConnectionLocked()
+ return
+ default:
+ b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
+ return
+
+ }
+ }
+
+ // All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
+ // first pass if possible.
+ b.endFirstPassIfPossibleLocked(lastErr)
+}
+
+func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
+ b.cancelConnectionTimer()
+ if !b.addressList.hasNext() {
+ return
+ }
+ curAddr := b.addressList.currentAddress()
+ cancelled := false // Access to this is protected by the balancer's mutex.
+ closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ // If the scheduled task is cancelled while acquiring the mutex, return.
+ if cancelled {
+ return
+ }
+ if b.logger.V(2) {
+ b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
+ }
+ if b.addressList.increment() {
+ b.requestConnectionLocked()
+ }
+ })
+ // Access to the cancellation callback held by the balancer is guarded by
+ // the balancer's mutex, so it's safe to set the boolean from the callback.
+ b.cancelConnectionTimer = sync.OnceFunc(func() {
+ cancelled = true
+ closeFn()
+ })
+}
+
+func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ oldState := sd.rawConnectivityState
+ sd.rawConnectivityState = newState.ConnectivityState
+ // Previously relevant SubConns can still callback with state updates.
+ // To prevent pickers from returning these obsolete SubConns, this logic
+ // is included to check if the current list of active SubConns includes this
+ // SubConn.
+ if !b.isActiveSCData(sd) {
+ return
+ }
+ if newState.ConnectivityState == connectivity.Shutdown {
+ sd.effectiveState = connectivity.Shutdown
+ return
+ }
+
+ // Record a connection attempt when exiting CONNECTING.
+ if newState.ConnectivityState == connectivity.TransientFailure {
+ sd.connectionFailedInFirstPass = true
+ connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
+ }
+
+ if newState.ConnectivityState == connectivity.Ready {
+ connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
+ b.shutdownRemainingLocked(sd)
+ if !b.addressList.seekTo(sd.addr) {
+ // This should not fail as we should have only one SubConn after
+ // entering READY. The SubConn should be present in the addressList.
+ b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
+ return
+ }
+ if !b.healthCheckingEnabled {
+ if b.logger.V(2) {
+ b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
+ }
+
+ sd.effectiveState = connectivity.Ready
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Ready,
+ Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
+ })
+ return
+ }
+ if b.logger.V(2) {
+ b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
+ }
+ // Send a CONNECTING update to take the SubConn out of sticky-TF if
+ // required.
+ sd.effectiveState = connectivity.Connecting
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
+ b.updateSubConnHealthState(sd, scs)
+ })
+ return
+ }
+
+ // If the LB policy is READY, and it receives a subchannel state change,
+ // it means that the READY subchannel has failed.
+ // A SubConn can also transition from CONNECTING directly to IDLE when
+ // a transport is successfully created, but the connection fails
+ // before the SubConn can send the notification for READY. We treat
+ // this as a successful connection and transition to IDLE.
+ // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
+ // part of the if condition below once the issue is fixed.
+ if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
+ // Once a transport fails, the balancer enters IDLE and starts from
+ // the first address when the picker is used.
+ b.shutdownRemainingLocked(sd)
+ sd.effectiveState = newState.ConnectivityState
+ // READY SubConn interspliced in between CONNECTING and IDLE, need to
+ // account for that.
+ if oldState == connectivity.Connecting {
+ // A known issue (https://github.com/grpc/grpc-go/issues/7862)
+ // causes a race that prevents the READY state change notification.
+ // This works around it.
+ connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
+ }
+ disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
+ b.addressList.reset()
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Idle,
+ Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
+ })
+ return
+ }
+
+ if b.firstPass {
+ switch newState.ConnectivityState {
+ case connectivity.Connecting:
+ // The effective state can be in either IDLE, CONNECTING or
+ // TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
+ // TRANSIENT_FAILURE until it's READY. See A62.
+ if sd.effectiveState != connectivity.TransientFailure {
+ sd.effectiveState = connectivity.Connecting
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ }
+ case connectivity.TransientFailure:
+ sd.lastErr = newState.ConnectionError
+ sd.effectiveState = connectivity.TransientFailure
+ // Since we're re-using common SubConns while handling resolver
+ // updates, we could receive an out of turn TRANSIENT_FAILURE from
+ // a pass over the previous address list. Happy Eyeballs will also
+ // cause out of order updates to arrive.
+
+ if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
+ b.cancelConnectionTimer()
+ if b.addressList.increment() {
+ b.requestConnectionLocked()
+ return
+ }
+ }
+
+ // End the first pass if we've seen a TRANSIENT_FAILURE from all
+ // SubConns once.
+ b.endFirstPassIfPossibleLocked(newState.ConnectionError)
+ }
+ return
+ }
+
+ // We have finished the first pass, keep re-connecting failing SubConns.
+ switch newState.ConnectivityState {
+ case connectivity.TransientFailure:
+ b.numTF = (b.numTF + 1) % b.subConns.Len()
+ sd.lastErr = newState.ConnectionError
+ if b.numTF%b.subConns.Len() == 0 {
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: newState.ConnectionError},
+ })
+ }
+ // We don't need to request re-resolution since the SubConn already
+ // does that before reporting TRANSIENT_FAILURE.
+ // TODO: #7534 - Move re-resolution requests from SubConn into
+ // pick_first.
+ case connectivity.Idle:
+ sd.subConn.Connect()
+ }
+}
+
+// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
+// addresses are tried and their SubConns have reported a failure.
+func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
+ // An optimization to avoid iterating over the entire SubConn map.
+ if b.addressList.isValid() {
+ return
+ }
+ // Connect() has been called on all the SubConns. The first pass can be
+ // ended if all the SubConns have reported a failure.
+ for _, sd := range b.subConns.Values() {
+ if !sd.connectionFailedInFirstPass {
+ return
+ }
+ }
+ b.firstPass = false
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: lastErr},
+ })
+ // Start re-connecting all the SubConns that are already in IDLE.
+ for _, sd := range b.subConns.Values() {
+ if sd.rawConnectivityState == connectivity.Idle {
+ sd.subConn.Connect()
+ }
+ }
+}
+
+func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
+ activeSD, found := b.subConns.Get(sd.addr)
+ return found && activeSD == sd
+}
+
+func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ // Previously relevant SubConns can still callback with state updates.
+ // To prevent pickers from returning these obsolete SubConns, this logic
+ // is included to check if the current list of active SubConns includes
+ // this SubConn.
+ if !b.isActiveSCData(sd) {
+ return
+ }
+ sd.effectiveState = state.ConnectivityState
+ switch state.ConnectivityState {
+ case connectivity.Ready:
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Ready,
+ Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
+ })
+ case connectivity.TransientFailure:
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
+ })
+ case connectivity.Connecting:
+ b.updateBalancerState(balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
+ default:
+ b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
+ }
+}
+
+// updateBalancerState stores the state reported to the channel and calls
+// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
+// updates to the channel.
+func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
+ // In case of TransientFailures allow the picker to be updated to update
+ // the connectivity error, in all other cases don't send duplicate state
+ // updates.
+ if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
+ return
+ }
+ b.forceUpdateConcludedStateLocked(newState)
+}
+
+// forceUpdateConcludedStateLocked stores the state reported to the channel and
+// calls ClientConn.UpdateState().
+// A separate function is defined to force update the ClientConn state since the
+// channel doesn't correctly assume that LB policies start in CONNECTING and
+// relies on LB policy to send an initial CONNECTING update.
+func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
+ b.state = newState.ConnectivityState
+ b.cc.UpdateState(newState)
+}
+
+type picker struct {
+ result balancer.PickResult
+ err error
+}
+
+func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ return p.result, p.err
+}
+
+// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
+// CONNECTING when Pick is called.
+type idlePicker struct {
+ exitIdle func()
+}
+
+func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ i.exitIdle()
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+}
+
+// addressList manages sequentially iterating over addresses present in a list
+// of endpoints. It provides a 1 dimensional view of the addresses present in
+// the endpoints.
+// This type is not safe for concurrent access.
+type addressList struct {
+ addresses []resolver.Address
+ idx int
+}
+
+func (al *addressList) isValid() bool {
+ return al.idx < len(al.addresses)
+}
+
+func (al *addressList) size() int {
+ return len(al.addresses)
+}
+
+// increment moves to the next index in the address list.
+// This method returns false if it went off the list, true otherwise.
+func (al *addressList) increment() bool {
+ if !al.isValid() {
+ return false
+ }
+ al.idx++
+ return al.idx < len(al.addresses)
+}
+
+// currentAddress returns the current address pointed to in the addressList.
+// If the list is in an invalid state, it returns an empty address instead.
+func (al *addressList) currentAddress() resolver.Address {
+ if !al.isValid() {
+ return resolver.Address{}
+ }
+ return al.addresses[al.idx]
+}
+
+func (al *addressList) reset() {
+ al.idx = 0
+}
+
+func (al *addressList) updateAddrs(addrs []resolver.Address) {
+ al.addresses = addrs
+ al.reset()
+}
+
+// seekTo returns false if the needle was not found and the current index was
+// left unchanged.
+func (al *addressList) seekTo(needle resolver.Address) bool {
+ for ai, addr := range al.addresses {
+ if !equalAddressIgnoringBalAttributes(&addr, &needle) {
+ continue
+ }
+ al.idx = ai
+ return true
+ }
+ return false
+}
+
+// hasNext returns whether incrementing the addressList will result in moving
+// past the end of the list. If the list has already moved past the end, it
+// returns false.
+func (al *addressList) hasNext() bool {
+ if !al.isValid() {
+ return false
+ }
+ return al.idx+1 < len(al.addresses)
+}
+
+// equalAddressIgnoringBalAttributes returns true is a and b are considered
+// equal. This is different from the Equal method on the resolver.Address type
+// which considers all fields to determine equality. Here, we only consider
+// fields that are meaningful to the SubConn.
+func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
+ return a.Addr == b.Addr && a.ServerName == b.ServerName &&
+ a.Attributes.Equal(b.Attributes)
+}
diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
index 29f7a4d..22045bf 100644
--- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
@@ -22,62 +22,51 @@
package roundrobin
import (
- "context"
- "sync"
+ "fmt"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/base"
+ "google.golang.org/grpc/balancer/endpointsharding"
+ "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/grpcrand"
- "google.golang.org/grpc/resolver"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
// Name is the name of round_robin balancer.
const Name = "round_robin"
-// newBuilder creates a new roundrobin balancer builder.
-func newBuilder() balancer.Builder {
- return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
-}
+var logger = grpclog.Component("roundrobin")
func init() {
- balancer.Register(newBuilder())
+ balancer.Register(builder{})
}
-type rrPickerBuilder struct{}
+type builder struct{}
-func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
- grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
- if len(readySCs) == 0 {
- return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
- }
- var scs []balancer.SubConn
- for _, sc := range readySCs {
- scs = append(scs, sc)
- }
- return &rrPicker{
- subConns: scs,
- // Start at a random index, as the same RR balancer rebuilds a new
- // picker when SubConn states change, and we don't want to apply excess
- // load to the first server in the list.
- next: grpcrand.Intn(len(scs)),
- }
+func (bb builder) Name() string {
+ return Name
}
-type rrPicker struct {
- // subConns is the snapshot of the roundrobin balancer when this picker was
- // created. The slice is immutable. Each Get() will do a round robin
- // selection from it and return the selected SubConn.
- subConns []balancer.SubConn
-
- mu sync.Mutex
- next int
+func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+ childBuilder := balancer.Get(pickfirstleaf.Name).Build
+ bal := &rrBalancer{
+ cc: cc,
+ Balancer: endpointsharding.NewBalancer(cc, opts, childBuilder, endpointsharding.Options{}),
+ }
+ bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
+ bal.logger.Infof("Created")
+ return bal
}
-func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- p.mu.Lock()
- sc := p.subConns[p.next]
- p.next = (p.next + 1) % len(p.subConns)
- p.mu.Unlock()
- return sc, nil, nil
+type rrBalancer struct {
+ balancer.Balancer
+ cc balancer.ClientConn
+ logger *internalgrpclog.PrefixLogger
+}
+
+func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
+ return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
+ // Enable the health listener in pickfirst children for client side health
+ // checks and outlier detection, if configured.
+ ResolverState: pickfirstleaf.EnableHealthListener(ccs.ResolverState),
+ })
}
diff --git a/vendor/google.golang.org/grpc/balancer/subconn.go b/vendor/google.golang.org/grpc/balancer/subconn.go
new file mode 100644
index 0000000..9ee44d4
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer/subconn.go
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import (
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/resolver"
+)
+
+// A SubConn represents a single connection to a gRPC backend service.
+//
+// All SubConns start in IDLE, and will not try to connect. To trigger a
+// connection attempt, Balancers must call Connect.
+//
+// If the connection attempt fails, the SubConn will transition to
+// TRANSIENT_FAILURE for a backoff period, and then return to IDLE. If the
+// connection attempt succeeds, it will transition to READY.
+//
+// If a READY SubConn becomes disconnected, the SubConn will transition to IDLE.
+//
+// If a connection re-enters IDLE, Balancers must call Connect again to trigger
+// a new connection attempt.
+//
+// Each SubConn contains a list of addresses. gRPC will try to connect to the
+// addresses in sequence, and stop trying the remainder once the first
+// connection is successful. However, this behavior is deprecated. SubConns
+// should only use a single address.
+//
+// NOTICE: This interface is intended to be implemented by gRPC, or intercepted
+// by custom load balancing polices. Users should not need their own complete
+// implementation of this interface -- they should always delegate to a SubConn
+// returned by ClientConn.NewSubConn() by embedding it in their implementations.
+// An embedded SubConn must never be nil, or runtime panics will occur.
+type SubConn interface {
+ // UpdateAddresses updates the addresses used in this SubConn.
+ // gRPC checks if currently-connected address is still in the new list.
+ // If it's in the list, the connection will be kept.
+ // If it's not in the list, the connection will gracefully close, and
+ // a new connection will be created.
+ //
+ // This will trigger a state transition for the SubConn.
+ //
+ // Deprecated: this method will be removed. Create new SubConns for new
+ // addresses instead.
+ UpdateAddresses([]resolver.Address)
+ // Connect starts the connecting for this SubConn.
+ Connect()
+ // GetOrBuildProducer returns a reference to the existing Producer for this
+ // ProducerBuilder in this SubConn, or, if one does not currently exist,
+ // creates a new one and returns it. Returns a close function which may be
+ // called when the Producer is no longer needed. Otherwise the producer
+ // will automatically be closed upon connection loss or subchannel close.
+ // Should only be called on a SubConn in state Ready. Otherwise the
+ // producer will be unable to create streams.
+ GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
+ // Shutdown shuts down the SubConn gracefully. Any started RPCs will be
+ // allowed to complete. No future calls should be made on the SubConn.
+ // One final state update will be delivered to the StateListener (or
+ // UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
+ // indicate the shutdown operation. This may be delivered before
+ // in-progress RPCs are complete and the actual connection is closed.
+ Shutdown()
+ // RegisterHealthListener registers a health listener that receives health
+ // updates for a Ready SubConn. Only one health listener can be registered
+ // at a time. A health listener should be registered each time the SubConn's
+ // connectivity state changes to READY. Registering a health listener when
+ // the connectivity state is not READY may result in undefined behaviour.
+ // This method must not be called synchronously while handling an update
+ // from a previously registered health listener.
+ RegisterHealthListener(func(SubConnState))
+ // EnforceSubConnEmbedding is included to force implementers to embed
+ // another implementation of this interface, allowing gRPC to add methods
+ // without breaking users.
+ internal.EnforceSubConnEmbedding
+}
+
+// A ProducerBuilder is a simple constructor for a Producer. It is used by the
+// SubConn to create producers when needed.
+type ProducerBuilder interface {
+ // Build creates a Producer. The first parameter is always a
+ // grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
+ // associated SubConn), but is declared as `any` to avoid a dependency
+ // cycle. Build also returns a close function that will be called when all
+ // references to the Producer have been given up for a SubConn, or when a
+ // connectivity state change occurs on the SubConn. The close function
+ // should always block until all asynchronous cleanup work is completed.
+ Build(grpcClientConnInterface any) (p Producer, close func())
+}
+
+// SubConnState describes the state of a SubConn.
+type SubConnState struct {
+ // ConnectivityState is the connectivity state of the SubConn.
+ ConnectivityState connectivity.State
+ // ConnectionError is set if the ConnectivityState is TransientFailure,
+ // describing the reason the SubConn failed. Otherwise, it is nil.
+ ConnectionError error
+ // connectedAddr contains the connected address when ConnectivityState is
+ // Ready. Otherwise, it is indeterminate.
+ connectedAddress resolver.Address
+}
+
+// connectedAddress returns the connected address for a SubConnState. The
+// address is only valid if the state is READY.
+func connectedAddress(scs SubConnState) resolver.Address {
+ return scs.connectedAddress
+}
+
+// setConnectedAddress sets the connected address for a SubConnState.
+func setConnectedAddress(scs *SubConnState, addr resolver.Address) {
+ scs.connectedAddress = addr
+}
+
+// A Producer is a type shared among potentially many consumers. It is
+// associated with a SubConn, and an implementation will typically contain
+// other methods to provide additional functionality, e.g. configuration or
+// subscription registration.
+type Producer any