[VOL-5486] Fix deprecated versions

Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index bb99940..0a0af89 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -23,41 +23,56 @@
 	"errors"
 	"io"
 	"math"
+	rand "math/rand/v2"
 	"strconv"
 	"sync"
 	"time"
 
-	"golang.org/x/net/trace"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/encoding"
-	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancerload"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
-	"google.golang.org/grpc/internal/grpcrand"
+	"google.golang.org/grpc/internal/grpcutil"
+	imetadata "google.golang.org/grpc/internal/metadata"
+	iresolver "google.golang.org/grpc/internal/resolver"
+	"google.golang.org/grpc/internal/serviceconfig"
+	istatus "google.golang.org/grpc/internal/status"
 	"google.golang.org/grpc/internal/transport"
+	"google.golang.org/grpc/mem"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/peer"
 	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/status"
 )
 
+var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
+
 // StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC. If a StreamHandler returns an error, it
-// should be produced by the status package, or else gRPC will use
-// codes.Unknown as the status code and err.Error() as the status message
-// of the RPC.
-type StreamHandler func(srv interface{}, stream ServerStream) error
+// execution of a streaming RPC.
+//
+// If a StreamHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
+type StreamHandler func(srv any, stream ServerStream) error
 
-// StreamDesc represents a streaming RPC service's method specification.
+// StreamDesc represents a streaming RPC service's method specification.  Used
+// on the server when registering services and on the client when initiating
+// new streams.
 type StreamDesc struct {
-	StreamName string
-	Handler    StreamHandler
+	// StreamName and Handler are only used when registering handlers on a
+	// server.
+	StreamName string        // the name of the method excluding the service
+	Handler    StreamHandler // the handler called for the method
 
-	// At least one of these is true.
-	ServerStreams bool
-	ClientStreams bool
+	// ServerStreams and ClientStreams are used for registering handlers on a
+	// server as well as defining RPC behavior when passed to NewClientStream
+	// and ClientConn.NewStream.  At least one must be true.
+	ServerStreams bool // indicates the server can perform streaming sends
+	ClientStreams bool // indicates the client can perform streaming sends
 }
 
 // Stream defines the common interface a client or server stream has to satisfy.
@@ -67,9 +82,9 @@
 	// Deprecated: See ClientStream and ServerStream documentation instead.
 	Context() context.Context
 	// Deprecated: See ClientStream and ServerStream documentation instead.
-	SendMsg(m interface{}) error
+	SendMsg(m any) error
 	// Deprecated: See ClientStream and ServerStream documentation instead.
-	RecvMsg(m interface{}) error
+	RecvMsg(m any) error
 }
 
 // ClientStream defines the client-side behavior of a streaming RPC.
@@ -78,15 +93,17 @@
 // status package.
 type ClientStream interface {
 	// Header returns the header metadata received from the server if there
-	// is any. It blocks if the metadata is not ready to read.
+	// is any. It blocks if the metadata is not ready to read.  If the metadata
+	// is nil and the error is also nil, then the stream was terminated without
+	// headers, and the status can be discovered by calling RecvMsg.
 	Header() (metadata.MD, error)
 	// Trailer returns the trailer metadata from the server, if there is any.
 	// It must only be called after stream.CloseAndRecv has returned, or
 	// stream.Recv has returned a non-nil error (including io.EOF).
 	Trailer() metadata.MD
-	// CloseSend closes the send direction of the stream. It closes the stream
-	// when non-nil error is met. It is also not safe to call CloseSend
-	// concurrently with SendMsg.
+	// CloseSend closes the send direction of the stream. This method always
+	// returns a nil error. The status of the stream may be discovered using
+	// RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
 	CloseSend() error
 	// Context returns the context for this stream.
 	//
@@ -96,7 +113,9 @@
 	// SendMsg is generally called by generated code. On error, SendMsg aborts
 	// the stream. If the error was generated by the client, the status is
 	// returned directly; otherwise, io.EOF is returned and the status of
-	// the stream may be discovered using RecvMsg.
+	// the stream may be discovered using RecvMsg. For unary or server-streaming
+	// RPCs (StreamDesc.ClientStreams is false), a nil error is returned
+	// unconditionally.
 	//
 	// SendMsg blocks until:
 	//   - There is sufficient flow control to schedule m with the transport, or
@@ -111,7 +130,10 @@
 	// calling RecvMsg on the same stream at the same time, but it is not safe
 	// to call SendMsg on the same stream in different goroutines. It is also
 	// not safe to call CloseSend concurrently with SendMsg.
-	SendMsg(m interface{}) error
+	//
+	// It is not safe to modify the message after calling SendMsg. Tracing
+	// libraries and stats handlers may use the message lazily.
+	SendMsg(m any) error
 	// RecvMsg blocks until it receives a message into m or the stream is
 	// done. It returns io.EOF when the stream completes successfully. On
 	// any other error, the stream is aborted and the error contains the RPC
@@ -120,7 +142,7 @@
 	// It is safe to have a goroutine calling SendMsg and another goroutine
 	// calling RecvMsg on the same stream at the same time, but it is not
 	// safe to call RecvMsg on the same stream in different goroutines.
-	RecvMsg(m interface{}) error
+	RecvMsg(m any) error
 }
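// A minimal sketch (not from this patch) of driving a raw ClientStream for a
// server-streaming method, per the contract above: SendMsg the single request,
// CloseSend, then RecvMsg until io.EOF, which indicates an OK status. The
// method name and pb.Request/pb.Reply message types are hypothetical. Assumes
// imports: "context", "io", "google.golang.org/grpc".
func listReplies(ctx context.Context, cc *grpc.ClientConn, req *pb.Request) ([]*pb.Reply, error) {
	stream, err := cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, "/example.Echo/ListReplies")
	if err != nil {
		return nil, err
	}
	if err := stream.SendMsg(req); err != nil {
		return nil, err // client-generated errors carry a status; io.EOF defers to RecvMsg
	}
	if err := stream.CloseSend(); err != nil { // documented above to always return nil
		return nil, err
	}
	var replies []*pb.Reply
	for {
		reply := new(pb.Reply)
		if err := stream.RecvMsg(reply); err != nil {
			if err == io.EOF {
				return replies, nil // stream completed with an OK status
			}
			return nil, err // the RPC status error
		}
		replies = append(replies, reply)
	}
}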
 
 // NewStream creates a new Stream for the client side. This is typically
@@ -129,13 +151,13 @@
 // To ensure resources are not leaked due to the stream returned, one of the following
 // actions must be performed:
 //
-//      1. Call Close on the ClientConn.
-//      2. Cancel the context provided.
-//      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
-//         client-streaming RPC, for instance, might use the helper function
-//         CloseAndRecv (note that CloseSend does not Recv, therefore is not
-//         guaranteed to release all resources).
-//      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
+//  1. Call Close on the ClientConn.
+//  2. Cancel the context provided.
+//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
+//     client-streaming RPC, for instance, might use the helper function
+//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
+//     guaranteed to release all resources).
+//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
 //
 // If none of the above happen, a goroutine and a context will be leaked, and grpc
 // will not call the optionally-configured stats handler with a stats.End message.
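// A minimal sketch (not from this patch) of rule 2 above: cancelling the
// stream's context guarantees cleanup even if the caller abandons the stream
// before RecvMsg returns a non-nil error. The method name is hypothetical.
// Assumes imports: "context", "google.golang.org/grpc".
func newWatchStream(cc *grpc.ClientConn) (grpc.ClientStream, context.CancelFunc, error) {
	ctx, cancel := context.WithCancel(context.Background())
	stream, err := cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, "/example.Echo/Watch")
	if err != nil {
		cancel()
		return nil, nil, err
	}
	// The caller must either drain RecvMsg to a non-nil error or invoke the
	// returned cancel func; otherwise a goroutine and context are leaked.
	return stream, cancel, nil
}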
@@ -156,6 +178,30 @@
 }
 
 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+	// Start tracking the RPC for idleness purposes. This is where a stream is
+	// created for both streaming and unary RPCs, and hence is a good place to
+	// track active RPC count.
+	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
+		return nil, err
+	}
+	// Add a calloption, to decrement the active call count, that gets executed
+	// when the RPC completes.
+	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
+
+	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
+		// validate md
+		if err := imetadata.Validate(md); err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+		// validate added
+		for _, kvs := range added {
+			for i := 0; i < len(kvs); i += 2 {
+				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
+					return nil, status.Error(codes.Internal, err.Error())
+				}
+			}
+		}
+	}
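// A minimal sketch (not from this patch) of outgoing metadata that passes the
// validation above: keys are limited (roughly) to [0-9 a-z _ - .], values of
// non-"-bin" keys must be printable ASCII, and "-bin" keys may carry arbitrary
// bytes. Invalid pairs surface as codes.Internal errors, as shown above.
// Assumes imports: "context", "google.golang.org/grpc/metadata".
func withCallMetadata(ctx context.Context) context.Context {
	return metadata.AppendToOutgoingContext(ctx,
		"x-request-id", "req-12345",
		"checksum-bin", string([]byte{0xde, 0xad, 0xbe, 0xef}),
	)
}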
 	if channelz.IsOn() {
 		cc.incrCallsStarted()
 		defer func() {
@@ -164,15 +210,58 @@
 			}
 		}()
 	}
-	c := defaultCallInfo()
 	// Provide an opportunity for the first RPC to see the first service config
 	// provided by the resolver.
-	if err := cc.waitForResolvedAddrs(ctx); err != nil {
+	nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
+	if err != nil {
 		return nil, err
 	}
-	mc := cc.GetMethodConfig(method)
+
+	var mc serviceconfig.MethodConfig
+	var onCommit func()
+	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
+	}
+
+	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
+	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
+	if err != nil {
+		if st, ok := status.FromError(err); ok {
+			// Restrict the code to the list allowed by gRFC A54.
+			if istatus.IsRestrictedControlPlaneCode(st) {
+				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
+			}
+			return nil, err
+		}
+		return nil, toRPCErr(err)
+	}
+
+	if rpcConfig != nil {
+		if rpcConfig.Context != nil {
+			ctx = rpcConfig.Context
+		}
+		mc = rpcConfig.MethodConfig
+		onCommit = rpcConfig.OnCommitted
+		if rpcConfig.Interceptor != nil {
+			rpcInfo.Context = nil
+			ns := newStream
+			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
+				if err != nil {
+					return nil, toRPCErr(err)
+				}
+				return cs, nil
+			}
+		}
+	}
+
+	return newStream(ctx, func() {})
+}
+
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
+	callInfo := defaultCallInfo()
 	if mc.WaitForReady != nil {
-		c.failFast = !*mc.WaitForReady
+		callInfo.failFast = !*mc.WaitForReady
 	}
 
 	// Possible context leak:
@@ -193,106 +282,94 @@
 	}()
 
 	for _, o := range opts {
-		if err := o.before(c); err != nil {
+		if err := o.before(callInfo); err != nil {
 			return nil, toRPCErr(err)
 		}
 	}
-	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
-	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
-	if err := setCallInfoCodec(c); err != nil {
+	callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
+	callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+	if err := setCallInfoCodec(callInfo); err != nil {
 		return nil, err
 	}
 
 	callHdr := &transport.CallHdr{
 		Host:           cc.authority,
 		Method:         method,
-		ContentSubtype: c.contentSubtype,
+		ContentSubtype: callInfo.contentSubtype,
+		DoneFunc:       doneFunc,
+		Authority:      callInfo.authority,
 	}
 
 	// Set our outgoing compression according to the UseCompressor CallOption, if
 	// set.  In that case, also find the compressor from the encoding package.
 	// Otherwise, use the compressor configured by the WithCompressor DialOption,
 	// if set.
-	var cp Compressor
-	var comp encoding.Compressor
-	if ct := c.compressorType; ct != "" {
+	var compressorV0 Compressor
+	var compressorV1 encoding.Compressor
+	if ct := callInfo.compressorName; ct != "" {
 		callHdr.SendCompress = ct
 		if ct != encoding.Identity {
-			comp = encoding.GetCompressor(ct)
-			if comp == nil {
+			compressorV1 = encoding.GetCompressor(ct)
+			if compressorV1 == nil {
 				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
 			}
 		}
-	} else if cc.dopts.cp != nil {
-		callHdr.SendCompress = cc.dopts.cp.Type()
-		cp = cc.dopts.cp
+	} else if cc.dopts.compressorV0 != nil {
+		callHdr.SendCompress = cc.dopts.compressorV0.Type()
+		compressorV0 = cc.dopts.compressorV0
 	}
-	if c.creds != nil {
-		callHdr.Creds = c.creds
-	}
-	var trInfo *traceInfo
-	if EnableTracing {
-		trInfo = &traceInfo{
-			tr: trace.New("grpc.Sent."+methodFamily(method), method),
-			firstLine: firstLine{
-				client: true,
-			},
-		}
-		if deadline, ok := ctx.Deadline(); ok {
-			trInfo.firstLine.deadline = time.Until(deadline)
-		}
-		trInfo.tr.LazyLog(&trInfo.firstLine, false)
-		ctx = trace.NewContext(ctx, trInfo.tr)
-	}
-	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
-	sh := cc.dopts.copts.StatsHandler
-	var beginTime time.Time
-	if sh != nil {
-		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
-		beginTime = time.Now()
-		begin := &stats.Begin{
-			Client:    true,
-			BeginTime: beginTime,
-			FailFast:  c.failFast,
-		}
-		sh.HandleRPC(ctx, begin)
+	if callInfo.creds != nil {
+		callHdr.Creds = callInfo.creds
 	}
 
 	cs := &clientStream{
-		callHdr:      callHdr,
-		ctx:          ctx,
-		methodConfig: &mc,
-		opts:         opts,
-		callInfo:     c,
-		cc:           cc,
-		desc:         desc,
-		codec:        c.codec,
-		cp:           cp,
-		comp:         comp,
-		cancel:       cancel,
-		beginTime:    beginTime,
-		firstAttempt: true,
+		callHdr:             callHdr,
+		ctx:                 ctx,
+		methodConfig:        &mc,
+		opts:                opts,
+		callInfo:            callInfo,
+		cc:                  cc,
+		desc:                desc,
+		codec:               callInfo.codec,
+		compressorV0:        compressorV0,
+		compressorV1:        compressorV1,
+		cancel:              cancel,
+		firstAttempt:        true,
+		onCommit:            onCommit,
+		nameResolutionDelay: nameResolutionDelayed,
 	}
 	if !cc.dopts.disableRetry {
 		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
 	}
-	cs.binlog = binarylog.GetMethodLogger(method)
+	if ml := binarylog.GetMethodLogger(method); ml != nil {
+		cs.binlogs = append(cs.binlogs, ml)
+	}
+	if cc.dopts.binaryLogger != nil {
+		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
+			cs.binlogs = append(cs.binlogs, ml)
+		}
+	}
 
-	cs.callInfo.stream = cs
-	// Only this initial attempt has stats/tracing.
-	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
-	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
-		cs.finish(err)
+	// Pick the transport to use and create a new stream on the transport.
+	// Assign cs.attempt upon success.
+	op := func(a *csAttempt) error {
+		if err := a.getTransport(); err != nil {
+			return err
+		}
+		if err := a.newStream(); err != nil {
+			return err
+		}
+		// Because this operation is always called either here (while creating
+		// the clientStream) or by the retry code while locked when replaying
+		// the operation, it is safe to access cs.attempt directly.
+		cs.attempt = a
+		return nil
+	}
+	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
 		return nil, err
 	}
 
-	op := func(a *csAttempt) error { return a.newStream() }
-	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
-		cs.finish(err)
-		return nil, err
-	}
-
-	if cs.binlog != nil {
+	if len(cs.binlogs) != 0 {
 		md, _ := metadata.FromOutgoingContext(ctx)
 		logEntry := &binarylog.ClientHeader{
 			OnClientSide: true,
@@ -306,7 +383,9 @@
 				logEntry.Timeout = 0
 			}
 		}
-		cs.binlog.Log(logEntry)
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, logEntry)
+		}
 	}
 
 	if desc != unaryStreamDesc {
@@ -327,49 +406,130 @@
 	return cs, nil
 }
 
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
-	newAttempt := &csAttempt{
-		cs:           cs,
-		dc:           cs.cc.dopts.dc,
-		statsHandler: sh,
-		trInfo:       trInfo,
-	}
-	defer func() {
-		if retErr != nil {
-			// This attempt is not set in the clientStream, so it's finish won't
-			// be called. Call it here for stats and trace in case they are not
-			// nil.
-			newAttempt.finish(retErr)
-		}
-	}()
-
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
 	if err := cs.ctx.Err(); err != nil {
-		return toRPCErr(err)
+		return nil, toRPCErr(err)
 	}
-	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
+	if err := cs.cc.ctx.Err(); err != nil {
+		return nil, ErrClientConnClosing
+	}
+
+	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
+	method := cs.callHdr.Method
+	var beginTime time.Time
+	shs := cs.cc.dopts.copts.StatsHandlers
+	for _, sh := range shs {
+		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.nameResolutionDelay})
+		beginTime = time.Now()
+		begin := &stats.Begin{
+			Client:                    true,
+			BeginTime:                 beginTime,
+			FailFast:                  cs.callInfo.failFast,
+			IsClientStream:            cs.desc.ClientStreams,
+			IsServerStream:            cs.desc.ServerStreams,
+			IsTransparentRetryAttempt: isTransparent,
+		}
+		sh.HandleRPC(ctx, begin)
+	}
+
+	var trInfo *traceInfo
+	if EnableTracing {
+		trInfo = &traceInfo{
+			tr: newTrace("grpc.Sent."+methodFamily(method), method),
+			firstLine: firstLine{
+				client: true,
+			},
+		}
+		if deadline, ok := ctx.Deadline(); ok {
+			trInfo.firstLine.deadline = time.Until(deadline)
+		}
+		trInfo.tr.LazyLog(&trInfo.firstLine, false)
+		ctx = newTraceContext(ctx, trInfo.tr)
+	}
+
+	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
+		// Add extra metadata (metadata that will be added by transport) to context
+		// so the balancer can see them.
+		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
+			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+		))
+	}
+
+	return &csAttempt{
+		ctx:            ctx,
+		beginTime:      beginTime,
+		cs:             cs,
+		decompressorV0: cs.cc.dopts.dc,
+		statsHandlers:  shs,
+		trInfo:         trInfo,
+	}, nil
+}
+
+func (a *csAttempt) getTransport() error {
+	cs := a.cs
+
+	pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
+	pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
+	a.transport, a.pickResult = pick.transport, pick.result
 	if err != nil {
+		if de, ok := err.(dropError); ok {
+			err = de.error
+			a.drop = true
+		}
 		return err
 	}
-	if trInfo != nil {
-		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+	if a.trInfo != nil {
+		a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
 	}
-	newAttempt.t = t
-	newAttempt.done = done
-	cs.attempt = newAttempt
+	if pick.blocked {
+		for _, sh := range a.statsHandlers {
+			sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
+		}
+	}
 	return nil
 }
 
 func (a *csAttempt) newStream() error {
 	cs := a.cs
 	cs.callHdr.PreviousAttempts = cs.numRetries
-	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
-	if err != nil {
-		return toRPCErr(err)
+
+	// Merge metadata stored in PickResult, if any, with existing call metadata.
+	// It is safe to overwrite the csAttempt's context here, since all state
+	// maintained in it is local to the attempt. When the attempt has to be
+	// retried, a new instance of csAttempt will be created.
+	if a.pickResult.Metadata != nil {
+	// We currently do not have a function in the metadata package which
+		// merges given metadata with existing metadata in a context. Existing
+		// function `AppendToOutgoingContext()` takes a variadic argument of key
+		// value pairs.
+		//
+		// TODO: Make it possible to retrieve key value pairs from metadata.MD
+		// in a form passable to AppendToOutgoingContext(), or create a version
+		// of AppendToOutgoingContext() that accepts a metadata.MD.
+		md, _ := metadata.FromOutgoingContext(a.ctx)
+		md = metadata.Join(md, a.pickResult.Metadata)
+		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
 	}
-	cs.attempt.s = s
-	cs.attempt.p = &parser{r: s}
+
+	s, err := a.transport.NewStream(a.ctx, cs.callHdr)
+	if err != nil {
+		nse, ok := err.(*transport.NewStreamError)
+		if !ok {
+			// Unexpected.
+			return err
+		}
+
+		if nse.AllowTransparentRetry {
+			a.allowTransparentRetry = true
+		}
+
+		// Unwrap and convert error.
+		return toRPCErr(nse.Err)
+	}
+	a.transportStream = s
+	a.ctx = s.Context()
+	a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
 	return nil
 }
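// A minimal sketch (not from this patch) of the merge described in the TODO
// above, using only the public metadata API. Assumes imports: "context",
// "google.golang.org/grpc/metadata".
func mergeIntoOutgoing(ctx context.Context, extra metadata.MD) context.Context {
	md, _ := metadata.FromOutgoingContext(ctx) // a nil md is fine; Join skips it
	return metadata.NewOutgoingContext(ctx, metadata.Join(md, extra))
}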
 
@@ -381,14 +541,15 @@
 	cc       *ClientConn
 	desc     *StreamDesc
 
-	codec baseCodec
-	cp    Compressor
-	comp  encoding.Compressor
+	codec        baseCodec
+	compressorV0 Compressor
+	compressorV1 encoding.Compressor
 
 	cancel context.CancelFunc // cancels all attempts
 
-	sentLast  bool // sent an end stream
-	beginTime time.Time
+	sentLast bool // sent an end stream
+
+	receivedFirstMsg bool // set after the first message is received
 
 	methodConfig *MethodConfig
 
@@ -396,7 +557,7 @@
 
 	retryThrottler *retryThrottler // The throttler active when the RPC began.
 
-	binlog *binarylog.MethodLogger // Binary logger, can be nil.
+	binlogs []binarylog.MethodLogger
 	// serverHeaderBinlogged is a boolean for whether server header has been
 	// logged. Server header will be logged when the first time one of those
 	// happens: stream.Header(), stream.Recv().
@@ -419,24 +580,34 @@
 	// place where we need to check if the attempt is nil.
 	attempt *csAttempt
 	// TODO(hedging): hedging will have multiple attempts simultaneously.
-	committed  bool                       // active attempt committed for retry?
-	buffer     []func(a *csAttempt) error // operations to replay on retry
-	bufferSize int                        // current size of buffer
+	committed        bool // active attempt committed for retry?
+	onCommit         func()
+	replayBuffer     []replayOp // operations to replay on retry
+	replayBufferSize int        // current size of replayBuffer
+	// nameResolutionDelay indicates if there was a delay in the name resolution.
+	// This field is only valid on the client side; it is always false on the server side.
+	nameResolutionDelay bool
+}
+
+type replayOp struct {
+	op      func(a *csAttempt) error
+	cleanup func()
 }
 
 // csAttempt implements a single transport stream attempt within a
 // clientStream.
 type csAttempt struct {
-	cs   *clientStream
-	t    transport.ClientTransport
-	s    *transport.Stream
-	p    *parser
-	done func(balancer.DoneInfo)
+	ctx             context.Context
+	cs              *clientStream
+	transport       transport.ClientTransport
+	transportStream *transport.ClientStream
+	parser          *parser
+	pickResult      balancer.PickResult
 
-	finished  bool
-	dc        Decompressor
-	decomp    encoding.Compressor
-	decompSet bool
+	finished        bool
+	decompressorV0  Decompressor
+	decompressorV1  encoding.Compressor
+	decompressorSet bool
 
 	mu sync.Mutex // guards trInfo.tr
 	// trInfo may be nil (if EnableTracing is false).
@@ -444,12 +615,26 @@
 	// and cleared when the finish method is called.
 	trInfo *traceInfo
 
-	statsHandler stats.Handler
+	statsHandlers []stats.Handler
+	beginTime     time.Time
+
+	// set for newStream errors that may be transparently retried
+	allowTransparentRetry bool
+	// set for pick errors that are returned as a status
+	drop bool
 }
 
 func (cs *clientStream) commitAttemptLocked() {
+	if !cs.committed && cs.onCommit != nil {
+		cs.onCommit()
+	}
 	cs.committed = true
-	cs.buffer = nil
+	for _, op := range cs.replayBuffer {
+		if op.cleanup != nil {
+			op.cleanup()
+		}
+	}
+	cs.replayBuffer = nil
 }
 
 func (cs *clientStream) commitAttempt() {
@@ -459,76 +644,76 @@
 }
 
 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
-// the error that should be returned by the operation.
-func (cs *clientStream) shouldRetry(err error) error {
-	if cs.attempt.s == nil && !cs.callInfo.failFast {
-		// In the event of any error from NewStream (attempt.s == nil), we
-		// never attempted to write anything to the wire, so we can retry
-		// indefinitely for non-fail-fast RPCs.
-		return nil
+// the error that should be returned by the operation.  If the RPC should be
+// retried, the bool indicates whether it is being retried transparently.
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+	cs := a.cs
+
+	if cs.finished || cs.committed || a.drop {
+		// RPC is finished or committed or was dropped by the picker; cannot retry.
+		return false, err
 	}
-	if cs.finished || cs.committed {
-		// RPC is finished or committed; cannot retry.
-		return err
+	if a.transportStream == nil && a.allowTransparentRetry {
+		return true, nil
 	}
 	// Wait for the trailers.
-	if cs.attempt.s != nil {
-		<-cs.attempt.s.Done()
+	unprocessed := false
+	if a.transportStream != nil {
+		<-a.transportStream.Done()
+		unprocessed = a.transportStream.Unprocessed()
 	}
-	if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+	if cs.firstAttempt && unprocessed {
 		// First attempt, stream unprocessed: transparently retry.
-		cs.firstAttempt = false
-		return nil
+		return true, nil
 	}
-	cs.firstAttempt = false
 	if cs.cc.dopts.disableRetry {
-		return err
+		return false, err
 	}
 
 	pushback := 0
 	hasPushback := false
-	if cs.attempt.s != nil {
-		if !cs.attempt.s.TrailersOnly() {
-			return err
+	if a.transportStream != nil {
+		if !a.transportStream.TrailersOnly() {
+			return false, err
 		}
 
 		// TODO(retry): Move down if the spec changes to not check server pushback
 		// before considering this a failure for throttling.
-		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
 		if len(sps) == 1 {
 			var e error
 			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
-				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
+				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
 				cs.retryThrottler.throttle() // This counts as a failure for throttling.
-				return err
+				return false, err
 			}
 			hasPushback = true
 		} else if len(sps) > 1 {
-			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
+			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
 			cs.retryThrottler.throttle() // This counts as a failure for throttling.
-			return err
+			return false, err
 		}
 	}
 
 	var code codes.Code
-	if cs.attempt.s != nil {
-		code = cs.attempt.s.Status().Code()
+	if a.transportStream != nil {
+		code = a.transportStream.Status().Code()
 	} else {
-		code = status.Convert(err).Code()
+		code = status.Code(err)
 	}
 
-	rp := cs.methodConfig.retryPolicy
-	if rp == nil || !rp.retryableStatusCodes[code] {
-		return err
+	rp := cs.methodConfig.RetryPolicy
+	if rp == nil || !rp.RetryableStatusCodes[code] {
+		return false, err
 	}
 
 	// Note: the ordering here is important; we count this as a failure
 	// only if the code matched a retryable code.
 	if cs.retryThrottler.throttle() {
-		return err
+		return false, err
 	}
-	if cs.numRetries+1 >= rp.maxAttempts {
-		return err
+	if cs.numRetries+1 >= rp.MaxAttempts {
+		return false, err
 	}
 
 	var dur time.Duration
@@ -536,12 +721,11 @@
 		dur = time.Millisecond * time.Duration(pushback)
 		cs.numRetriesSincePushback = 0
 	} else {
-		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
-		cur := float64(rp.initialBackoff) * fact
-		if max := float64(rp.maxBackoff); cur > max {
-			cur = max
-		}
-		dur = time.Duration(grpcrand.Int63n(int64(cur)))
+		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
+		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
+		// Apply jitter by multiplying with a random factor between 0.8 and 1.2
+		cur *= 0.8 + 0.4*rand.Float64()
+		dur = time.Duration(int64(cur))
 		cs.numRetriesSincePushback++
 	}
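// A standalone sketch (not from this patch) of the backoff computed above:
// exponential growth capped at MaxBackoff, then jittered by a random factor in
// [0.8, 1.2), as in the math/rand/v2-based code above. Assumes imports:
// "math", rand "math/rand/v2", "time".
func retryBackoff(retriesSincePushback int, initial, maxBackoff time.Duration, multiplier float64) time.Duration {
	cur := min(float64(initial)*math.Pow(multiplier, float64(retriesSincePushback)), float64(maxBackoff))
	cur *= 0.8 + 0.4*rand.Float64() // jitter
	return time.Duration(int64(cur))
}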
 
@@ -551,25 +735,32 @@
 	select {
 	case <-t.C:
 		cs.numRetries++
-		return nil
+		return false, nil
 	case <-cs.ctx.Done():
 		t.Stop()
-		return status.FromContextError(cs.ctx.Err()).Err()
+		return false, status.FromContextError(cs.ctx.Err()).Err()
 	}
 }
 
 // Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
 	for {
-		cs.attempt.finish(lastErr)
-		if err := cs.shouldRetry(lastErr); err != nil {
+		attempt.finish(toRPCErr(lastErr))
+		isTransparent, err := attempt.shouldRetry(lastErr)
+		if err != nil {
 			cs.commitAttemptLocked()
 			return err
 		}
-		if err := cs.newAttemptLocked(nil, nil); err != nil {
+		cs.firstAttempt = false
+		attempt, err = cs.newAttemptLocked(isTransparent)
+		if err != nil {
+			// Only returns error if the clientconn is closed or the context of
+			// the stream is canceled.
 			return err
 		}
-		if lastErr = cs.replayBufferLocked(); lastErr == nil {
+		// Note that the first op in replayBuffer always sets cs.attempt
+		// if it is able to pick a transport and create a stream.
+		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
 			return nil
 		}
 	}
@@ -579,7 +770,10 @@
 	cs.commitAttempt()
 	// No need to lock before using attempt, since we know it is committed and
 	// cannot change.
-	return cs.attempt.s.Context()
+	if cs.attempt.transportStream != nil {
+		return cs.attempt.transportStream.Context()
+	}
+	return cs.ctx
 }
 
 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@@ -587,7 +781,23 @@
 	for {
 		if cs.committed {
 			cs.mu.Unlock()
-			return op(cs.attempt)
+			// toRPCErr is used in case the error from the attempt comes from
+			// NewClientStream, which intentionally doesn't return a status
+			// error to allow for further inspection; all other errors should
+			// already be status errors.
+			return toRPCErr(op(cs.attempt))
+		}
+		if len(cs.replayBuffer) == 0 {
+			// For the first op, which controls creation of the stream and
+			// assigns cs.attempt, we need to create a new attempt inline
+			// before executing the first op.  On subsequent ops, the attempt
+			// is created immediately before replaying the ops.
+			var err error
+			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
+				cs.mu.Unlock()
+				cs.finish(err)
+				return err
+			}
 		}
 		a := cs.attempt
 		cs.mu.Unlock()
@@ -598,14 +808,14 @@
 			continue
 		}
 		if err == io.EOF {
-			<-a.s.Done()
+			<-a.transportStream.Done()
 		}
-		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
+		if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
 			onSuccess()
 			cs.mu.Unlock()
 			return err
 		}
-		if err := cs.retryLocked(err); err != nil {
+		if err := cs.retryLocked(a, err); err != nil {
 			cs.mu.Unlock()
 			return err
 		}
@@ -616,15 +826,24 @@
 	var m metadata.MD
 	err := cs.withRetry(func(a *csAttempt) error {
 		var err error
-		m, err = a.s.Header()
+		m, err = a.transportStream.Header()
 		return toRPCErr(err)
 	}, cs.commitAttemptLocked)
+
+	if m == nil && err == nil {
+		// The stream ended with success.  Finish the clientStream.
+		err = io.EOF
+	}
+
 	if err != nil {
 		cs.finish(err)
-		return nil, err
+		// Do not return the error.  The user should get it by calling Recv().
+		return nil, nil
 	}
-	if cs.binlog != nil && !cs.serverHeaderBinlogged {
-		// Only log if binary log is on and header has not been logged.
+
+	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
+		// Only log if binary log is on and header has not been logged, and
+		// there are actually headers to log.
 		logEntry := &binarylog.ServerHeader{
 			OnClientSide: true,
 			Header:       m,
@@ -633,10 +852,13 @@
 		if peer, ok := peer.FromContext(cs.Context()); ok {
 			logEntry.PeerAddr = peer.Addr
 		}
-		cs.binlog.Log(logEntry)
 		cs.serverHeaderBinlogged = true
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, logEntry)
+		}
 	}
-	return m, err
+
+	return m, nil
 }
 
 func (cs *clientStream) Trailer() metadata.MD {
@@ -648,36 +870,36 @@
 	// directions -- it will prevent races and should not meaningfully impact
 	// performance.
 	cs.commitAttempt()
-	if cs.attempt.s == nil {
+	if cs.attempt.transportStream == nil {
 		return nil
 	}
-	return cs.attempt.s.Trailer()
+	return cs.attempt.transportStream.Trailer()
 }
 
-func (cs *clientStream) replayBufferLocked() error {
-	a := cs.attempt
-	for _, f := range cs.buffer {
-		if err := f(a); err != nil {
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
+	for _, f := range cs.replayBuffer {
+		if err := f.op(attempt); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
+func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
 	// Note: we still will buffer if retry is disabled (for transparent retries).
 	if cs.committed {
 		return
 	}
-	cs.bufferSize += sz
-	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
+	cs.replayBufferSize += sz
+	if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
 		cs.commitAttemptLocked()
+		cleanup()
 		return
 	}
-	cs.buffer = append(cs.buffer, op)
+	cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
 }
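// A minimal sketch (not from this patch) of bounding the replay buffer from the
// application side: the per-call option below feeds maxRetryRPCBufferSize, so
// messages beyond the cap commit the attempt and disable further retries, as in
// bufferForRetryLocked above. The method name is hypothetical. Assumes imports:
// "context", "google.golang.org/grpc".
func invokeWithSmallRetryBuffer(ctx context.Context, cc *grpc.ClientConn, req, reply any) error {
	return cc.Invoke(ctx, "/example.Echo/UnaryEcho", req, reply, grpc.MaxRetryRPCBufferSize(1<<20))
}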
 
-func (cs *clientStream) SendMsg(m interface{}) (err error) {
+func (cs *clientStream) SendMsg(m any) (err error) {
 	defer func() {
 		if err != nil && err != io.EOF {
 			// Call finish on the client stream for errors generated by this SendMsg
@@ -696,95 +918,114 @@
 	}
 
 	// load hdr, payload, data
-	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
+	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
 	if err != nil {
 		return err
 	}
 
+	defer func() {
+		data.Free()
+		// only free payload if compression was made, and therefore it is a different set
+		// of buffers from data.
+		if pf.isCompressed() {
+			payload.Free()
+		}
+	}()
+
+	dataLen := data.Len()
+	payloadLen := payload.Len()
 	// TODO(dfawley): should we be checking len(data) instead?
-	if len(payload) > *cs.callInfo.maxSendMessageSize {
-		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
+	if payloadLen > *cs.callInfo.maxSendMessageSize {
+		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
 	}
-	msgBytes := data // Store the pointer before setting to nil. For binary logging.
+
+	// always take an extra ref in case data == payload (i.e. when the data isn't
+	// compressed). The original ref will always be freed by the deferred free above.
+	payload.Ref()
 	op := func(a *csAttempt) error {
-		err := a.sendMsg(m, hdr, payload, data)
-		// nil out the message and uncomp when replaying; they are only needed for
-		// stats which is disabled for subsequent attempts.
-		m, data = nil, nil
-		return err
+		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
 	}
-	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
-	if cs.binlog != nil && err == nil {
-		cs.binlog.Log(&binarylog.ClientMessage{
+
+	// onSuccess is invoked when the op is captured for a subsequent retry. If the
+	// stream was established by a previous message and therefore retries are
+	// disabled, onSuccess will not be invoked, and the extra payload reference
+	// can be freed immediately.
+	onSuccessCalled := false
+	err = cs.withRetry(op, func() {
+		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
+		onSuccessCalled = true
+	})
+	if !onSuccessCalled {
+		payload.Free()
+	}
+	if len(cs.binlogs) != 0 && err == nil {
+		cm := &binarylog.ClientMessage{
 			OnClientSide: true,
-			Message:      msgBytes,
-		})
+			Message:      data.Materialize(),
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, cm)
+		}
 	}
-	return
+	return err
 }
 
-func (cs *clientStream) RecvMsg(m interface{}) error {
-	if cs.binlog != nil && !cs.serverHeaderBinlogged {
+func (cs *clientStream) RecvMsg(m any) error {
+	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
 		// Call Header() to binary log header if it's not already logged.
 		cs.Header()
 	}
 	var recvInfo *payloadInfo
-	if cs.binlog != nil {
+	if len(cs.binlogs) != 0 {
 		recvInfo = &payloadInfo{}
+		defer recvInfo.free()
 	}
 	err := cs.withRetry(func(a *csAttempt) error {
 		return a.recvMsg(m, recvInfo)
 	}, cs.commitAttemptLocked)
-	if cs.binlog != nil && err == nil {
-		cs.binlog.Log(&binarylog.ServerMessage{
+	if len(cs.binlogs) != 0 && err == nil {
+		sm := &binarylog.ServerMessage{
 			OnClientSide: true,
-			Message:      recvInfo.uncompressedBytes,
-		})
+			Message:      recvInfo.uncompressedBytes.Materialize(),
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, sm)
+		}
 	}
 	if err != nil || !cs.desc.ServerStreams {
 		// err != nil or non-server-streaming indicates end of stream.
 		cs.finish(err)
-
-		if cs.binlog != nil {
-			// finish will not log Trailer. Log Trailer here.
-			logEntry := &binarylog.ServerTrailer{
-				OnClientSide: true,
-				Trailer:      cs.Trailer(),
-				Err:          err,
-			}
-			if logEntry.Err == io.EOF {
-				logEntry.Err = nil
-			}
-			if peer, ok := peer.FromContext(cs.Context()); ok {
-				logEntry.PeerAddr = peer.Addr
-			}
-			cs.binlog.Log(logEntry)
-		}
 	}
 	return err
 }
 
 func (cs *clientStream) CloseSend() error {
 	if cs.sentLast {
-		// TODO: return an error and finish the stream instead, due to API misuse?
+		// Return a nil error on repeated calls to this method.
 		return nil
 	}
 	cs.sentLast = true
 	op := func(a *csAttempt) error {
-		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
+		a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
 		// Always return nil; io.EOF is the only error that might make sense
 		// instead, but there is no need to signal the client to call RecvMsg
 		// as the only use left for the stream after CloseSend is to call
 		// RecvMsg.  This also matches historical behavior.
 		return nil
 	}
-	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
-	if cs.binlog != nil {
-		cs.binlog.Log(&binarylog.ClientHalfClose{
+	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
+	if len(cs.binlogs) != 0 {
+		chc := &binarylog.ClientHalfClose{
 			OnClientSide: true,
-		})
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, chc)
+		}
 	}
-	// We never returned an error here for reasons.
+	// We don't return an error here as we expect users to read all messages
+	// from the stream and get the RPC status from RecvMsg().  Note that
+	// SendMsg() must return an error when one occurs so the application
+	// knows to stop sending messages, but that does not apply here.
 	return nil
 }
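// A minimal sketch (not from this patch) of the client-streaming pattern the
// comment above describes: stop sending on a SendMsg error, CloseSend (always
// nil), and read the real RPC status from RecvMsg. The method name and
// pb.Point/pb.Summary types are hypothetical. Assumes imports: "context",
// "google.golang.org/grpc".
func recordRoute(ctx context.Context, cc *grpc.ClientConn, points []*pb.Point) (*pb.Summary, error) {
	stream, err := cc.NewStream(ctx, &grpc.StreamDesc{ClientStreams: true}, "/example.Route/RecordRoute")
	if err != nil {
		return nil, err
	}
	for _, p := range points {
		if err := stream.SendMsg(p); err != nil {
			break // stop sending; the status is surfaced by RecvMsg below
		}
	}
	if err := stream.CloseSend(); err != nil { // always nil, per the comment above
		return nil, err
	}
	summary := new(pb.Summary)
	if err := stream.RecvMsg(summary); err != nil {
		return nil, err // the RPC status, including any send-side failure
	}
	return summary, nil
}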
 
@@ -799,17 +1040,44 @@
 		return
 	}
 	cs.finished = true
+	for _, onFinish := range cs.callInfo.onFinish {
+		onFinish(err)
+	}
 	cs.commitAttemptLocked()
+	if cs.attempt != nil {
+		cs.attempt.finish(err)
+		// after functions all rely upon having a stream.
+		if cs.attempt.transportStream != nil {
+			for _, o := range cs.opts {
+				o.after(cs.callInfo, cs.attempt)
+			}
+		}
+	}
+
 	cs.mu.Unlock()
-	// For binary logging. only log cancel in finish (could be caused by RPC ctx
-	// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
-	//
-	// Only one of cancel or trailer needs to be logged. In the cases where
-	// users don't call RecvMsg, users must have already canceled the RPC.
-	if cs.binlog != nil && status.Code(err) == codes.Canceled {
-		cs.binlog.Log(&binarylog.Cancel{
-			OnClientSide: true,
-		})
+	// Only one of cancel or trailer needs to be logged.
+	if len(cs.binlogs) != 0 {
+		switch err {
+		case errContextCanceled, errContextDeadline, ErrClientConnClosing:
+			c := &binarylog.Cancel{
+				OnClientSide: true,
+			}
+			for _, binlog := range cs.binlogs {
+				binlog.Log(cs.ctx, c)
+			}
+		default:
+			logEntry := &binarylog.ServerTrailer{
+				OnClientSide: true,
+				Trailer:      cs.Trailer(),
+				Err:          err,
+			}
+			if peer, ok := peer.FromContext(cs.Context()); ok {
+				logEntry.PeerAddr = peer.Addr
+			}
+			for _, binlog := range cs.binlogs {
+				binlog.Log(cs.ctx, logEntry)
+			}
+		}
 	}
 	if err == nil {
 		cs.retryThrottler.successfulRPC()
@@ -821,19 +1089,10 @@
 			cs.cc.incrCallsSucceeded()
 		}
 	}
-	if cs.attempt != nil {
-		cs.attempt.finish(err)
-		// after functions all rely upon having a stream.
-		if cs.attempt.s != nil {
-			for _, o := range cs.opts {
-				o.after(cs.callInfo)
-			}
-		}
-	}
 	cs.cancel()
 }
 
-func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
+func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
 	cs := a.cs
 	if a.trInfo != nil {
 		a.mu.Lock()
@@ -842,7 +1101,7 @@
 		}
 		a.mu.Unlock()
 	}
-	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
+	if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
 		if !cs.desc.ClientStreams {
 			// For non-client-streaming RPCs, we return nil instead of EOF on error
 			// because the generated code requires it.  finish is not called; RecvMsg()
@@ -851,47 +1110,52 @@
 		}
 		return io.EOF
 	}
-	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
-	}
-	if channelz.IsOn() {
-		a.t.IncrMsgSent()
+	if len(a.statsHandlers) != 0 {
+		for _, sh := range a.statsHandlers {
+			sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
+		}
 	}
 	return nil
 }
 
-func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
+func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
 	cs := a.cs
-	if a.statsHandler != nil && payInfo == nil {
+	if len(a.statsHandlers) != 0 && payInfo == nil {
 		payInfo = &payloadInfo{}
+		defer payInfo.free()
 	}
 
-	if !a.decompSet {
+	if !a.decompressorSet {
 		// Block until we receive headers containing received message encoding.
-		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
-			if a.dc == nil || a.dc.Type() != ct {
+		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
+			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
 				// No configured decompressor, or it does not match the incoming
 				// message encoding; attempt to find a registered compressor that does.
-				a.dc = nil
-				a.decomp = encoding.GetCompressor(ct)
+				a.decompressorV0 = nil
+				a.decompressorV1 = encoding.GetCompressor(ct)
 			}
 		} else {
 			// No compression is used; disable our decompressor.
-			a.dc = nil
+			a.decompressorV0 = nil
 		}
 		// Only initialize this state once per stream.
-		a.decompSet = true
+		a.decompressorSet = true
 	}
-	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
-	if err != nil {
+	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
 		if err == io.EOF {
-			if statusErr := a.s.Status().Err(); statusErr != nil {
+			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
 				return statusErr
 			}
+			// Received no msg and status OK for non-server streaming rpcs.
+			if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
+				return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
+			}
 			return io.EOF // indicates successful end of stream.
 		}
+
 		return toRPCErr(err)
 	}
+	cs.receivedFirstMsg = true
 	if a.trInfo != nil {
 		a.mu.Lock()
 		if a.trInfo.tr != nil {
@@ -899,34 +1163,28 @@
 		}
 		a.mu.Unlock()
 	}
-	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
-			Client:   true,
-			RecvTime: time.Now(),
-			Payload:  m,
-			// TODO truncate large payload.
-			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength,
-			Length:     len(payInfo.uncompressedBytes),
+	for _, sh := range a.statsHandlers {
+		sh.HandleRPC(a.ctx, &stats.InPayload{
+			Client:           true,
+			RecvTime:         time.Now(),
+			Payload:          m,
+			WireLength:       payInfo.compressedLength + headerLen,
+			CompressedLength: payInfo.compressedLength,
+			Length:           payInfo.uncompressedBytes.Len(),
 		})
 	}
-	if channelz.IsOn() {
-		a.t.IncrMsgRecv()
-	}
 	if cs.desc.ServerStreams {
 		// Subsequent messages should be received by subsequent RecvMsg calls.
 		return nil
 	}
 	// Special handling for non-server-stream rpcs.
 	// This recv expects EOF or errors, so we don't collect inPayload.
-	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
-	if err == nil {
-		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
+	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
+		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
+	} else if err != nil {
+		return toRPCErr(err)
 	}
-	if err == io.EOF {
-		return a.s.Status().Err() // non-server streaming Recv returns nil on success
-	}
-	return toRPCErr(err)
+	return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
 }
 
 func (a *csAttempt) finish(err error) {
@@ -941,33 +1199,33 @@
 		err = nil
 	}
 	var tr metadata.MD
-	if a.s != nil {
-		a.t.CloseStream(a.s, err)
-		tr = a.s.Trailer()
+	if a.transportStream != nil {
+		a.transportStream.Close(err)
+		tr = a.transportStream.Trailer()
 	}
 
-	if a.done != nil {
+	if a.pickResult.Done != nil {
 		br := false
-		if a.s != nil {
-			br = a.s.BytesReceived()
+		if a.transportStream != nil {
+			br = a.transportStream.BytesReceived()
 		}
-		a.done(balancer.DoneInfo{
+		a.pickResult.Done(balancer.DoneInfo{
 			Err:           err,
 			Trailer:       tr,
-			BytesSent:     a.s != nil,
+			BytesSent:     a.transportStream != nil,
 			BytesReceived: br,
 			ServerLoad:    balancerload.Parse(tr),
 		})
 	}
-	if a.statsHandler != nil {
+	for _, sh := range a.statsHandlers {
 		end := &stats.End{
 			Client:    true,
-			BeginTime: a.cs.beginTime,
+			BeginTime: a.beginTime,
 			EndTime:   time.Now(),
 			Trailer:   tr,
 			Error:     err,
 		}
-		a.statsHandler.HandleRPC(a.cs.ctx, end)
+		sh.HandleRPC(a.ctx, end)
 	}
 	if a.trInfo != nil && a.trInfo.tr != nil {
 		if err == nil {
@@ -982,12 +1240,12 @@
 	a.mu.Unlock()
 }
 
-// newClientStream creates a ClientStream with the specified transport, on the
+// newNonRetryClientStream creates a ClientStream with the specified transport, on the
 // given addrConn.
 //
 // It's expected that the given transport is either the same one in addrConn, or
 // is already closed. To avoid race, transport is specified separately, instead
-// of using ac.transpot.
+// of using ac.transport.
 //
 // Main difference between this and ClientConn.NewStream:
 // - no retry
@@ -1036,7 +1294,7 @@
 	// if set.
 	var cp Compressor
 	var comp encoding.Compressor
-	if ct := c.compressorType; ct != "" {
+	if ct := c.compressorName; ct != "" {
 		callHdr.SendCompress = ct
 		if ct != encoding.Identity {
 			comp = encoding.GetCompressor(ct)
@@ -1044,9 +1302,9 @@
 				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
 			}
 		}
-	} else if ac.cc.dopts.cp != nil {
-		callHdr.SendCompress = ac.cc.dopts.cp.Type()
-		cp = ac.cc.dopts.cp
+	} else if ac.cc.dopts.compressorV0 != nil {
+		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
+		cp = ac.cc.dopts.compressorV0
 	}
 	if c.creds != nil {
 		callHdr.Creds = c.creds
@@ -1054,37 +1312,41 @@
 
 	// Use a special addrConnStream to avoid retry.
 	as := &addrConnStream{
-		callHdr:  callHdr,
-		ac:       ac,
-		ctx:      ctx,
-		cancel:   cancel,
-		opts:     opts,
-		callInfo: c,
-		desc:     desc,
-		codec:    c.codec,
-		cp:       cp,
-		comp:     comp,
-		t:        t,
+		callHdr:          callHdr,
+		ac:               ac,
+		ctx:              ctx,
+		cancel:           cancel,
+		opts:             opts,
+		callInfo:         c,
+		desc:             desc,
+		codec:            c.codec,
+		sendCompressorV0: cp,
+		sendCompressorV1: comp,
+		transport:        t,
 	}
 
-	as.callInfo.stream = as
-	s, err := as.t.NewStream(as.ctx, as.callHdr)
+	s, err := as.transport.NewStream(as.ctx, as.callHdr)
 	if err != nil {
 		err = toRPCErr(err)
 		return nil, err
 	}
-	as.s = s
-	as.p = &parser{r: s}
+	as.transportStream = s
+	as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
 	ac.incrCallsStarted()
 	if desc != unaryStreamDesc {
-		// Listen on cc and stream contexts to cleanup when the user closes the
-		// ClientConn or cancels the stream context.  In all other cases, an error
-		// should already be injected into the recv buffer by the transport, which
-		// the client will eventually receive, and then we will cancel the stream's
-		// context in clientStream.finish.
+		// Listen on stream context to cleanup when the stream context is
+		// canceled.  Also listen for the addrConn's context in case the
+		// addrConn is closed or reconnects to a different address.  In all
+		// other cases, an error should already be injected into the recv
+		// buffer by the transport, which the client will eventually receive,
+		// and then we will cancel the stream's context in
+		// addrConnStream.finish.
 		go func() {
+			ac.mu.Lock()
+			acCtx := ac.ctx
+			ac.mu.Unlock()
 			select {
-			case <-ac.ctx.Done():
+			case <-acCtx.Done():
 				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
 			case <-ctx.Done():
 				as.finish(toRPCErr(ctx.Err()))
@@ -1095,29 +1357,32 @@
 }
 
 type addrConnStream struct {
-	s         *transport.Stream
-	ac        *addrConn
-	callHdr   *transport.CallHdr
-	cancel    context.CancelFunc
-	opts      []CallOption
-	callInfo  *callInfo
-	t         transport.ClientTransport
-	ctx       context.Context
-	sentLast  bool
-	desc      *StreamDesc
-	codec     baseCodec
-	cp        Compressor
-	comp      encoding.Compressor
-	decompSet bool
-	dc        Decompressor
-	decomp    encoding.Compressor
-	p         *parser
-	mu        sync.Mutex
-	finished  bool
+	transportStream  *transport.ClientStream
+	ac               *addrConn
+	callHdr          *transport.CallHdr
+	cancel           context.CancelFunc
+	opts             []CallOption
+	callInfo         *callInfo
+	transport        transport.ClientTransport
+	ctx              context.Context
+	sentLast         bool
+	receivedFirstMsg bool
+	desc             *StreamDesc
+	codec            baseCodec
+	sendCompressorV0 Compressor
+	sendCompressorV1 encoding.Compressor
+	decompressorSet  bool
+	decompressorV0   Decompressor
+	decompressorV1   encoding.Compressor
+	parser           *parser
+
+	// mu guards finished and is held for the entire finish method.
+	mu       sync.Mutex
+	finished bool
 }
 
 func (as *addrConnStream) Header() (metadata.MD, error) {
-	m, err := as.s.Header()
+	m, err := as.transportStream.Header()
 	if err != nil {
 		as.finish(toRPCErr(err))
 	}
@@ -1125,17 +1390,17 @@
 }
 
 func (as *addrConnStream) Trailer() metadata.MD {
-	return as.s.Trailer()
+	return as.transportStream.Trailer()
 }
 
 func (as *addrConnStream) CloseSend() error {
 	if as.sentLast {
-		// TODO: return an error and finish the stream instead, due to API misuse?
+		// Return a nil error on repeated calls to this method.
 		return nil
 	}
 	as.sentLast = true
 
-	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
+	as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
 	// Always return nil; io.EOF is the only error that might make sense
 	// instead, but there is no need to signal the client to call RecvMsg
 	// as the only use left for the stream after CloseSend is to call
@@ -1144,10 +1409,10 @@
 }
 
 func (as *addrConnStream) Context() context.Context {
-	return as.s.Context()
+	return as.transportStream.Context()
 }
 
-func (as *addrConnStream) SendMsg(m interface{}) (err error) {
+func (as *addrConnStream) SendMsg(m any) (err error) {
 	defer func() {
 		if err != nil && err != io.EOF {
 			// Call finish on the client stream for errors generated by this SendMsg
@@ -1166,17 +1431,26 @@
 	}
 
 	// load hdr, payload, data
-	hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
+	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
 	if err != nil {
 		return err
 	}
 
+	defer func() {
+		data.Free()
+		// only free payload if compression was applied, and therefore it is a different set
+		// of buffers from data.
+		if pf.isCompressed() {
+			payload.Free()
+		}
+	}()
+
 	// TODO(dfawley): should we be checking len(data) instead?
-	if len(payld) > *as.callInfo.maxSendMessageSize {
-		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
+	if payload.Len() > *as.callInfo.maxSendMessageSize {
+		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
 	}
 
-	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
+	if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
 		if !as.desc.ClientStreams {
 			// For non-client-streaming RPCs, we return nil instead of EOF on error
 			// because the generated code requires it.  finish is not called; RecvMsg()
@@ -1186,13 +1460,10 @@
 		return io.EOF
 	}
 
-	if channelz.IsOn() {
-		as.t.IncrMsgSent()
-	}
 	return nil
 }
 
-func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
+func (as *addrConnStream) RecvMsg(m any) (err error) {
 	defer func() {
 		if err != nil || !as.desc.ServerStreams {
 			// err != nil or non-server-streaming indicates end of stream.
@@ -1200,36 +1471,37 @@
 		}
 	}()
 
-	if !as.decompSet {
+	if !as.decompressorSet {
 		// Block until we receive headers containing received message encoding.
-		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
-			if as.dc == nil || as.dc.Type() != ct {
+		if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
+			if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
 				// No configured decompressor, or it does not match the incoming
 				// message encoding; attempt to find a registered compressor that does.
-				as.dc = nil
-				as.decomp = encoding.GetCompressor(ct)
+				as.decompressorV0 = nil
+				as.decompressorV1 = encoding.GetCompressor(ct)
 			}
 		} else {
 			// No compression is used; disable our decompressor.
-			as.dc = nil
+			as.decompressorV0 = nil
 		}
 		// Only initialize this state once per stream.
-		as.decompSet = true
+		as.decompressorSet = true
 	}
-	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
-	if err != nil {
+	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
 		if err == io.EOF {
-			if statusErr := as.s.Status().Err(); statusErr != nil {
+			if statusErr := as.transportStream.Status().Err(); statusErr != nil {
 				return statusErr
 			}
+			// An OK status with no received message is a cardinality violation
+			// for non-server-streaming RPCs.
+			if !as.desc.ServerStreams && !as.receivedFirstMsg {
+				return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
+			}
 			return io.EOF // indicates successful end of stream.
 		}
 		return toRPCErr(err)
 	}
+	as.receivedFirstMsg = true
 
-	if channelz.IsOn() {
-		as.t.IncrMsgRecv()
-	}
 	if as.desc.ServerStreams {
 		// Subsequent messages should be received by subsequent RecvMsg calls.
 		return nil
@@ -1237,14 +1509,12 @@
 
 	// Special handling for non-server-streaming RPCs.
 	// This recv expects EOF or errors, so we don't collect inPayload.
-	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
-	if err == nil {
-		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
+	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
+		return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
+	} else if err != nil {
+		return toRPCErr(err)
 	}
-	if err == io.EOF {
-		return as.s.Status().Err() // non-server streaming Recv returns nil on success
-	}
-	return toRPCErr(err)
+	return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non-server-streaming RPCs, but received another message")
 }
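
A hedged note on the new cardinality checks in RecvMsg above: when a non-server-streaming RPC ends with an OK status but zero response messages, or yields a second message, the caller sees a status with codes.Internal. A minimal client-side sketch for classifying such errors (other internal errors share the code, so the message text is what distinguishes them); the helper name is illustrative:

package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isInternalStatus reports whether err carries the Internal code that the
// cardinality-violation paths above produce.
func isInternalStatus(err error) bool {
	st, ok := status.FromError(err)
	return ok && st.Code() == codes.Internal
}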
 
 func (as *addrConnStream) finish(err error) {
@@ -1258,8 +1528,8 @@
 		// Ending a stream with EOF indicates a success.
 		err = nil
 	}
-	if as.s != nil {
-		as.t.CloseStream(as.s, err)
+	if as.transportStream != nil {
+		as.transportStream.Close(err)
 	}
 
 	if err != nil {
@@ -1273,8 +1543,10 @@
 
 // ServerStream defines the server-side behavior of a streaming RPC.
 //
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package.  However, the status code will often not match the RPC status as
+// seen by the client application, and therefore should not be relied upon for
+// this purpose.
 type ServerStream interface {
 	// SetHeader sets the header metadata. It may be called multiple times.
 	// When called multiple times, all the provided metadata will be merged.
@@ -1306,7 +1578,10 @@
 	// It is safe to have a goroutine calling SendMsg and another goroutine
 	// calling RecvMsg on the same stream at the same time, but it is not safe
 	// to call SendMsg on the same stream in different goroutines.
-	SendMsg(m interface{}) error
+	//
+	// It is not safe to modify the message after calling SendMsg. Tracing
+	// libraries and stats handlers may use the message lazily.
+	SendMsg(m any) error
 	// RecvMsg blocks until it receives a message into m or the stream is
 	// done. It returns io.EOF when the client has performed a CloseSend. On
 	// any non-EOF error, the stream is aborted and the error contains the
@@ -1315,29 +1590,33 @@
 	// It is safe to have a goroutine calling SendMsg and another goroutine
 	// calling RecvMsg on the same stream at the same time, but it is not
 	// safe to call RecvMsg on the same stream in different goroutines.
-	RecvMsg(m interface{}) error
+	RecvMsg(m any) error
 }
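
To make the documented contract concrete, here is a hedged sketch of a bidirectional handler written directly against ServerStream: it receives until io.EOF and does not modify a message after handing it to SendMsg. anypb.Any is only a stand-in message type for illustration.

package example

import (
	"io"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/anypb"
)

// echoStream echoes every received message back to the client and returns a
// nil (OK) status once the client half-closes the stream.
func echoStream(stream grpc.ServerStream) error {
	for {
		msg := new(anypb.Any) // stand-in request/response type
		if err := stream.RecvMsg(msg); err == io.EOF {
			return nil // client called CloseSend
		} else if err != nil {
			return err
		}
		if err := stream.SendMsg(msg); err != nil {
			return err
		}
	}
}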
 
 // serverStream implements a server side Stream.
 type serverStream struct {
 	ctx   context.Context
-	t     transport.ServerTransport
-	s     *transport.Stream
+	s     *transport.ServerStream
 	p     *parser
 	codec baseCodec
+	desc  *StreamDesc
 
-	cp     Compressor
-	dc     Decompressor
-	comp   encoding.Compressor
-	decomp encoding.Compressor
+	compressorV0   Compressor
+	compressorV1   encoding.Compressor
+	decompressorV0 Decompressor
+	decompressorV1 encoding.Compressor
+
+	sendCompressorName string
+
+	recvFirstMsg bool // set after the first message is received
 
 	maxReceiveMessageSize int
 	maxSendMessageSize    int
 	trInfo                *traceInfo
 
-	statsHandler stats.Handler
+	statsHandler []stats.Handler
 
-	binlog *binarylog.MethodLogger
+	binlogs []binarylog.MethodLogger
 	// serverHeaderBinlogged indicates whether the server header has been logged.
 	// It is set when either of the following happens: stream.SendHeader() or
 	// stream.Send().
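
The statsHandler and binlogs fields above are now slices because a server may carry more than one stats handler and method logger, and each stream event is fanned out to all of them. A hedged sketch of the registration side, assuming only the public StatsHandler server option; the constructor name is illustrative:

package example

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/stats"
)

// newServerWithStats attaches every provided stats.Handler; SendMsg and
// RecvMsg then deliver payload events to each of them.
func newServerWithStats(handlers ...stats.Handler) *grpc.Server {
	opts := make([]grpc.ServerOption, 0, len(handlers))
	for _, h := range handlers {
		opts = append(opts, grpc.StatsHandler(h))
	}
	return grpc.NewServer(opts...)
}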
@@ -1357,17 +1636,29 @@
 	if md.Len() == 0 {
 		return nil
 	}
+	err := imetadata.Validate(md)
+	if err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
 	return ss.s.SetHeader(md)
 }
 
 func (ss *serverStream) SendHeader(md metadata.MD) error {
-	err := ss.t.WriteHeader(ss.s, md)
-	if ss.binlog != nil && !ss.serverHeaderBinlogged {
+	err := imetadata.Validate(md)
+	if err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
+
+	err = ss.s.SendHeader(md)
+	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
 		h, _ := ss.s.Header()
-		ss.binlog.Log(&binarylog.ServerHeader{
+		sh := &binarylog.ServerHeader{
 			Header: h,
-		})
+		}
 		ss.serverHeaderBinlogged = true
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, sh)
+		}
 	}
 	return err
 }
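
Since SetHeader and SendHeader now validate metadata via imetadata.Validate, invalid keys or values are reported back to the handler as a codes.Internal error instead of reaching the wire. A minimal hedged sketch from the handler's point of view; "x-request-id" is an example-only header name:

package example

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// sendHeaderExample sends well-formed header metadata; a malformed key or
// value would instead surface here as a codes.Internal error.
func sendHeaderExample(ss grpc.ServerStream) error {
	md := metadata.Pairs("x-request-id", "abc-123")
	return ss.SendHeader(md)
}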
@@ -1376,10 +1667,13 @@
 	if md.Len() == 0 {
 		return
 	}
+	if err := imetadata.Validate(md); err != nil {
+		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
+	}
 	ss.s.SetTrailer(md)
 }
 
-func (ss *serverStream) SendMsg(m interface{}) (err error) {
+func (ss *serverStream) SendMsg(m any) (err error) {
 	defer func() {
 		if ss.trInfo != nil {
 			ss.mu.Lock()
@@ -1387,7 +1681,7 @@
 				if err == nil {
 					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
 				} else {
-					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
 					ss.trInfo.tr.SetError()
 				}
 			}
@@ -1395,7 +1689,7 @@
 		}
 		if err != nil && err != io.EOF {
 			st, _ := status.FromError(toRPCErr(err))
-			ss.t.WriteStatus(ss.s, st)
+			ss.s.WriteStatus(st)
 			// Non-user specified status was sent out. This should be an error
 			// case (as a server side Cancel maybe).
 			//
@@ -1403,43 +1697,68 @@
 			// status from the service handler, we will log that error instead.
 			// This behavior is similar to an interceptor.
 		}
-		if channelz.IsOn() && err == nil {
-			ss.t.IncrMsgSent()
-		}
 	}()
 
+	// The server handler could have set a new compressor by calling
+	// SetSendCompressor. If so, use it to compress the outbound message.
+	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
+		ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
+		ss.sendCompressorName = sendCompressorsName
+	}
+
 	// load hdr, payload, data
-	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
+	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
 	if err != nil {
 		return err
 	}
 
+	defer func() {
+		data.Free()
+		// Only free the payload if compression was applied, since only then is it
+		// a different set of buffers from data.
+		if pf.isCompressed() {
+			payload.Free()
+		}
+	}()
+
+	dataLen := data.Len()
+	payloadLen := payload.Len()
+
 	// TODO(dfawley): should we be checking len(data) instead?
-	if len(payload) > ss.maxSendMessageSize {
-		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
+	if payloadLen > ss.maxSendMessageSize {
+		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
 	}
-	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
+	if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
 		return toRPCErr(err)
 	}
-	if ss.binlog != nil {
+
+	if len(ss.binlogs) != 0 {
 		if !ss.serverHeaderBinlogged {
 			h, _ := ss.s.Header()
-			ss.binlog.Log(&binarylog.ServerHeader{
+			sh := &binarylog.ServerHeader{
 				Header: h,
-			})
+			}
 			ss.serverHeaderBinlogged = true
+			for _, binlog := range ss.binlogs {
+				binlog.Log(ss.ctx, sh)
+			}
 		}
-		ss.binlog.Log(&binarylog.ServerMessage{
-			Message: data,
-		})
+		sm := &binarylog.ServerMessage{
+			Message: data.Materialize(),
+		}
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, sm)
+		}
 	}
-	if ss.statsHandler != nil {
-		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+	if len(ss.statsHandler) != 0 {
+		for _, sh := range ss.statsHandler {
+			sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
+		}
 	}
 	return nil
 }
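
SendMsg now re-reads the negotiated send compressor before every message, so a handler can switch compressors mid-stream. A hedged usage sketch built on the experimental grpc.SetSendCompressor API and the registered "gzip" compressor; the function name is illustrative:

package example

import (
	"context"

	"google.golang.org/grpc"
	_ "google.golang.org/grpc/encoding/gzip" // registers the "gzip" compressor
)

// useGzipForReplies asks gRPC to compress subsequent outbound messages on this
// stream with gzip; serverStream.SendMsg picks the change up via SendCompress().
func useGzipForReplies(ctx context.Context) error {
	return grpc.SetSendCompressor(ctx, "gzip")
}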
 
-func (ss *serverStream) RecvMsg(m interface{}) (err error) {
+func (ss *serverStream) RecvMsg(m any) (err error) {
 	defer func() {
 		if ss.trInfo != nil {
 			ss.mu.Lock()
@@ -1447,7 +1766,7 @@
 				if err == nil {
 					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
 				} else if err != io.EOF {
-					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
 					ss.trInfo.tr.SetError()
 				}
 			}
@@ -1455,7 +1774,7 @@
 		}
 		if err != nil && err != io.EOF {
 			st, _ := status.FromError(toRPCErr(err))
-			ss.t.WriteStatus(ss.s, st)
+			ss.s.WriteStatus(st)
 			// Non-user specified status was sent out. This should be an error
 			// case (as a server side Cancel maybe).
 			//
@@ -1463,42 +1782,64 @@
 			// status from the service handler, we will log that error instead.
 			// This behavior is similar to an interceptor.
 		}
-		if channelz.IsOn() && err == nil {
-			ss.t.IncrMsgRecv()
-		}
 	}()
 	var payInfo *payloadInfo
-	if ss.statsHandler != nil || ss.binlog != nil {
+	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
 		payInfo = &payloadInfo{}
+		defer payInfo.free()
 	}
-	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
+	if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
 		if err == io.EOF {
-			if ss.binlog != nil {
-				ss.binlog.Log(&binarylog.ClientHalfClose{})
+			if len(ss.binlogs) != 0 {
+				chc := &binarylog.ClientHalfClose{}
+				for _, binlog := range ss.binlogs {
+					binlog.Log(ss.ctx, chc)
+				}
+			}
+			// Receiving no request message is a cardinality violation for
+			// non-client-streaming RPCs.
+			if !ss.desc.ClientStreams && !ss.recvFirstMsg {
+				return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
 			}
 			return err
 		}
 		if err == io.ErrUnexpectedEOF {
-			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+			err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
 		}
 		return toRPCErr(err)
 	}
-	if ss.statsHandler != nil {
-		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
-			RecvTime: time.Now(),
-			Payload:  m,
-			// TODO truncate large payload.
-			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength,
-			Length:     len(payInfo.uncompressedBytes),
-		})
+	ss.recvFirstMsg = true
+	if len(ss.statsHandler) != 0 {
+		for _, sh := range ss.statsHandler {
+			sh.HandleRPC(ss.s.Context(), &stats.InPayload{
+				RecvTime:         time.Now(),
+				Payload:          m,
+				Length:           payInfo.uncompressedBytes.Len(),
+				WireLength:       payInfo.compressedLength + headerLen,
+				CompressedLength: payInfo.compressedLength,
+			})
+		}
 	}
-	if ss.binlog != nil {
-		ss.binlog.Log(&binarylog.ClientMessage{
-			Message: payInfo.uncompressedBytes,
-		})
+	if len(ss.binlogs) != 0 {
+		cm := &binarylog.ClientMessage{
+			Message: payInfo.uncompressedBytes.Materialize(),
+		}
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, cm)
+		}
 	}
-	return nil
+
+	if ss.desc.ClientStreams {
+		// Subsequent messages should be received by subsequent RecvMsg calls.
+		return nil
+	}
+	// Special handling for non-client-streaming RPCs.
+	// This recv expects EOF or errors, so we don't collect inPayload.
+	if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
+		return nil
+	} else if err != nil {
+		return err
+	}
+	return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
 }
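
For completeness, the maxReceiveMessageSize enforced inside RecvMsg above is configured on the server; a hedged one-line sketch (the default limit is 4 MiB), with an illustrative function name:

package example

import "google.golang.org/grpc"

// recvLimitOption raises the per-message receive limit that RecvMsg enforces.
func recvLimitOption() grpc.ServerOption {
	return grpc.MaxRecvMsgSize(32 * 1024 * 1024)
}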
 
 // MethodFromServerStream returns the method string for the input stream.
@@ -1507,23 +1848,26 @@
 	return Method(stream.Context())
 }
 
-// prepareMsg returns the hdr, payload and data
-// using the compressors passed or using the
-// passed preparedmsg
-func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
+// prepareMsg returns the hdr, payload and data using the compressors passed or
+// using the passed PreparedMsg. The returned payload format indicates whether
+// compression was applied and therefore whether the payload needs to be freed
+// in addition to the returned data. Freeing the payload when no compression
+// was applied can lead to undefined behavior.
+func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
 	if preparedMsg, ok := m.(*PreparedMsg); ok {
-		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
+		return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
 	}
 	// The input interface is not a prepared msg.
 	// Marshal and Compress the data at this point
 	data, err = encode(codec, m)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, 0, err
 	}
-	compData, err := compress(data, cp, comp)
+	compData, pf, err := compress(data, cp, comp, pool)
 	if err != nil {
-		return nil, nil, nil, err
+		data.Free()
+		return nil, nil, nil, 0, err
 	}
-	hdr, payload = msgHeader(data, compData)
-	return hdr, payload, data, nil
+	hdr, payload = msgHeader(data, compData, pf)
+	return hdr, data, payload, pf, nil
 }