[VOL-5291] On demand statistics for ONU and OLT
Change-Id: I4850bb0f0d2235122cb0c1bcf835b3672bb34436
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index f4ea617..04b9ad4 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -19,189 +19,331 @@
package grpc
import (
+ "context"
"fmt"
+ "strings"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/internal/buffer"
+ "google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
-// scStateUpdate contains the subConn and the new state it changed to.
-type scStateUpdate struct {
- sc balancer.SubConn
- state connectivity.State
- err error
-}
+type ccbMode int
-// exitIdle contains no data and is just a signal sent on the updateCh in
-// ccBalancerWrapper to instruct the balancer to exit idle.
-type exitIdle struct{}
+const (
+ ccbModeActive = iota
+ ccbModeIdle
+ ccbModeClosed
+ ccbModeExitingIdle
+)
-// ccBalancerWrapper is a wrapper on top of cc for balancers.
-// It implements balancer.ClientConn interface.
+// ccBalancerWrapper sits between the ClientConn and the Balancer.
+//
+// ccBalancerWrapper implements methods corresponding to the ones on the
+// balancer.Balancer interface. The ClientConn is free to call these methods
+// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
+// to the Balancer happen synchronously and in order.
+//
+// ccBalancerWrapper also implements the balancer.ClientConn interface and is
+// passed to the Balancer implementations. It invokes unexported methods on the
+// ClientConn to handle these calls from the Balancer.
+//
+// It uses the gracefulswitch.Balancer internally to ensure that balancer
+// switches happen in a graceful manner.
type ccBalancerWrapper struct {
- cc *ClientConn
- balancerMu sync.Mutex // synchronizes calls to the balancer
- balancer balancer.Balancer
- hasExitIdle bool
- updateCh *buffer.Unbounded
- closed *grpcsync.Event
- done *grpcsync.Event
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc *ClientConn
+ opts balancer.BuildOptions
- mu sync.Mutex
- subConns map[*acBalancerWrapper]struct{}
+ // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
+ // mutually exclusive manner as they are scheduled in the serializer. Fields
+ // accessed *only* in these serializer callbacks, can therefore be accessed
+ // without a mutex.
+ balancer *gracefulswitch.Balancer
+ curBalancerName string
+
+ // mu guards access to the below fields. Access to the serializer and its
+ // cancel function needs to be mutex protected because they are overwritten
+ // when the wrapper exits idle mode.
+ mu sync.Mutex
+ serializer *grpcsync.CallbackSerializer // To serialize all outoing calls.
+ serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time.
+ mode ccbMode // Tracks the current mode of the wrapper.
}
-func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
+// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
+// is not created until the switchTo() method is invoked.
+func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
+ ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
- cc: cc,
- updateCh: buffer.NewUnbounded(),
- closed: grpcsync.NewEvent(),
- done: grpcsync.NewEvent(),
- subConns: make(map[*acBalancerWrapper]struct{}),
+ cc: cc,
+ opts: bopts,
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
}
- go ccb.watcher()
- ccb.balancer = b.Build(ccb, bopts)
- _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}
-// watcher balancer functions sequentially, so the balancer can be implemented
-// lock-free.
-func (ccb *ccBalancerWrapper) watcher() {
- for {
- select {
- case t := <-ccb.updateCh.Get():
- ccb.updateCh.Load()
- if ccb.closed.HasFired() {
- break
- }
- switch u := t.(type) {
- case *scStateUpdate:
- ccb.balancerMu.Lock()
- ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
- ccb.balancerMu.Unlock()
- case *acBalancerWrapper:
- ccb.mu.Lock()
- if ccb.subConns != nil {
- delete(ccb.subConns, u)
- ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
- }
- ccb.mu.Unlock()
- case exitIdle:
- if ccb.cc.GetState() == connectivity.Idle {
- if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
- // We already checked that the balancer implements
- // ExitIdle before pushing the event to updateCh, but
- // check conditionally again as defensive programming.
- ccb.balancerMu.Lock()
- ei.ExitIdle()
- ccb.balancerMu.Unlock()
- }
- }
- default:
- logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
- }
- case <-ccb.closed.Done():
- }
-
- if ccb.closed.HasFired() {
- ccb.balancerMu.Lock()
- ccb.balancer.Close()
- ccb.balancerMu.Unlock()
- ccb.mu.Lock()
- scs := ccb.subConns
- ccb.subConns = nil
- ccb.mu.Unlock()
- ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
- ccb.done.Fire()
- // Fire done before removing the addr conns. We can safely unblock
- // ccb.close and allow the removeAddrConns to happen
- // asynchronously.
- for acbw := range scs {
- ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
- }
- return
- }
- }
-}
-
-func (ccb *ccBalancerWrapper) close() {
- ccb.closed.Fire()
- <-ccb.done.Done()
-}
-
-func (ccb *ccBalancerWrapper) exitIdle() bool {
- if !ccb.hasExitIdle {
- return false
- }
- ccb.updateCh.Put(exitIdle{})
- return true
-}
-
-func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
- // When updating addresses for a SubConn, if the address in use is not in
- // the new addresses, the old ac will be tearDown() and a new ac will be
- // created. tearDown() generates a state change with Shutdown state, we
- // don't want the balancer to receive this state change. So before
- // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
- // this function will be called with (nil, Shutdown). We don't need to call
- // balancer method in this case.
- if sc == nil {
- return
- }
- ccb.updateCh.Put(&scStateUpdate{
- sc: sc,
- state: s,
- err: err,
- })
-}
-
+// updateClientConnState is invoked by grpc to push a ClientConnState update to
+// the underlying balancer.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
- ccb.balancerMu.Lock()
- defer ccb.balancerMu.Unlock()
- return ccb.balancer.UpdateClientConnState(*ccs)
+ ccb.mu.Lock()
+ errCh := make(chan error, 1)
+ // Here and everywhere else where Schedule() is called, it is done with the
+ // lock held. But the lock guards only the scheduling part. The actual
+ // callback is called asynchronously without the lock being held.
+ ok := ccb.serializer.Schedule(func(_ context.Context) {
+ // If the addresses specified in the update contain addresses of type
+ // "grpclb" and the selected LB policy is not "grpclb", these addresses
+ // will be filtered out and ccs will be modified with the updated
+ // address list.
+ if ccb.curBalancerName != grpclbName {
+ var addrs []resolver.Address
+ for _, addr := range ccs.ResolverState.Addresses {
+ if addr.Type == resolver.GRPCLB {
+ continue
+ }
+ addrs = append(addrs, addr)
+ }
+ ccs.ResolverState.Addresses = addrs
+ }
+ errCh <- ccb.balancer.UpdateClientConnState(*ccs)
+ })
+ if !ok {
+ // If we are unable to schedule a function with the serializer, it
+ // indicates that it has been closed. A serializer is only closed when
+ // the wrapper is closed or is in idle.
+ ccb.mu.Unlock()
+ return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
+ }
+ ccb.mu.Unlock()
+
+ // We get here only if the above call to Schedule succeeds, in which case it
+ // is guaranteed that the scheduled function will run. Therefore it is safe
+ // to block on this channel.
+ err := <-errCh
+ if logger.V(2) && err != nil {
+ logger.Infof("error from balancer.UpdateClientConnState: %v", err)
+ }
+ return err
+}
+
+// updateSubConnState is invoked by grpc to push a subConn state update to the
+// underlying balancer.
+func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
+ })
+ ccb.mu.Unlock()
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
- ccb.balancerMu.Lock()
- defer ccb.balancerMu.Unlock()
- ccb.balancer.ResolverError(err)
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.balancer.ResolverError(err)
+ })
+ ccb.mu.Unlock()
+}
+
+// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
+// LB policy identified by name.
+//
+// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
+// first good update from the name resolver, it determines the LB policy to use
+// and invokes the switchTo() method. Upon receipt of every subsequent update
+// from the name resolver, it invokes this method.
+//
+// the ccBalancerWrapper keeps track of the current LB policy name, and skips
+// the graceful balancer switching process if the name does not change.
+func (ccb *ccBalancerWrapper) switchTo(name string) {
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ // TODO: Other languages use case-sensitive balancer registries. We should
+ // switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
+ if strings.EqualFold(ccb.curBalancerName, name) {
+ return
+ }
+ ccb.buildLoadBalancingPolicy(name)
+ })
+ ccb.mu.Unlock()
+}
+
+// buildLoadBalancingPolicy performs the following:
+// - retrieve a balancer builder for the given name. Use the default LB
+// policy, pick_first, if no LB policy with name is found in the registry.
+// - instruct the gracefulswitch balancer to switch to the above builder. This
+// will actually build the new balancer.
+// - update the `curBalancerName` field
+//
+// Must be called from a serializer callback.
+func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
+ builder := balancer.Get(name)
+ if builder == nil {
+ channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
+ builder = newPickfirstBuilder()
+ } else {
+ channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
+ }
+
+ if err := ccb.balancer.SwitchTo(builder); err != nil {
+ channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
+ return
+ }
+ ccb.curBalancerName = builder.Name()
+}
+
+func (ccb *ccBalancerWrapper) close() {
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
+ ccb.closeBalancer(ccbModeClosed)
+}
+
+// enterIdleMode is invoked by grpc when the channel enters idle mode upon
+// expiry of idle_timeout. This call blocks until the balancer is closed.
+func (ccb *ccBalancerWrapper) enterIdleMode() {
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
+ ccb.closeBalancer(ccbModeIdle)
+}
+
+// closeBalancer is invoked when the channel is being closed or when it enters
+// idle mode upon expiry of idle_timeout.
+func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
+ ccb.mu.Lock()
+ if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
+ ccb.mu.Unlock()
+ return
+ }
+
+ ccb.mode = m
+ done := ccb.serializer.Done
+ b := ccb.balancer
+ ok := ccb.serializer.Schedule(func(_ context.Context) {
+ // Close the serializer to ensure that no more calls from gRPC are sent
+ // to the balancer.
+ ccb.serializerCancel()
+ // Empty the current balancer name because we don't have a balancer
+ // anymore and also so that we act on the next call to switchTo by
+ // creating a new balancer specified by the new resolver.
+ ccb.curBalancerName = ""
+ })
+ if !ok {
+ ccb.mu.Unlock()
+ return
+ }
+ ccb.mu.Unlock()
+
+ // Give enqueued callbacks a chance to finish.
+ <-done
+ // Spawn a goroutine to close the balancer (since it may block trying to
+ // cleanup all allocated resources) and return early.
+ go b.Close()
+}
+
+// exitIdleMode is invoked by grpc when the channel exits idle mode either
+// because of an RPC or because of an invocation of the Connect() API. This
+// recreates the balancer that was closed previously when entering idle mode.
+//
+// If the channel is not in idle mode, we know for a fact that we are here as a
+// result of the user calling the Connect() method on the ClientConn. In this
+// case, we can simply forward the call to the underlying balancer, instructing
+// it to reconnect to the backends.
+func (ccb *ccBalancerWrapper) exitIdleMode() {
+ ccb.mu.Lock()
+ if ccb.mode == ccbModeClosed {
+ // Request to exit idle is a no-op when wrapper is already closed.
+ ccb.mu.Unlock()
+ return
+ }
+
+ if ccb.mode == ccbModeIdle {
+ // Recreate the serializer which was closed when we entered idle.
+ ctx, cancel := context.WithCancel(context.Background())
+ ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
+ ccb.serializerCancel = cancel
+ }
+
+ // The ClientConn guarantees that mutual exclusion between close() and
+ // exitIdleMode(), and since we just created a new serializer, we can be
+ // sure that the below function will be scheduled.
+ done := make(chan struct{})
+ ccb.serializer.Schedule(func(_ context.Context) {
+ defer close(done)
+
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+
+ if ccb.mode != ccbModeIdle {
+ ccb.balancer.ExitIdle()
+ return
+ }
+
+ // Gracefulswitch balancer does not support a switchTo operation after
+ // being closed. Hence we need to create a new one here.
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
+ ccb.mode = ccbModeActive
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
+
+ })
+ ccb.mu.Unlock()
+
+ <-done
+}
+
+func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
- if len(addrs) <= 0 {
- return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
+ if ccb.isIdleOrClosed() {
+ return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
}
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
- return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
+
+ if len(addrs) == 0 {
+ return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
+ channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
- acbw := &acBalancerWrapper{ac: ac}
- acbw.ac.mu.Lock()
+ acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
ac.acbw = acbw
- acbw.ac.mu.Unlock()
- ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
- // during switchBalancer() if the old balancer calls RemoveSubConn() in its
- // Close().
- ccb.updateCh.Put(sc)
+ if ccb.isIdleOrClosed() {
+ // It it safe to ignore this call when the balancer is closed or in idle
+ // because the ClientConn takes care of closing the connections.
+ //
+ // Not returning early from here when the balancer is closed or in idle
+ // leads to a deadlock though, because of the following sequence of
+ // calls when holding cc.mu:
+ // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
+ // ccb.RemoveAddrConn --> cc.removeAddrConn
+ return
+ }
+
+ acbw, ok := sc.(*acBalancerWrapper)
+ if !ok {
+ return
+ }
+ ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+ if ccb.isIdleOrClosed() {
+ return
+ }
+
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -210,11 +352,10 @@
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
+ if ccb.isIdleOrClosed() {
return
}
+
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
@@ -225,6 +366,10 @@
}
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
+ if ccb.isIdleOrClosed() {
+ return
+ }
+
ccb.cc.resolveNow(o)
}
@@ -235,58 +380,80 @@
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
- mu sync.Mutex
- ac *addrConn
+ ac *addrConn // read-only
+
+ mu sync.Mutex
+ producers map[balancer.ProducerBuilder]*refCountedProducer
+}
+
+func (acbw *acBalancerWrapper) String() string {
+ return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
}
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
- if len(addrs) <= 0 {
- acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
- return
- }
- if !acbw.ac.tryUpdateAddrs(addrs) {
- cc := acbw.ac.cc
- opts := acbw.ac.scopts
- acbw.ac.mu.Lock()
- // Set old ac.acbw to nil so the Shutdown state update will be ignored
- // by balancer.
- //
- // TODO(bar) the state transition could be wrong when tearDown() old ac
- // and creating new ac, fix the transition.
- acbw.ac.acbw = nil
- acbw.ac.mu.Unlock()
- acState := acbw.ac.getState()
- acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
-
- if acState == connectivity.Shutdown {
- return
- }
-
- newAC, err := cc.newAddrConn(addrs, opts)
- if err != nil {
- channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
- return
- }
- acbw.ac = newAC
- newAC.mu.Lock()
- newAC.acbw = acbw
- newAC.mu.Unlock()
- if acState != connectivity.Idle {
- go newAC.connect()
- }
- }
+ acbw.ac.updateAddrs(addrs)
}
func (acbw *acBalancerWrapper) Connect() {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
go acbw.ac.connect()
}
-func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
+// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
+// ready, blocks until it is or ctx expires. Returns an error when the context
+// expires or the addrConn is shut down.
+func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+ transport, err := acbw.ac.getTransport(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
+}
+
+// Invoke performs a unary RPC. If the addrConn is not ready, returns
+// errSubConnNotReady.
+func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
+ cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
+ if err != nil {
+ return err
+ }
+ if err := cs.SendMsg(args); err != nil {
+ return err
+ }
+ return cs.RecvMsg(reply)
+}
+
+type refCountedProducer struct {
+ producer balancer.Producer
+ refs int // number of current refs to the producer
+ close func() // underlying producer's close function
+}
+
+func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
- return acbw.ac
+
+ // Look up existing producer from this builder.
+ pData := acbw.producers[pb]
+ if pData == nil {
+ // Not found; create a new one and add it to the producers map.
+ p, close := pb.Build(acbw)
+ pData = &refCountedProducer{producer: p, close: close}
+ acbw.producers[pb] = pData
+ }
+ // Account for this new reference.
+ pData.refs++
+
+ // Return a cleanup function wrapped in a OnceFunc to remove this reference
+ // and delete the refCountedProducer from the map if the total reference
+ // count goes to zero.
+ unref := func() {
+ acbw.mu.Lock()
+ pData.refs--
+ if pData.refs == 0 {
+ defer pData.close() // Run outside the acbw mutex
+ delete(acbw.producers, pb)
+ }
+ acbw.mu.Unlock()
+ }
+ return pData.producer, grpcsync.OnceFunc(unref)
}