[VOL-5567] update protos

Change-Id: I2237e104062831286129ece7cae6621fd971ecb9
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 81969e7..ddd3773 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -33,20 +33,21 @@
 	"sync/atomic"
 	"time"
 
-	"golang.org/x/net/trace"
-
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/encoding"
 	"google.golang.org/grpc/encoding/proto"
+	estats "google.golang.org/grpc/experimental/stats"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/grpcutil"
+	istats "google.golang.org/grpc/internal/stats"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/mem"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/peer"
 	"google.golang.org/grpc/stats"
@@ -70,9 +71,10 @@
 	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
 		return srv.opts.creds
 	}
-	internal.DrainServerTransports = func(srv *Server, addr string) {
-		srv.drainServerTransports(addr)
+	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
+		return srv.isRegisteredMethod(method)
 	}
+	internal.ServerFromContext = serverFromContext
 	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
 		globalServerOptions = append(globalServerOptions, opt...)
 	}
@@ -81,17 +83,22 @@
 	}
 	internal.BinaryLogger = binaryLogger
 	internal.JoinServerOptions = newJoinServerOption
+	internal.BufferPool = bufferPool
+	internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
+		return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
+	}
 }
 
 var statusOK = status.New(codes.OK, "")
 var logger = grpclog.Component("core")
 
-type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
+// MethodHandler is a function type that processes a unary RPC method call.
+type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
 
 // MethodDesc represents an RPC service's method specification.
 type MethodDesc struct {
 	MethodName string
-	Handler    methodHandler
+	Handler    MethodHandler
 }
 
 // ServiceDesc represents an RPC service's specification.
@@ -99,31 +106,26 @@
 	ServiceName string
 	// The pointer to the service interface. Used to check whether the user
 	// provided implementation satisfies the interface requirements.
-	HandlerType interface{}
+	HandlerType any
 	Methods     []MethodDesc
 	Streams     []StreamDesc
-	Metadata    interface{}
+	Metadata    any
 }
 
 // serviceInfo wraps information about a service. It is very similar to
 // ServiceDesc and is constructed from it for internal purposes.
 type serviceInfo struct {
 	// Contains the implementation for the methods in this service.
-	serviceImpl interface{}
+	serviceImpl any
 	methods     map[string]*MethodDesc
 	streams     map[string]*StreamDesc
-	mdata       interface{}
-}
-
-type serverWorkerData struct {
-	st     transport.ServerTransport
-	wg     *sync.WaitGroup
-	stream *transport.Stream
+	mdata       any
 }
 
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
-	opts serverOptions
+	opts         serverOptions
+	statsHandler stats.Handler
 
 	mu  sync.Mutex // guards following
 	lis map[net.Listener]bool
@@ -135,17 +137,18 @@
 	drain    bool
 	cv       *sync.Cond              // signaled when connections close for GracefulStop
 	services map[string]*serviceInfo // service name -> service info
-	events   trace.EventLog
+	events   traceEventLog
 
 	quit               *grpcsync.Event
 	done               *grpcsync.Event
 	channelzRemoveOnce sync.Once
-	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
+	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
+	handlersWG         sync.WaitGroup // counts active method handler goroutines
 
-	channelzID *channelz.Identifier
-	czData     *channelzData
+	channelz *channelz.Server
 
-	serverWorkerChannel chan *serverWorkerData
+	serverWorkerChannel      chan func()
+	serverWorkerChannelClose func()
 }
 
 type serverOptions struct {
@@ -170,18 +173,24 @@
 	initialConnWindowSize int32
 	writeBufferSize       int
 	readBufferSize        int
+	sharedWriteBuffer     bool
 	connectionTimeout     time.Duration
 	maxHeaderListSize     *uint32
 	headerTableSize       *uint32
 	numServerWorkers      uint32
+	bufferPool            mem.BufferPool
+	waitForHandlers       bool
+	staticWindowSize      bool
 }
 
 var defaultServerOptions = serverOptions{
+	maxConcurrentStreams:  math.MaxUint32,
 	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 	connectionTimeout:     120 * time.Second,
 	writeBufferSize:       defaultWriteBufSize,
 	readBufferSize:        defaultReadBufSize,
+	bufferPool:            mem.DefaultBufferPool(),
 }
 var globalServerOptions []ServerOption
 
@@ -233,12 +242,24 @@
 	return &joinServerOption{opts: opts}
 }
 
+// SharedWriteBuffer allows reusing the per-connection transport write buffer.
+// If this option is set to true, every connection will release the buffer after
+// flushing the data on the wire.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func SharedWriteBuffer(val bool) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.sharedWriteBuffer = val
+	})
+}
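
A minimal usage sketch (an editor's illustration, not part of this change), assuming the public API matches this vendored version:

	package main

	import "google.golang.org/grpc"

	func main() {
		// With SharedWriteBuffer(true), each connection returns its write
		// buffer to a shared pool after flushing instead of holding it.
		s := grpc.NewServer(grpc.SharedWriteBuffer(true))
		defer s.Stop()
	}
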
+
 // WriteBufferSize determines how much data can be batched before doing a write
-// on the wire. The corresponding memory allocation for this buffer will be
-// twice the size to keep syscalls low. The default value for this buffer is
-// 32KB. Zero or negative values will disable the write buffer such that each
-// write will be on underlying connection.
-// Note: A Send call may not directly translate to a write.
+// on the wire. The default value for this buffer is 32KB. Zero or negative
+// values will disable the write buffer such that each write will be on the
+// underlying connection. Note: A Send call may not directly translate to a write.
 func WriteBufferSize(s int) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.writeBufferSize = s
@@ -260,6 +281,7 @@
 func InitialWindowSize(s int32) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.initialWindowSize = s
+		o.staticWindowSize = true
 	})
 }
 
@@ -268,14 +290,37 @@
 func InitialConnWindowSize(s int32) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.initialConnWindowSize = s
+		o.staticWindowSize = true
+	})
+}
+
+// StaticStreamWindowSize returns a ServerOption to set the initial stream
+// window size to the value provided and disables dynamic flow control.
+// The lower bound for the window size is 64K, and any value smaller than that
+// will be ignored.
+func StaticStreamWindowSize(s int32) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.initialWindowSize = s
+		o.staticWindowSize = true
+	})
+}
+
+// StaticConnWindowSize returns a ServerOption to set the initial connection
+// window size to the value provided and disables dynamic flow control.
+// The lower bound for the window size is 64K, and any value smaller than that
+// will be ignored.
+func StaticConnWindowSize(s int32) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.initialConnWindowSize = s
+		o.staticWindowSize = true
 	})
 }
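
A hedged sketch of the two new options together (illustrative window values, not part of this change). Per the doc comments above, both pin their window sizes and disable dynamic flow control, and values below 64K are ignored:

	package main

	import "google.golang.org/grpc"

	func main() {
		s := grpc.NewServer(
			grpc.StaticStreamWindowSize(1<<20), // 1MiB per stream
			grpc.StaticConnWindowSize(4<<20),   // 4MiB per connection
		)
		defer s.Stop()
	}
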
 
 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
-	if kp.Time > 0 && kp.Time < time.Second {
+	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
 		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
-		kp.Time = time.Second
+		kp.Time = internal.KeepaliveMinServerPingTime
 	}
 
 	return newFuncServerOption(func(o *serverOptions) {
@@ -301,7 +346,7 @@
 // Will be supported throughout 1.x.
 func CustomCodec(codec Codec) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
-		o.codec = codec
+		o.codec = newCodecV0Bridge(codec)
 	})
 }
 
@@ -330,7 +375,22 @@
 // later release.
 func ForceServerCodec(codec encoding.Codec) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
-		o.codec = codec
+		o.codec = newCodecV1Bridge(codec)
+	})
+}
+
+// ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
+// CodecV2 interface.
+//
+// Will be supported throughout 1.x.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.codec = codecV2
 	})
 }
 
@@ -387,6 +447,9 @@
 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 // of concurrent streams to each ServerTransport.
 func MaxConcurrentStreams(n uint32) ServerOption {
+	if n == 0 {
+		n = math.MaxUint32
+	}
 	return newFuncServerOption(func(o *serverOptions) {
 		o.maxConcurrentStreams = n
 	})
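
Note the behavior change above: MaxConcurrentStreams(0) is now rewritten to math.MaxUint32 (effectively unlimited) instead of being passed through, matching the new math.MaxUint32 default in defaultServerOptions; in this version the limit is enforced by a server-side handler quota (see newHandlerQuota at the end of this diff). A one-line sketch (editor's illustration):

	package main

	import "google.golang.org/grpc"

	func main() {
		// Zero no longer means "zero streams"; it is rewritten to
		// math.MaxUint32, i.e. effectively no limit.
		s := grpc.NewServer(grpc.MaxConcurrentStreams(0))
		defer s.Stop()
	}
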
@@ -512,12 +575,22 @@
 	})
 }
 
+// MaxHeaderListSizeServerOption is a ServerOption that sets the max
+// (uncompressed) size of header list that the server is prepared to accept.
+type MaxHeaderListSizeServerOption struct {
+	MaxHeaderListSize uint32
+}
+
+func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
+	so.maxHeaderListSize = &o.MaxHeaderListSize
+}
+
 // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
 // of header list that the server is prepared to accept.
 func MaxHeaderListSize(s uint32) ServerOption {
-	return newFuncServerOption(func(o *serverOptions) {
-		o.maxHeaderListSize = &s
-	})
+	return MaxHeaderListSizeServerOption{
+		MaxHeaderListSize: s,
+	}
 }
 
 // HeaderTableSize returns a ServerOption that sets the size of dynamic
@@ -552,6 +625,27 @@
 	})
 }
 
+// WaitForHandlers causes Stop to wait until all outstanding method handlers have
+// exited before returning.  If false, Stop will return as soon as all
+// connections have closed, but method handlers may still be running. By
+// default, Stop does not wait for method handlers to return.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WaitForHandlers(w bool) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.waitForHandlers = w
+	})
+}
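
A hedged usage sketch (editor's illustration): combined with the stop path later in this diff, this option makes Stop block on handlersWG until method handlers return:

	package main

	import "google.golang.org/grpc"

	func main() {
		s := grpc.NewServer(grpc.WaitForHandlers(true))
		// ... register services and serve ...
		// Stop now also waits for in-flight method handlers to exit,
		// not just for connections to close.
		s.Stop()
	}
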
+
+func bufferPool(bufferPool mem.BufferPool) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.bufferPool = bufferPool
+	})
+}
+
 // serverWorkerResetThreshold defines how often the stack must be reset. Every
 // N requests, a worker resets its stack by spawning a new goroutine in its
 // place, so that large stacks don't live in memory forever. 2^16 should allow
@@ -559,41 +653,35 @@
 // workload (assuming a QPS of a few thousand requests/sec).
 const serverWorkerResetThreshold = 1 << 16
 
-// serverWorkers blocks on a *transport.Stream channel forever and waits for
-// data to be fed by serveStreams. This allows multiple requests to be
+// serverWorker blocks on the server worker channel forever, waiting for
+// callbacks to be fed by serveStreams. This allows multiple requests to be
 // processed by the same goroutine, removing the need for expensive stack
 // re-allocations (see the runtime.morestack problem [1]).
 //
 // [1] https://github.com/golang/go/issues/18138
 func (s *Server) serverWorker() {
 	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
-		data, ok := <-s.serverWorkerChannel
+		f, ok := <-s.serverWorkerChannel
 		if !ok {
 			return
 		}
-		s.handleSingleStream(data)
+		f()
 	}
 	go s.serverWorker()
 }
 
-func (s *Server) handleSingleStream(data *serverWorkerData) {
-	defer data.wg.Done()
-	s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-}
-
 // initServerWorkers creates worker goroutines and a channel to process incoming
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
-	s.serverWorkerChannel = make(chan *serverWorkerData)
+	s.serverWorkerChannel = make(chan func())
+	s.serverWorkerChannelClose = sync.OnceFunc(func() {
+		close(s.serverWorkerChannel)
+	})
 	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 		go s.serverWorker()
 	}
 }
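
The pool above only runs when numServerWorkers > 0, which callers enable through the pre-existing NumStreamWorkers option. A minimal sketch (editor's illustration, assuming that option is unchanged by this diff):

	package main

	import (
		"runtime"

		"google.golang.org/grpc"
	)

	func main() {
		// Reuse a fixed set of goroutines for handlers to avoid
		// runtime.morestack churn on deep handler stacks.
		s := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
		defer s.Stop()
	}
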
 
-func (s *Server) stopServerWorkers() {
-	close(s.serverWorkerChannel)
-}
-
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
@@ -605,34 +693,34 @@
 		o.apply(&opts)
 	}
 	s := &Server{
-		lis:      make(map[net.Listener]bool),
-		opts:     opts,
-		conns:    make(map[string]map[transport.ServerTransport]bool),
-		services: make(map[string]*serviceInfo),
-		quit:     grpcsync.NewEvent(),
-		done:     grpcsync.NewEvent(),
-		czData:   new(channelzData),
+		lis:          make(map[net.Listener]bool),
+		opts:         opts,
+		statsHandler: istats.NewCombinedHandler(opts.statsHandlers...),
+		conns:        make(map[string]map[transport.ServerTransport]bool),
+		services:     make(map[string]*serviceInfo),
+		quit:         grpcsync.NewEvent(),
+		done:         grpcsync.NewEvent(),
+		channelz:     channelz.RegisterServer(""),
 	}
 	chainUnaryServerInterceptors(s)
 	chainStreamServerInterceptors(s)
 	s.cv = sync.NewCond(&s.mu)
 	if EnableTracing {
 		_, file, line, _ := runtime.Caller(1)
-		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
+		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 	}
 
 	if s.opts.numServerWorkers > 0 {
 		s.initServerWorkers()
 	}
 
-	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
-	channelz.Info(logger, s.channelzID, "Server created")
+	channelz.Info(logger, s.channelz, "Server created")
 	return s
 }
 
 // printf records an event in s's event log, unless s has been stopped.
 // REQUIRES s.mu is held.
-func (s *Server) printf(format string, a ...interface{}) {
+func (s *Server) printf(format string, a ...any) {
 	if s.events != nil {
 		s.events.Printf(format, a...)
 	}
@@ -640,7 +728,7 @@
 
 // errorf records an error in s's event log, unless s has been stopped.
 // REQUIRES s.mu is held.
-func (s *Server) errorf(format string, a ...interface{}) {
+func (s *Server) errorf(format string, a ...any) {
 	if s.events != nil {
 		s.events.Errorf(format, a...)
 	}
@@ -655,14 +743,14 @@
 	// once the server has started serving.
 	// desc describes the service and its methods and handlers. impl is the
 	// service implementation which is passed to the method handlers.
-	RegisterService(desc *ServiceDesc, impl interface{})
+	RegisterService(desc *ServiceDesc, impl any)
 }
 
 // RegisterService registers a service and its implementation to the gRPC
 // server. It is called from the IDL generated code. This must be called before
 // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
 // ensure it implements sd.HandlerType.
-func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
+func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
 	if ss != nil {
 		ht := reflect.TypeOf(sd.HandlerType).Elem()
 		st := reflect.TypeOf(ss)
@@ -673,7 +761,7 @@
 	s.register(sd, ss)
 }
 
-func (s *Server) register(sd *ServiceDesc, ss interface{}) {
+func (s *Server) register(sd *ServiceDesc, ss any) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.printf("RegisterService(%q)", sd.ServiceName)
@@ -714,7 +802,7 @@
 type ServiceInfo struct {
 	Methods []MethodInfo
 	// Metadata is the metadata specified in ServiceDesc when registering service.
-	Metadata interface{}
+	Metadata any
 }
 
 // GetServiceInfo returns a map from service names to ServiceInfo.
@@ -752,20 +840,13 @@
 
 type listenSocket struct {
 	net.Listener
-	channelzID *channelz.Identifier
-}
-
-func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
-	return &channelz.SocketInternalMetric{
-		SocketOptions: channelz.GetSocketOption(l.Listener),
-		LocalAddr:     l.Listener.Addr(),
-	}
+	channelz *channelz.Socket
 }
 
 func (l *listenSocket) Close() error {
 	err := l.Listener.Close()
-	channelz.RemoveEntry(l.channelzID)
-	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
+	channelz.RemoveEntry(l.channelz.ID)
+	channelz.Info(logger, l.channelz, "ListenSocket deleted")
 	return err
 }
 
@@ -775,6 +856,18 @@
 // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
 // this method returns.
 // Serve will return a non-nil error unless Stop or GracefulStop is called.
+//
+// Note: All supported releases of Go (as of December 2023) override the OS
+// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
+// with OS defaults for keepalive time and interval, callers need to do the
+// following two things:
+//   - pass a net.Listener created by calling the Listen method on a
+//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
+//     will result in the Go standard library not overriding OS defaults for TCP
+//     keepalive interval and time. But this will also result in the Go standard
+//     library not enabling TCP keepalives by default.
+//   - override the Accept method on the passed in net.Listener and set the
+//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
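
The two bullet points above translate to roughly the following listener wrapper (an editor's sketch; keepaliveListener and listen are hypothetical names, not part of this change):

	package main

	import (
		"context"
		"net"
	)

	// keepaliveListener enables SO_KEEPALIVE, with OS-default time and
	// interval, on every accepted connection. (Hypothetical helper.)
	type keepaliveListener struct{ net.Listener }

	func (l keepaliveListener) Accept() (net.Conn, error) {
		c, err := l.Listener.Accept()
		if err != nil {
			return nil, err
		}
		if tc, ok := c.(*net.TCPConn); ok {
			// SetKeepAlive sets only SO_KEEPALIVE; time and interval stay
			// at OS defaults because the ListenConfig below told the Go
			// standard library not to override them.
			if err := tc.SetKeepAlive(true); err != nil {
				c.Close()
				return nil, err
			}
		}
		return c, nil
	}

	func listen(addr string) (net.Listener, error) {
		lc := net.ListenConfig{KeepAlive: -1} // negative: no Go override
		lis, err := lc.Listen(context.Background(), "tcp", addr)
		if err != nil {
			return nil, err
		}
		return keepaliveListener{lis}, nil
	}
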
 func (s *Server) Serve(lis net.Listener) error {
 	s.mu.Lock()
 	s.printf("serving")
@@ -795,7 +888,16 @@
 		}
 	}()
 
-	ls := &listenSocket{Listener: lis}
+	ls := &listenSocket{
+		Listener: lis,
+		channelz: channelz.RegisterSocket(&channelz.Socket{
+			SocketType:    channelz.SocketTypeListen,
+			Parent:        s.channelz,
+			RefName:       lis.Addr().String(),
+			LocalAddr:     lis.Addr(),
+			SocketOptions: channelz.GetSocketOption(lis)},
+		),
+	}
 	s.lis[ls] = true
 
 	defer func() {
@@ -807,14 +909,8 @@
 		s.mu.Unlock()
 	}()
 
-	var err error
-	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
-	if err != nil {
-		s.mu.Unlock()
-		return err
-	}
 	s.mu.Unlock()
-	channelz.Info(logger, ls.channelzID, "ListenSocket created")
+	channelz.Info(logger, ls.channelz, "ListenSocket created")
 
 	var tempDelay time.Duration // how long to sleep on accept failure
 	for {
@@ -882,24 +978,21 @@
 		return
 	}
 
+	if cc, ok := rawConn.(interface {
+		PassServerTransport(transport.ServerTransport)
+	}); ok {
+		cc.PassServerTransport(st)
+	}
+
 	if !s.addConn(lisAddr, st) {
 		return
 	}
 	go func() {
-		s.serveStreams(st)
+		s.serveStreams(context.Background(), st, rawConn)
 		s.removeConn(lisAddr, st)
 	}()
 }
 
-func (s *Server) drainServerTransports(addr string) {
-	s.mu.Lock()
-	conns := s.conns[addr]
-	for st := range conns {
-		st.Drain("")
-	}
-	s.mu.Unlock()
-}
-
 // newHTTP2Transport sets up a http/2 transport (using the
 // gRPC http2 server transport in transport/http2_server.go).
 func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
@@ -908,16 +1001,19 @@
 		ConnectionTimeout:     s.opts.connectionTimeout,
 		Credentials:           s.opts.creds,
 		InTapHandle:           s.opts.inTapHandle,
-		StatsHandlers:         s.opts.statsHandlers,
+		StatsHandler:          s.statsHandler,
 		KeepaliveParams:       s.opts.keepaliveParams,
 		KeepalivePolicy:       s.opts.keepalivePolicy,
 		InitialWindowSize:     s.opts.initialWindowSize,
 		InitialConnWindowSize: s.opts.initialConnWindowSize,
 		WriteBufferSize:       s.opts.writeBufferSize,
 		ReadBufferSize:        s.opts.readBufferSize,
-		ChannelzParentID:      s.channelzID,
+		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
+		ChannelzParent:        s.channelz,
 		MaxHeaderListSize:     s.opts.maxHeaderListSize,
 		HeaderTableSize:       s.opts.headerTableSize,
+		BufferPool:            s.opts.bufferPool,
+		StaticWindowSize:      s.opts.staticWindowSize,
 	}
 	st, err := transport.NewServerTransport(c, config)
 	if err != nil {
@@ -929,7 +1025,7 @@
 		if err != credentials.ErrConnDispatched {
 			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
 			if err != io.EOF {
-				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+				channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
 			}
 			c.Close()
 		}
@@ -939,33 +1035,44 @@
 	return st
 }
 
-func (s *Server) serveStreams(st transport.ServerTransport) {
-	defer st.Close(errors.New("finished serving streams for the server transport"))
-	var wg sync.WaitGroup
+func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
+	ctx = transport.SetConnection(ctx, rawConn)
+	ctx = peer.NewContext(ctx, st.Peer())
+	if s.statsHandler != nil {
+		ctx = s.statsHandler.TagConn(ctx, &stats.ConnTagInfo{
+			RemoteAddr: st.Peer().Addr,
+			LocalAddr:  st.Peer().LocalAddr,
+		})
+		s.statsHandler.HandleConn(ctx, &stats.ConnBegin{})
+	}
 
-	st.HandleStreams(func(stream *transport.Stream) {
-		wg.Add(1)
+	defer func() {
+		st.Close(errors.New("finished serving streams for the server transport"))
+		if s.statsHandler != nil {
+			s.statsHandler.HandleConn(ctx, &stats.ConnEnd{})
+		}
+	}()
+
+	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
+	st.HandleStreams(ctx, func(stream *transport.ServerStream) {
+		s.handlersWG.Add(1)
+		streamQuota.acquire()
+		f := func() {
+			defer streamQuota.release()
+			defer s.handlersWG.Done()
+			s.handleStream(st, stream)
+		}
+
 		if s.opts.numServerWorkers > 0 {
-			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
 			select {
-			case s.serverWorkerChannel <- data:
+			case s.serverWorkerChannel <- f:
 				return
 			default:
 				// If all stream workers are busy, fall back to the default code path.
 			}
 		}
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
-	}, func(ctx context.Context, method string) context.Context {
-		if !EnableTracing {
-			return ctx
-		}
-		tr := trace.New("grpc.Recv."+methodFamily(method), method)
-		return trace.NewContext(ctx, tr)
+		go f()
 	})
-	wg.Wait()
 }
 
 var _ http.Handler = (*Server)(nil)
@@ -999,7 +1106,7 @@
 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 // later release.
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
+	st, err := transport.NewServerHandlerTransport(w, r, s.statsHandler, s.opts.bufferPool)
 	if err != nil {
 		// Errors returned from transport.NewServerHandlerTransport have
 		// already been written to w.
@@ -1009,31 +1116,7 @@
 		return
 	}
 	defer s.removeConn(listenerAddressForServeHTTP, st)
-	s.serveStreams(st)
-}
-
-// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
-// If tracing is not enabled, it returns nil.
-func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
-	if !EnableTracing {
-		return nil
-	}
-	tr, ok := trace.FromContext(stream.Context())
-	if !ok {
-		return nil
-	}
-
-	trInfo = &traceInfo{
-		tr: tr,
-		firstLine: firstLine{
-			client:     false,
-			remoteAddr: st.RemoteAddr(),
-		},
-	}
-	if dl, ok := stream.Context().Deadline(); ok {
-		trInfo.firstLine.deadline = time.Until(dl)
-	}
-	return trInfo
+	s.serveStreams(r.Context(), st, nil)
 }
 
 func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
@@ -1074,49 +1157,51 @@
 	}
 }
 
-func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
-	return &channelz.ServerInternalMetric{
-		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
-		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
-		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
-		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
-	}
-}
-
 func (s *Server) incrCallsStarted() {
-	atomic.AddInt64(&s.czData.callsStarted, 1)
-	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
+	s.channelz.ServerMetrics.CallsStarted.Add(1)
+	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
 }
 
 func (s *Server) incrCallsSucceeded() {
-	atomic.AddInt64(&s.czData.callsSucceeded, 1)
+	s.channelz.ServerMetrics.CallsSucceeded.Add(1)
 }
 
 func (s *Server) incrCallsFailed() {
-	atomic.AddInt64(&s.czData.callsFailed, 1)
+	s.channelz.ServerMetrics.CallsFailed.Add(1)
 }
 
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
+func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {
 	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
 	if err != nil {
-		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
+		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
 		return err
 	}
-	compData, err := compress(data, cp, comp)
+
+	compData, pf, err := compress(data, cp, comp, s.opts.bufferPool)
 	if err != nil {
-		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
+		data.Free()
+		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
 		return err
 	}
-	hdr, payload := msgHeader(data, compData)
+
+	hdr, payload := msgHeader(data, compData, pf)
+
+	defer func() {
+		compData.Free()
+		data.Free()
+		// payload does not need to be freed here, it is either data or compData, both of
+		// which are already freed.
+	}()
+
+	dataLen := data.Len()
+	payloadLen := payload.Len()
 	// TODO(dfawley): should we be checking len(data) instead?
-	if len(payload) > s.opts.maxSendMessageSize {
-		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
+	if payloadLen > s.opts.maxSendMessageSize {
+		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)
 	}
-	err = t.Write(stream, hdr, payload, opts)
-	if err == nil {
-		for _, sh := range s.opts.statsHandlers {
-			sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
-		}
+	err = stream.Write(hdr, payload, opts)
+	if err == nil && s.statsHandler != nil {
+		s.statsHandler.HandleRPC(ctx, outPayload(false, msg, dataLen, payloadLen, time.Now()))
 	}
 	return err
 }
@@ -1143,7 +1228,7 @@
 }
 
 func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
-	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
+	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
 		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
 	}
 }
@@ -1152,26 +1237,25 @@
 	if curr == len(interceptors)-1 {
 		return finalHandler
 	}
-	return func(ctx context.Context, req interface{}) (interface{}, error) {
+	return func(ctx context.Context, req any) (any, error) {
 		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
 	}
 }
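
getChainUnaryHandler recurses so that interceptors[0] is outermost and the final handler runs last. A hedged sketch of the observable ordering (editor's illustration using the public ChainUnaryInterceptor option; tagged is a hypothetical helper):

	package main

	import (
		"context"
		"fmt"

		"google.golang.org/grpc"
	)

	func tagged(tag string) grpc.UnaryServerInterceptor {
		return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
			// Earlier interceptors run first on the way in and last on the
			// way out: enter outer, enter inner, exit inner, exit outer.
			fmt.Println("enter", tag)
			defer fmt.Println("exit", tag)
			return handler(ctx, req)
		}
	}

	func main() {
		s := grpc.NewServer(grpc.ChainUnaryInterceptor(tagged("outer"), tagged("inner")))
		defer s.Stop()
	}
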
 
-func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
-	shs := s.opts.statsHandlers
-	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
+func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
+	sh := s.statsHandler
+	if sh != nil || trInfo != nil || channelz.IsOn() {
 		if channelz.IsOn() {
 			s.incrCallsStarted()
 		}
 		var statsBegin *stats.Begin
-		for _, sh := range shs {
-			beginTime := time.Now()
+		if sh != nil {
 			statsBegin = &stats.Begin{
-				BeginTime:      beginTime,
+				BeginTime:      time.Now(),
 				IsClientStream: false,
 				IsServerStream: false,
 			}
-			sh.HandleRPC(stream.Context(), statsBegin)
+			sh.HandleRPC(ctx, statsBegin)
 		}
 		if trInfo != nil {
 			trInfo.tr.LazyLog(&trInfo.firstLine, false)
@@ -1189,13 +1273,13 @@
 		defer func() {
 			if trInfo != nil {
 				if err != nil && err != io.EOF {
-					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
 					trInfo.tr.SetError()
 				}
 				trInfo.tr.Finish()
 			}
 
-			for _, sh := range shs {
+			if sh != nil {
 				end := &stats.End{
 					BeginTime: statsBegin.BeginTime,
 					EndTime:   time.Now(),
@@ -1203,7 +1287,7 @@
 				if err != nil && err != io.EOF {
 					end.Error = toRPCErr(err)
 				}
-				sh.HandleRPC(stream.Context(), end)
+				sh.HandleRPC(ctx, end)
 			}
 
 			if channelz.IsOn() {
@@ -1225,7 +1309,6 @@
 		}
 	}
 	if len(binlogs) != 0 {
-		ctx := stream.Context()
 		md, _ := metadata.FromIncomingContext(ctx)
 		logEntry := &binarylog.ClientHeader{
 			Header:     md,
@@ -1266,7 +1349,7 @@
 		decomp = encoding.GetCompressor(rc)
 		if decomp == nil {
 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
-			t.WriteStatus(stream, st)
+			stream.WriteStatus(st)
 			return st.Err()
 		}
 	}
@@ -1293,39 +1376,47 @@
 	}
 
 	var payInfo *payloadInfo
-	if len(shs) != 0 || len(binlogs) != 0 {
+	if sh != nil || len(binlogs) != 0 {
 		payInfo = &payloadInfo{}
+		defer payInfo.free()
 	}
-	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
+
+	d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)
 	if err != nil {
-		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
-			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+		if e := stream.WriteStatus(status.Convert(err)); e != nil {
+			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
 		return err
 	}
-	if channelz.IsOn() {
-		t.IncrMsgRecv()
+	freed := false
+	dataFree := func() {
+		if !freed {
+			d.Free()
+			freed = true
+		}
 	}
-	df := func(v interface{}) error {
+	defer dataFree()
+	df := func(v any) error {
+		defer dataFree()
 		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
 			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
 		}
-		for _, sh := range shs {
-			sh.HandleRPC(stream.Context(), &stats.InPayload{
+
+		if sh != nil {
+			sh.HandleRPC(ctx, &stats.InPayload{
 				RecvTime:         time.Now(),
 				Payload:          v,
-				Length:           len(d),
+				Length:           d.Len(),
 				WireLength:       payInfo.compressedLength + headerLen,
 				CompressedLength: payInfo.compressedLength,
-				Data:             d,
 			})
 		}
 		if len(binlogs) != 0 {
 			cm := &binarylog.ClientMessage{
-				Message: d,
+				Message: d.Materialize(),
 			}
 			for _, binlog := range binlogs {
-				binlog.Log(stream.Context(), cm)
+				binlog.Log(ctx, cm)
 			}
 		}
 		if trInfo != nil {
@@ -1333,7 +1424,7 @@
 		}
 		return nil
 	}
-	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+	ctx = NewContextWithServerTransportStream(ctx, stream)
 	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
 	if appErr != nil {
 		appStatus, ok := status.FromError(appErr)
@@ -1347,8 +1438,8 @@
 			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
 			trInfo.tr.SetError()
 		}
-		if e := t.WriteStatus(stream, appStatus); e != nil {
-			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+		if e := stream.WriteStatus(appStatus); e != nil {
+			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
 		if len(binlogs) != 0 {
 			if h, _ := stream.Header(); h.Len() > 0 {
@@ -1358,7 +1449,7 @@
 					Header: h,
 				}
 				for _, binlog := range binlogs {
-					binlog.Log(stream.Context(), sh)
+					binlog.Log(ctx, sh)
 				}
 			}
 			st := &binarylog.ServerTrailer{
@@ -1366,7 +1457,7 @@
 				Err:     appErr,
 			}
 			for _, binlog := range binlogs {
-				binlog.Log(stream.Context(), st)
+				binlog.Log(ctx, st)
 			}
 		}
 		return appErr
@@ -1374,21 +1465,21 @@
 	if trInfo != nil {
 		trInfo.tr.LazyLog(stringer("OK"), false)
 	}
-	opts := &transport.Options{Last: true}
+	opts := &transport.WriteOptions{Last: true}
 
 	// Server handler could have set new compressor by calling SetSendCompressor.
 	// In case it is set, we need to use it for compressing outbound message.
 	if stream.SendCompress() != sendCompressorName {
 		comp = encoding.GetCompressor(stream.SendCompress())
 	}
-	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
+	if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
 		if err == io.EOF {
 			// The entire stream is done (for unary RPC only).
 			return err
 		}
 		if sts, ok := status.FromError(err); ok {
-			if e := t.WriteStatus(stream, sts); e != nil {
-				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+			if e := stream.WriteStatus(sts); e != nil {
+				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 			}
 		} else {
 			switch st := err.(type) {
@@ -1408,8 +1499,8 @@
 				Err:     appErr,
 			}
 			for _, binlog := range binlogs {
-				binlog.Log(stream.Context(), sh)
-				binlog.Log(stream.Context(), st)
+				binlog.Log(ctx, sh)
+				binlog.Log(ctx, st)
 			}
 		}
 		return err
@@ -1423,13 +1514,10 @@
 			Message: reply,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(stream.Context(), sh)
-			binlog.Log(stream.Context(), sm)
+			binlog.Log(ctx, sh)
+			binlog.Log(ctx, sm)
 		}
 	}
-	if channelz.IsOn() {
-		t.IncrMsgSent()
-	}
 	if trInfo != nil {
 		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
 	}
@@ -1442,10 +1530,10 @@
 			Err:     appErr,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(stream.Context(), st)
+			binlog.Log(ctx, st)
 		}
 	}
-	return t.WriteStatus(stream, statusOK)
+	return stream.WriteStatus(statusOK)
 }
 
 // chainStreamServerInterceptors chains all stream server interceptors into one.
@@ -1470,7 +1558,7 @@
 }
 
 func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
-	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
+	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
 		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
 	}
 }
@@ -1479,48 +1567,45 @@
 	if curr == len(interceptors)-1 {
 		return finalHandler
 	}
-	return func(srv interface{}, stream ServerStream) error {
+	return func(srv any, stream ServerStream) error {
 		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
 	}
 }
 
-func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
+func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
 	if channelz.IsOn() {
 		s.incrCallsStarted()
 	}
-	shs := s.opts.statsHandlers
+	sh := s.statsHandler
 	var statsBegin *stats.Begin
-	if len(shs) != 0 {
-		beginTime := time.Now()
+	if sh != nil {
 		statsBegin = &stats.Begin{
-			BeginTime:      beginTime,
+			BeginTime:      time.Now(),
 			IsClientStream: sd.ClientStreams,
 			IsServerStream: sd.ServerStreams,
 		}
-		for _, sh := range shs {
-			sh.HandleRPC(stream.Context(), statsBegin)
-		}
+		sh.HandleRPC(ctx, statsBegin)
 	}
-	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+	ctx = NewContextWithServerTransportStream(ctx, stream)
 	ss := &serverStream{
 		ctx:                   ctx,
-		t:                     t,
 		s:                     stream,
-		p:                     &parser{r: stream},
+		p:                     parser{r: stream, bufferPool: s.opts.bufferPool},
 		codec:                 s.getCodec(stream.ContentSubtype()),
+		desc:                  sd,
 		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
 		maxSendMessageSize:    s.opts.maxSendMessageSize,
 		trInfo:                trInfo,
-		statsHandler:          shs,
+		statsHandler:          sh,
 	}
 
-	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
+	if sh != nil || trInfo != nil || channelz.IsOn() {
 		// See comment in processUnaryRPC on defers.
 		defer func() {
 			if trInfo != nil {
 				ss.mu.Lock()
 				if err != nil && err != io.EOF {
-					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
 					ss.trInfo.tr.SetError()
 				}
 				ss.trInfo.tr.Finish()
@@ -1528,7 +1613,7 @@
 				ss.mu.Unlock()
 			}
 
-			if len(shs) != 0 {
+			if sh != nil {
 				end := &stats.End{
 					BeginTime: statsBegin.BeginTime,
 					EndTime:   time.Now(),
@@ -1536,9 +1621,7 @@
 				if err != nil && err != io.EOF {
 					end.Error = toRPCErr(err)
 				}
-				for _, sh := range shs {
-					sh.HandleRPC(stream.Context(), end)
-				}
+				sh.HandleRPC(ctx, end)
 			}
 
 			if channelz.IsOn() {
@@ -1579,19 +1662,19 @@
 			logEntry.PeerAddr = peer.Addr
 		}
 		for _, binlog := range ss.binlogs {
-			binlog.Log(stream.Context(), logEntry)
+			binlog.Log(ctx, logEntry)
 		}
 	}
 
 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
 	// to find a matching registered compressor for decomp.
 	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
-		ss.dc = s.opts.dc
+		ss.decompressorV0 = s.opts.dc
 	} else if rc != "" && rc != encoding.Identity {
-		ss.decomp = encoding.GetCompressor(rc)
-		if ss.decomp == nil {
+		ss.decompressorV1 = encoding.GetCompressor(rc)
+		if ss.decompressorV1 == nil {
 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
-			t.WriteStatus(ss.s, st)
+			ss.s.WriteStatus(st)
 			return st.Err()
 		}
 	}
@@ -1601,12 +1684,12 @@
 	//
 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 	if s.opts.cp != nil {
-		ss.cp = s.opts.cp
+		ss.compressorV0 = s.opts.cp
 		ss.sendCompressorName = s.opts.cp.Type()
 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 		// Legacy compressor not specified; attempt to respond with same encoding.
-		ss.comp = encoding.GetCompressor(rc)
-		if ss.comp != nil {
+		ss.compressorV1 = encoding.GetCompressor(rc)
+		if ss.compressorV1 != nil {
 			ss.sendCompressorName = rc
 		}
 	}
@@ -1617,13 +1700,13 @@
 		}
 	}
 
-	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
+	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)
 
 	if trInfo != nil {
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 	}
 	var appErr error
-	var server interface{}
+	var server any
 	if info != nil {
 		server = info.serviceImpl
 	}
@@ -1657,10 +1740,10 @@
 				Err:     appErr,
 			}
 			for _, binlog := range ss.binlogs {
-				binlog.Log(stream.Context(), st)
+				binlog.Log(ctx, st)
 			}
 		}
-		t.WriteStatus(ss.s, appStatus)
+		ss.s.WriteStatus(appStatus)
 		// TODO: Should we log an error from WriteStatus here and below?
 		return appErr
 	}
@@ -1675,53 +1758,88 @@
 			Err:     appErr,
 		}
 		for _, binlog := range ss.binlogs {
-			binlog.Log(stream.Context(), st)
+			binlog.Log(ctx, st)
 		}
 	}
-	return t.WriteStatus(ss.s, statusOK)
+	return ss.s.WriteStatus(statusOK)
 }
 
-func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
+func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
+	ctx := stream.Context()
+	ctx = contextWithServer(ctx, s)
+	var ti *traceInfo
+	if EnableTracing {
+		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
+		ctx = newTraceContext(ctx, tr)
+		ti = &traceInfo{
+			tr: tr,
+			firstLine: firstLine{
+				client:     false,
+				remoteAddr: t.Peer().Addr,
+			},
+		}
+		if dl, ok := ctx.Deadline(); ok {
+			ti.firstLine.deadline = time.Until(dl)
+		}
+	}
+
 	sm := stream.Method()
 	if sm != "" && sm[0] == '/' {
 		sm = sm[1:]
 	}
 	pos := strings.LastIndex(sm, "/")
 	if pos == -1 {
-		if trInfo != nil {
-			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
-			trInfo.tr.SetError()
+		if ti != nil {
+			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
+			ti.tr.SetError()
 		}
 		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
-		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
-			if trInfo != nil {
-				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-				trInfo.tr.SetError()
+		if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
+			if ti != nil {
+				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
+				ti.tr.SetError()
 			}
-			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+			channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
 		}
-		if trInfo != nil {
-			trInfo.tr.Finish()
+		if ti != nil {
+			ti.tr.Finish()
 		}
 		return
 	}
 	service := sm[:pos]
 	method := sm[pos+1:]
 
+	// FromIncomingContext is expensive: skip it if there is no stats handler
+	if s.statsHandler != nil {
+		md, _ := metadata.FromIncomingContext(ctx)
+		ctx = s.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
+		s.statsHandler.HandleRPC(ctx, &stats.InHeader{
+			FullMethod:  stream.Method(),
+			RemoteAddr:  t.Peer().Addr,
+			LocalAddr:   t.Peer().LocalAddr,
+			Compression: stream.RecvCompress(),
+			WireLength:  stream.HeaderWireLength(),
+			Header:      md,
+		})
+	}
+	// Attach the updated context to the stream so that calls in stream
+	// callouts work. This will be deleted once all stats handler calls come
+	// from the gRPC layer.
+	stream.SetContext(ctx)
+
 	srv, knownService := s.services[service]
 	if knownService {
 		if md, ok := srv.methods[method]; ok {
-			s.processUnaryRPC(t, stream, srv, md, trInfo)
+			s.processUnaryRPC(ctx, stream, srv, md, ti)
 			return
 		}
 		if sd, ok := srv.streams[method]; ok {
-			s.processStreamingRPC(t, stream, srv, sd, trInfo)
+			s.processStreamingRPC(ctx, stream, srv, sd, ti)
 			return
 		}
 	}
 	// Unknown service, or known service with an unknown method.
 	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
-		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+		s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)
 		return
 	}
 	var errDesc string
@@ -1730,19 +1848,19 @@
 	} else {
 		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
 	}
-	if trInfo != nil {
-		trInfo.tr.LazyPrintf("%s", errDesc)
-		trInfo.tr.SetError()
+	if ti != nil {
+		ti.tr.LazyPrintf("%s", errDesc)
+		ti.tr.SetError()
 	}
-	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
-		if trInfo != nil {
-			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-			trInfo.tr.SetError()
+	if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
+		if ti != nil {
+			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
+			ti.tr.SetError()
 		}
-		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
 	}
-	if trInfo != nil {
-		trInfo.tr.Finish()
+	if ti != nil {
+		ti.tr.Finish()
 	}
 }
 
@@ -1797,62 +1915,71 @@
 // pending RPCs on the client side will get notified by connection
 // errors.
 func (s *Server) Stop() {
-	s.quit.Fire()
-
-	defer func() {
-		s.serveWG.Wait()
-		s.done.Fire()
-	}()
-
-	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
-
-	s.mu.Lock()
-	listeners := s.lis
-	s.lis = nil
-	conns := s.conns
-	s.conns = nil
-	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
-	s.cv.Broadcast()
-	s.mu.Unlock()
-
-	for lis := range listeners {
-		lis.Close()
-	}
-	for _, cs := range conns {
-		for st := range cs {
-			st.Close(errors.New("Server.Stop called"))
-		}
-	}
-	if s.opts.numServerWorkers > 0 {
-		s.stopServerWorkers()
-	}
-
-	s.mu.Lock()
-	if s.events != nil {
-		s.events.Finish()
-		s.events = nil
-	}
-	s.mu.Unlock()
+	s.stop(false)
 }
 
 // GracefulStop stops the gRPC server gracefully. It stops the server from
 // accepting new connections and RPCs and blocks until all the pending RPCs are
 // finished.
 func (s *Server) GracefulStop() {
+	s.stop(true)
+}
+
+func (s *Server) stop(graceful bool) {
 	s.quit.Fire()
 	defer s.done.Fire()
 
-	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
+	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
 	s.mu.Lock()
-	if s.conns == nil {
-		s.mu.Unlock()
-		return
+	s.closeListenersLocked()
+	// Wait for serving threads to be ready to exit.  Only then can we be sure no
+	// new conns will be created.
+	s.mu.Unlock()
+	s.serveWG.Wait()
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if graceful {
+		s.drainAllServerTransportsLocked()
+	} else {
+		s.closeServerTransportsLocked()
 	}
 
-	for lis := range s.lis {
-		lis.Close()
+	for len(s.conns) != 0 {
+		s.cv.Wait()
 	}
-	s.lis = nil
+	s.conns = nil
+
+	if s.opts.numServerWorkers > 0 {
+		// Closing the channel (only once, via sync.OnceFunc) after all the
+		// connections have been closed above ensures that there are no
+		// goroutines executing the callback passed to st.HandleStreams (where
+		// the channel is written to).
+		s.serverWorkerChannelClose()
+	}
+
+	if graceful || s.opts.waitForHandlers {
+		s.handlersWG.Wait()
+	}
+
+	if s.events != nil {
+		s.events.Finish()
+		s.events = nil
+	}
+}
+
+// s.mu must be held by the caller.
+func (s *Server) closeServerTransportsLocked() {
+	for _, conns := range s.conns {
+		for st := range conns {
+			st.Close(errors.New("Server.Stop called"))
+		}
+	}
+}
+
+// s.mu must be held by the caller.
+func (s *Server) drainAllServerTransportsLocked() {
 	if !s.drain {
 		for _, conns := range s.conns {
 			for st := range conns {
@@ -1861,22 +1988,14 @@
 		}
 		s.drain = true
 	}
+}
 
-	// Wait for serving threads to be ready to exit.  Only then can we be sure no
-	// new conns will be created.
-	s.mu.Unlock()
-	s.serveWG.Wait()
-	s.mu.Lock()
-
-	for len(s.conns) != 0 {
-		s.cv.Wait()
+// s.mu must be held by the caller.
+func (s *Server) closeListenersLocked() {
+	for lis := range s.lis {
+		lis.Close()
 	}
-	s.conns = nil
-	if s.events != nil {
-		s.events.Finish()
-		s.events = nil
-	}
-	s.mu.Unlock()
+	s.lis = nil
 }
 
 // contentSubtype must be lowercase
@@ -1886,15 +2005,54 @@
 		return s.opts.codec
 	}
 	if contentSubtype == "" {
-		return encoding.GetCodec(proto.Name)
+		return getCodec(proto.Name)
 	}
-	codec := encoding.GetCodec(contentSubtype)
+	codec := getCodec(contentSubtype)
 	if codec == nil {
-		return encoding.GetCodec(proto.Name)
+		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
+		return getCodec(proto.Name)
 	}
 	return codec
 }
 
+type serverKey struct{}
+
+// serverFromContext gets the Server from the context.
+func serverFromContext(ctx context.Context) *Server {
+	s, _ := ctx.Value(serverKey{}).(*Server)
+	return s
+}
+
+// contextWithServer sets the Server in the context.
+func contextWithServer(ctx context.Context, server *Server) context.Context {
+	return context.WithValue(ctx, serverKey{}, server)
+}
+
+// isRegisteredMethod returns whether the passed-in method is registered as a
+// method on the server. Both "/service/method" and "service/method" will match
+// if the service and method are registered on the server.
+func (s *Server) isRegisteredMethod(serviceMethod string) bool {
+	if serviceMethod != "" && serviceMethod[0] == '/' {
+		serviceMethod = serviceMethod[1:]
+	}
+	pos := strings.LastIndex(serviceMethod, "/")
+	if pos == -1 { // Invalid method name syntax.
+		return false
+	}
+	service := serviceMethod[:pos]
+	method := serviceMethod[pos+1:]
+	srv, knownService := s.services[service]
+	if knownService {
+		if _, ok := srv.methods[method]; ok {
+			return true
+		}
+		if _, ok := srv.streams[method]; ok {
+			return true
+		}
+	}
+	return false
+}
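
The parsing rules above can be exercised in isolation (editor's illustration; splitServiceMethod is a hypothetical standalone mirror of the logic, not gRPC API):

	package main

	import (
		"fmt"
		"strings"
	)

	// splitServiceMethod mirrors isRegisteredMethod's parsing: an optional
	// leading '/' is stripped, then the last '/' separates service from method.
	func splitServiceMethod(sm string) (service, method string, ok bool) {
		sm = strings.TrimPrefix(sm, "/")
		pos := strings.LastIndex(sm, "/")
		if pos == -1 { // invalid method name syntax
			return "", "", false
		}
		return sm[:pos], sm[pos+1:], true
	}

	func main() {
		for _, m := range []string{"/test.EchoService/Echo", "test.EchoService/Echo", "Echo"} {
			svc, mth, ok := splitServiceMethod(m)
			fmt.Println(m, "->", svc, mth, ok)
		}
	}
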
+
 // SetHeader sets the header metadata to be sent from the server to the client.
 // The context provided must be the context passed to the server's handler.
 //
@@ -1969,7 +2127,7 @@
 // Notice: This function is EXPERIMENTAL and may be changed or removed in a
 // later release.
 func SetSendCompressor(ctx context.Context, name string) error {
-	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
 	if !ok || stream == nil {
 		return fmt.Errorf("failed to fetch the stream from the given context")
 	}
@@ -1991,12 +2149,12 @@
 // Notice: This function is EXPERIMENTAL and may be changed or removed in a
 // later release.
 func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
-	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
 	if !ok || stream == nil {
 		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
 	}
 
-	return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
+	return stream.ClientAdvertisedCompressors(), nil
 }
 
 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
@@ -2026,17 +2184,9 @@
 	return s.Method(), true
 }
 
-type channelzServer struct {
-	s *Server
-}
-
-func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
-	return c.s.channelzMetric()
-}
-
 // validateSendCompressor returns an error when given compressor name cannot be
 // handled by the server or the client based on the advertised compressors.
-func validateSendCompressor(name, clientCompressors string) error {
+func validateSendCompressor(name string, clientCompressors []string) error {
 	if name == encoding.Identity {
 		return nil
 	}
@@ -2045,10 +2195,41 @@
 		return fmt.Errorf("compressor not registered %q", name)
 	}
 
-	for _, c := range strings.Split(clientCompressors, ",") {
+	for _, c := range clientCompressors {
 		if c == name {
 			return nil // found match
 		}
 	}
 	return fmt.Errorf("client does not support compressor %q", name)
 }
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+	n    atomic.Int64
+	wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+	if q.n.Add(-1) < 0 {
+		// We ran out of quota.  Block until a release happens.
+		<-q.wait
+	}
+}
+
+func (q *atomicSemaphore) release() {
+	// N.B. the "<= 0" check below should allow for this to work with multiple
+	// concurrent calls to acquire, but also note that with synchronous calls to
+	// acquire, as our system does, n will never be less than -1.  There are
+	// fairness issues (queuing) to consider if this was to be generalized.
+	if q.n.Add(1) <= 0 {
+		// An acquire was waiting on us.  Unblock it.
+		q.wait <- struct{}{}
+	}
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
+	a.n.Store(int64(n))
+	return a
+}
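
To make the acquire/release contract concrete, here is a self-contained toy (an editor's sketch mirroring atomicSemaphore, not part of this change) that bounds concurrency the way serveStreams bounds handlers: acquire is called from a single goroutine, while releases may happen concurrently:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	type semaphore struct {
		n    atomic.Int64
		wait chan struct{}
	}

	func newSemaphore(n int64) *semaphore {
		s := &semaphore{wait: make(chan struct{}, 1)}
		s.n.Store(n)
		return s
	}

	func (s *semaphore) acquire() {
		if s.n.Add(-1) < 0 {
			<-s.wait // out of quota: block until a release
		}
	}

	func (s *semaphore) release() {
		if s.n.Add(1) <= 0 {
			s.wait <- struct{}{} // unblock the single waiting acquirer
		}
	}

	func main() {
		quota := newSemaphore(2) // at most 2 workers in flight
		var wg sync.WaitGroup
		for i := 0; i < 5; i++ {
			quota.acquire() // synchronous: called from this goroutine only
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				defer quota.release() // releases may be concurrent
				fmt.Println("worker", i)
			}(i)
		}
		wg.Wait()
	}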