blob: ca87ff9776ef68314337327627ac13ee74567055 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
Abhay Kumara61c5222025-11-10 07:32:50 +000026 rand "math/rand/v2"
William Kurkianea869482019-04-09 15:16:11 -040027 "strconv"
28 "sync"
29 "time"
30
William Kurkianea869482019-04-09 15:16:11 -040031 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
William Kurkianea869482019-04-09 15:16:11 -040033 "google.golang.org/grpc/encoding"
Abhay Kumara61c5222025-11-10 07:32:50 +000034 "google.golang.org/grpc/internal"
Abhilash S.L3b494632019-07-16 15:51:09 +053035 "google.golang.org/grpc/internal/balancerload"
William Kurkianea869482019-04-09 15:16:11 -040036 "google.golang.org/grpc/internal/binarylog"
37 "google.golang.org/grpc/internal/channelz"
Abhay Kumara61c5222025-11-10 07:32:50 +000038 "google.golang.org/grpc/internal/grpcutil"
39 imetadata "google.golang.org/grpc/internal/metadata"
40 iresolver "google.golang.org/grpc/internal/resolver"
41 "google.golang.org/grpc/internal/serviceconfig"
42 istatus "google.golang.org/grpc/internal/status"
William Kurkianea869482019-04-09 15:16:11 -040043 "google.golang.org/grpc/internal/transport"
Abhay Kumara61c5222025-11-10 07:32:50 +000044 "google.golang.org/grpc/mem"
William Kurkianea869482019-04-09 15:16:11 -040045 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49)
50
Abhay Kumara61c5222025-11-10 07:32:50 +000051var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
52
William Kurkianea869482019-04-09 15:16:11 -040053// StreamHandler defines the handler called by gRPC server to complete the
Abhay Kumara61c5222025-11-10 07:32:50 +000054// execution of a streaming RPC.
55//
56// If a StreamHandler returns an error, it should either be produced by the
57// status package, or be one of the context errors. Otherwise, gRPC will use
58// codes.Unknown as the status code and err.Error() as the status message of the
59// RPC.
60type StreamHandler func(srv any, stream ServerStream) error
William Kurkianea869482019-04-09 15:16:11 -040061
Abhay Kumara61c5222025-11-10 07:32:50 +000062// StreamDesc represents a streaming RPC service's method specification. Used
63// on the server when registering services and on the client when initiating
64// new streams.
William Kurkianea869482019-04-09 15:16:11 -040065type StreamDesc struct {
Abhay Kumara61c5222025-11-10 07:32:50 +000066 // StreamName and Handler are only used when registering handlers on a
67 // server.
68 StreamName string // the name of the method excluding the service
69 Handler StreamHandler // the handler called for the method
William Kurkianea869482019-04-09 15:16:11 -040070
Abhay Kumara61c5222025-11-10 07:32:50 +000071 // ServerStreams and ClientStreams are used for registering handlers on a
72 // server as well as defining RPC behavior when passed to NewClientStream
73 // and ClientConn.NewStream. At least one must be true.
74 ServerStreams bool // indicates the server can perform streaming sends
75 ClientStreams bool // indicates the client can perform streaming sends
William Kurkianea869482019-04-09 15:16:11 -040076}
77
// Stream is the interface shared by client and server streams.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m any) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m any) error
}
89
90// ClientStream defines the client-side behavior of a streaming RPC.
91//
92// All errors returned from ClientStream methods are compatible with the
93// status package.
94type ClientStream interface {
95 // Header returns the header metadata received from the server if there
Abhay Kumara61c5222025-11-10 07:32:50 +000096 // is any. It blocks if the metadata is not ready to read. If the metadata
97 // is nil and the error is also nil, then the stream was terminated without
98 // headers, and the status can be discovered by calling RecvMsg.
William Kurkianea869482019-04-09 15:16:11 -040099 Header() (metadata.MD, error)
100 // Trailer returns the trailer metadata from the server, if there is any.
101 // It must only be called after stream.CloseAndRecv has returned, or
102 // stream.Recv has returned a non-nil error (including io.EOF).
103 Trailer() metadata.MD
Abhay Kumara61c5222025-11-10 07:32:50 +0000104 // CloseSend closes the send direction of the stream. This method always
105 // returns a nil error. The status of the stream may be discovered using
106 // RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
William Kurkianea869482019-04-09 15:16:11 -0400107 CloseSend() error
108 // Context returns the context for this stream.
109 //
110 // It should not be called until after Header or RecvMsg has returned. Once
111 // called, subsequent client-side retries are disabled.
112 Context() context.Context
113 // SendMsg is generally called by generated code. On error, SendMsg aborts
114 // the stream. If the error was generated by the client, the status is
115 // returned directly; otherwise, io.EOF is returned and the status of
Abhay Kumara61c5222025-11-10 07:32:50 +0000116 // the stream may be discovered using RecvMsg. For unary or server-streaming
117 // RPCs (StreamDesc.ClientStreams is false), a nil error is returned
118 // unconditionally.
William Kurkianea869482019-04-09 15:16:11 -0400119 //
120 // SendMsg blocks until:
121 // - There is sufficient flow control to schedule m with the transport, or
122 // - The stream is done, or
123 // - The stream breaks.
124 //
125 // SendMsg does not wait until the message is received by the server. An
126 // untimely stream closure may result in lost messages. To ensure delivery,
127 // users should ensure the RPC completed successfully using RecvMsg.
128 //
129 // It is safe to have a goroutine calling SendMsg and another goroutine
130 // calling RecvMsg on the same stream at the same time, but it is not safe
131 // to call SendMsg on the same stream in different goroutines. It is also
132 // not safe to call CloseSend concurrently with SendMsg.
Abhay Kumara61c5222025-11-10 07:32:50 +0000133 //
134 // It is not safe to modify the message after calling SendMsg. Tracing
135 // libraries and stats handlers may use the message lazily.
136 SendMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -0400137 // RecvMsg blocks until it receives a message into m or the stream is
138 // done. It returns io.EOF when the stream completes successfully. On
139 // any other error, the stream is aborted and the error contains the RPC
140 // status.
141 //
142 // It is safe to have a goroutine calling SendMsg and another goroutine
143 // calling RecvMsg on the same stream at the same time, but it is not
144 // safe to call RecvMsg on the same stream in different goroutines.
Abhay Kumara61c5222025-11-10 07:32:50 +0000145 RecvMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -0400146}
147
148// NewStream creates a new Stream for the client side. This is typically
149// called by generated code. ctx is used for the lifetime of the stream.
150//
151// To ensure resources are not leaked due to the stream returned, one of the following
152// actions must be performed:
153//
Abhay Kumara61c5222025-11-10 07:32:50 +0000154// 1. Call Close on the ClientConn.
155// 2. Cancel the context provided.
156// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
157// client-streaming RPC, for instance, might use the helper function
158// CloseAndRecv (note that CloseSend does not Recv, therefore is not
159// guaranteed to release all resources).
160// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
William Kurkianea869482019-04-09 15:16:11 -0400161//
162// If none of the above happen, a goroutine and a context will be leaked, and grpc
163// will not call the optionally-configured stats handler with a stats.End message.
164func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
165 // allow interceptor to see all applicable call options, which means those
166 // configured as defaults from dial option as well as per-call options
167 opts = combine(cc.dopts.callOptions, opts)
168
169 if cc.dopts.streamInt != nil {
170 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
171 }
172 return newClientStream(ctx, desc, cc, method, opts...)
173}
174
175// NewClientStream is a wrapper for ClientConn.NewStream.
176func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
177 return cc.NewStream(ctx, desc, method, opts...)
178}
179
bseeniva0b9cbcb2026-02-12 19:11:11 +0530180var emptyMethodConfig = serviceconfig.MethodConfig{}
181
William Kurkianea869482019-04-09 15:16:11 -0400182func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000183 // Start tracking the RPC for idleness purposes. This is where a stream is
184 // created for both streaming and unary RPCs, and hence is a good place to
185 // track active RPC count.
186 if err := cc.idlenessMgr.OnCallBegin(); err != nil {
187 return nil, err
188 }
189 // Add a calloption, to decrement the active call count, that gets executed
190 // when the RPC completes.
191 opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
192
193 if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
194 // validate md
195 if err := imetadata.Validate(md); err != nil {
196 return nil, status.Error(codes.Internal, err.Error())
197 }
198 // validate added
199 for _, kvs := range added {
200 for i := 0; i < len(kvs); i += 2 {
201 if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
202 return nil, status.Error(codes.Internal, err.Error())
203 }
204 }
205 }
206 }
William Kurkianea869482019-04-09 15:16:11 -0400207 if channelz.IsOn() {
208 cc.incrCallsStarted()
209 defer func() {
210 if err != nil {
211 cc.incrCallsFailed()
212 }
213 }()
214 }
William Kurkianea869482019-04-09 15:16:11 -0400215 // Provide an opportunity for the first RPC to see the first service config
216 // provided by the resolver.
Abhay Kumara61c5222025-11-10 07:32:50 +0000217 nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
218 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400219 return nil, err
220 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000221
bseeniva0b9cbcb2026-02-12 19:11:11 +0530222 mc := &emptyMethodConfig
Abhay Kumara61c5222025-11-10 07:32:50 +0000223 var onCommit func()
224 newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
225 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
226 }
227
228 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
229 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
230 if err != nil {
231 if st, ok := status.FromError(err); ok {
232 // Restrict the code to the list allowed by gRFC A54.
233 if istatus.IsRestrictedControlPlaneCode(st) {
234 err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
235 }
236 return nil, err
237 }
238 return nil, toRPCErr(err)
239 }
240
241 if rpcConfig != nil {
242 if rpcConfig.Context != nil {
243 ctx = rpcConfig.Context
244 }
bseeniva0b9cbcb2026-02-12 19:11:11 +0530245 mc = &rpcConfig.MethodConfig
Abhay Kumara61c5222025-11-10 07:32:50 +0000246 onCommit = rpcConfig.OnCommitted
247 if rpcConfig.Interceptor != nil {
248 rpcInfo.Context = nil
249 ns := newStream
250 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
251 cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
252 if err != nil {
253 return nil, toRPCErr(err)
254 }
255 return cs, nil
256 }
257 }
258 }
259
260 return newStream(ctx, func() {})
261}
262
bseeniva0b9cbcb2026-02-12 19:11:11 +0530263func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000264 callInfo := defaultCallInfo()
William Kurkianea869482019-04-09 15:16:11 -0400265 if mc.WaitForReady != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000266 callInfo.failFast = !*mc.WaitForReady
William Kurkianea869482019-04-09 15:16:11 -0400267 }
268
269 // Possible context leak:
270 // The cancel function for the child context we create will only be called
271 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
272 // an error is generated by SendMsg.
273 // https://github.com/grpc/grpc-go/issues/1818.
274 var cancel context.CancelFunc
275 if mc.Timeout != nil && *mc.Timeout >= 0 {
276 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
277 } else {
278 ctx, cancel = context.WithCancel(ctx)
279 }
280 defer func() {
281 if err != nil {
282 cancel()
283 }
284 }()
285
286 for _, o := range opts {
Abhay Kumara61c5222025-11-10 07:32:50 +0000287 if err := o.before(callInfo); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400288 return nil, toRPCErr(err)
289 }
290 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000291 callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
292 callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
293 if err := setCallInfoCodec(callInfo); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400294 return nil, err
295 }
296
297 callHdr := &transport.CallHdr{
298 Host: cc.authority,
299 Method: method,
Abhay Kumara61c5222025-11-10 07:32:50 +0000300 ContentSubtype: callInfo.contentSubtype,
301 DoneFunc: doneFunc,
302 Authority: callInfo.authority,
William Kurkianea869482019-04-09 15:16:11 -0400303 }
304
305 // Set our outgoing compression according to the UseCompressor CallOption, if
306 // set. In that case, also find the compressor from the encoding package.
307 // Otherwise, use the compressor configured by the WithCompressor DialOption,
308 // if set.
Abhay Kumara61c5222025-11-10 07:32:50 +0000309 var compressorV0 Compressor
310 var compressorV1 encoding.Compressor
311 if ct := callInfo.compressorName; ct != "" {
William Kurkianea869482019-04-09 15:16:11 -0400312 callHdr.SendCompress = ct
313 if ct != encoding.Identity {
Abhay Kumara61c5222025-11-10 07:32:50 +0000314 compressorV1 = encoding.GetCompressor(ct)
315 if compressorV1 == nil {
William Kurkianea869482019-04-09 15:16:11 -0400316 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
317 }
318 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000319 } else if cc.dopts.compressorV0 != nil {
320 callHdr.SendCompress = cc.dopts.compressorV0.Type()
321 compressorV0 = cc.dopts.compressorV0
William Kurkianea869482019-04-09 15:16:11 -0400322 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000323 if callInfo.creds != nil {
324 callHdr.Creds = callInfo.creds
William Kurkianea869482019-04-09 15:16:11 -0400325 }
326
327 cs := &clientStream{
Abhay Kumara61c5222025-11-10 07:32:50 +0000328 callHdr: callHdr,
329 ctx: ctx,
bseeniva0b9cbcb2026-02-12 19:11:11 +0530330 methodConfig: mc,
Abhay Kumara61c5222025-11-10 07:32:50 +0000331 opts: opts,
332 callInfo: callInfo,
333 cc: cc,
334 desc: desc,
335 codec: callInfo.codec,
336 compressorV0: compressorV0,
337 compressorV1: compressorV1,
338 cancel: cancel,
339 firstAttempt: true,
340 onCommit: onCommit,
341 nameResolutionDelay: nameResolutionDelayed,
William Kurkianea869482019-04-09 15:16:11 -0400342 }
343 if !cc.dopts.disableRetry {
344 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
345 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000346 if ml := binarylog.GetMethodLogger(method); ml != nil {
347 cs.binlogs = append(cs.binlogs, ml)
348 }
349 if cc.dopts.binaryLogger != nil {
350 if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
351 cs.binlogs = append(cs.binlogs, ml)
352 }
353 }
William Kurkianea869482019-04-09 15:16:11 -0400354
Abhay Kumara61c5222025-11-10 07:32:50 +0000355 // Pick the transport to use and create a new stream on the transport.
356 // Assign cs.attempt upon success.
357 op := func(a *csAttempt) error {
358 if err := a.getTransport(); err != nil {
359 return err
360 }
361 if err := a.newStream(); err != nil {
362 return err
363 }
364 // Because this operation is always called either here (while creating
365 // the clientStream) or by the retry code while locked when replaying
366 // the operation, it is safe to access cs.attempt directly.
367 cs.attempt = a
368 return nil
369 }
370 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400371 return nil, err
372 }
373
Abhay Kumara61c5222025-11-10 07:32:50 +0000374 if len(cs.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -0400375 md, _ := metadata.FromOutgoingContext(ctx)
376 logEntry := &binarylog.ClientHeader{
377 OnClientSide: true,
378 Header: md,
379 MethodName: method,
380 Authority: cs.cc.authority,
381 }
382 if deadline, ok := ctx.Deadline(); ok {
383 logEntry.Timeout = time.Until(deadline)
384 if logEntry.Timeout < 0 {
385 logEntry.Timeout = 0
386 }
387 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000388 for _, binlog := range cs.binlogs {
389 binlog.Log(cs.ctx, logEntry)
390 }
William Kurkianea869482019-04-09 15:16:11 -0400391 }
392
393 if desc != unaryStreamDesc {
394 // Listen on cc and stream contexts to cleanup when the user closes the
395 // ClientConn or cancels the stream context. In all other cases, an error
396 // should already be injected into the recv buffer by the transport, which
397 // the client will eventually receive, and then we will cancel the stream's
398 // context in clientStream.finish.
399 go func() {
400 select {
401 case <-cc.ctx.Done():
402 cs.finish(ErrClientConnClosing)
403 case <-ctx.Done():
404 cs.finish(toRPCErr(ctx.Err()))
405 }
406 }()
407 }
408 return cs, nil
409}
410
Abhay Kumara61c5222025-11-10 07:32:50 +0000411// newAttemptLocked creates a new csAttempt without a transport or stream.
412func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
William Kurkianea869482019-04-09 15:16:11 -0400413 if err := cs.ctx.Err(); err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000414 return nil, toRPCErr(err)
William Kurkianea869482019-04-09 15:16:11 -0400415 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000416 if err := cs.cc.ctx.Err(); err != nil {
417 return nil, ErrClientConnClosing
418 }
419
420 ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
421 method := cs.callHdr.Method
422 var beginTime time.Time
bseeniva0b9cbcb2026-02-12 19:11:11 +0530423 sh := cs.cc.statsHandler
424 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000425 beginTime = time.Now()
bseeniva0b9cbcb2026-02-12 19:11:11 +0530426 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{
427 FullMethodName: method, FailFast: cs.callInfo.failFast,
428 NameResolutionDelay: cs.nameResolutionDelay,
429 })
430 sh.HandleRPC(ctx, &stats.Begin{
Abhay Kumara61c5222025-11-10 07:32:50 +0000431 Client: true,
432 BeginTime: beginTime,
433 FailFast: cs.callInfo.failFast,
434 IsClientStream: cs.desc.ClientStreams,
435 IsServerStream: cs.desc.ServerStreams,
436 IsTransparentRetryAttempt: isTransparent,
bseeniva0b9cbcb2026-02-12 19:11:11 +0530437 })
Abhay Kumara61c5222025-11-10 07:32:50 +0000438 }
439
440 var trInfo *traceInfo
441 if EnableTracing {
442 trInfo = &traceInfo{
443 tr: newTrace("grpc.Sent."+methodFamily(method), method),
444 firstLine: firstLine{
445 client: true,
446 },
447 }
448 if deadline, ok := ctx.Deadline(); ok {
449 trInfo.firstLine.deadline = time.Until(deadline)
450 }
451 trInfo.tr.LazyLog(&trInfo.firstLine, false)
452 ctx = newTraceContext(ctx, trInfo.tr)
453 }
454
455 if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
456 // Add extra metadata (metadata that will be added by transport) to context
457 // so the balancer can see them.
458 ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
459 "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
460 ))
461 }
462
463 return &csAttempt{
464 ctx: ctx,
465 beginTime: beginTime,
466 cs: cs,
467 decompressorV0: cs.cc.dopts.dc,
bseeniva0b9cbcb2026-02-12 19:11:11 +0530468 statsHandler: sh,
Abhay Kumara61c5222025-11-10 07:32:50 +0000469 trInfo: trInfo,
470 }, nil
471}
472
473func (a *csAttempt) getTransport() error {
474 cs := a.cs
475
476 pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
477 pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
478 a.transport, a.pickResult = pick.transport, pick.result
William Kurkianea869482019-04-09 15:16:11 -0400479 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000480 if de, ok := err.(dropError); ok {
481 err = de.error
482 a.drop = true
483 }
William Kurkianea869482019-04-09 15:16:11 -0400484 return err
485 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000486 if a.trInfo != nil {
487 a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
Abhilash S.L3b494632019-07-16 15:51:09 +0530488 }
bseeniva0b9cbcb2026-02-12 19:11:11 +0530489 if pick.blocked && a.statsHandler != nil {
490 a.statsHandler.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
Abhay Kumara61c5222025-11-10 07:32:50 +0000491 }
William Kurkianea869482019-04-09 15:16:11 -0400492 return nil
493}
494
495func (a *csAttempt) newStream() error {
496 cs := a.cs
497 cs.callHdr.PreviousAttempts = cs.numRetries
Abhay Kumara61c5222025-11-10 07:32:50 +0000498
499 // Merge metadata stored in PickResult, if any, with existing call metadata.
500 // It is safe to overwrite the csAttempt's context here, since all state
501 // maintained in it are local to the attempt. When the attempt has to be
502 // retried, a new instance of csAttempt will be created.
503 if a.pickResult.Metadata != nil {
504 // We currently do not have a function it the metadata package which
505 // merges given metadata with existing metadata in a context. Existing
506 // function `AppendToOutgoingContext()` takes a variadic argument of key
507 // value pairs.
508 //
509 // TODO: Make it possible to retrieve key value pairs from metadata.MD
510 // in a form passable to AppendToOutgoingContext(), or create a version
511 // of AppendToOutgoingContext() that accepts a metadata.MD.
512 md, _ := metadata.FromOutgoingContext(a.ctx)
513 md = metadata.Join(md, a.pickResult.Metadata)
514 a.ctx = metadata.NewOutgoingContext(a.ctx, md)
William Kurkianea869482019-04-09 15:16:11 -0400515 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000516
517 s, err := a.transport.NewStream(a.ctx, cs.callHdr)
518 if err != nil {
519 nse, ok := err.(*transport.NewStreamError)
520 if !ok {
521 // Unexpected.
522 return err
523 }
524
525 if nse.AllowTransparentRetry {
526 a.allowTransparentRetry = true
527 }
528
529 // Unwrap and convert error.
530 return toRPCErr(nse.Err)
531 }
532 a.transportStream = s
533 a.ctx = s.Context()
bseeniva0b9cbcb2026-02-12 19:11:11 +0530534 a.parser = parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
William Kurkianea869482019-04-09 15:16:11 -0400535 return nil
536}
537
538// clientStream implements a client side Stream.
539type clientStream struct {
540 callHdr *transport.CallHdr
541 opts []CallOption
542 callInfo *callInfo
543 cc *ClientConn
544 desc *StreamDesc
545
Abhay Kumara61c5222025-11-10 07:32:50 +0000546 codec baseCodec
547 compressorV0 Compressor
548 compressorV1 encoding.Compressor
William Kurkianea869482019-04-09 15:16:11 -0400549
550 cancel context.CancelFunc // cancels all attempts
551
Abhay Kumara61c5222025-11-10 07:32:50 +0000552 sentLast bool // sent an end stream
553
554 receivedFirstMsg bool // set after the first message is received
William Kurkianea869482019-04-09 15:16:11 -0400555
556 methodConfig *MethodConfig
557
558 ctx context.Context // the application's context, wrapped by stats/tracing
559
560 retryThrottler *retryThrottler // The throttler active when the RPC began.
561
Abhay Kumara61c5222025-11-10 07:32:50 +0000562 binlogs []binarylog.MethodLogger
William Kurkianea869482019-04-09 15:16:11 -0400563 // serverHeaderBinlogged is a boolean for whether server header has been
564 // logged. Server header will be logged when the first time one of those
565 // happens: stream.Header(), stream.Recv().
566 //
567 // It's only read and used by Recv() and Header(), so it doesn't need to be
568 // synchronized.
569 serverHeaderBinlogged bool
570
571 mu sync.Mutex
David Bainbridge788e5202019-10-21 18:49:40 +0000572 firstAttempt bool // if true, transparent retry is valid
573 numRetries int // exclusive of transparent retry attempt(s)
574 numRetriesSincePushback int // retries since pushback; to reset backoff
575 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
576 // attempt is the active client stream attempt.
577 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
578 // So, attempt can be nil only inside newClientStream function when clientStream is first created.
579 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
580 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
581 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
582 // place where we need to check if the attempt is nil.
583 attempt *csAttempt
William Kurkianea869482019-04-09 15:16:11 -0400584 // TODO(hedging): hedging will have multiple attempts simultaneously.
Abhay Kumara61c5222025-11-10 07:32:50 +0000585 committed bool // active attempt committed for retry?
586 onCommit func()
587 replayBuffer []replayOp // operations to replay on retry
588 replayBufferSize int // current size of replayBuffer
589 // nameResolutionDelay indicates if there was a delay in the name resolution.
590 // This field is only valid on client side, it's always false on server side.
591 nameResolutionDelay bool
592}
593
594type replayOp struct {
595 op func(a *csAttempt) error
596 cleanup func()
William Kurkianea869482019-04-09 15:16:11 -0400597}
598
599// csAttempt implements a single transport stream attempt within a
600// clientStream.
601type csAttempt struct {
Abhay Kumara61c5222025-11-10 07:32:50 +0000602 ctx context.Context
603 cs *clientStream
604 transport transport.ClientTransport
605 transportStream *transport.ClientStream
bseeniva0b9cbcb2026-02-12 19:11:11 +0530606 parser parser
Abhay Kumara61c5222025-11-10 07:32:50 +0000607 pickResult balancer.PickResult
William Kurkianea869482019-04-09 15:16:11 -0400608
Abhay Kumara61c5222025-11-10 07:32:50 +0000609 finished bool
610 decompressorV0 Decompressor
611 decompressorV1 encoding.Compressor
612 decompressorSet bool
William Kurkianea869482019-04-09 15:16:11 -0400613
614 mu sync.Mutex // guards trInfo.tr
Abhilash S.L3b494632019-07-16 15:51:09 +0530615 // trInfo may be nil (if EnableTracing is false).
William Kurkianea869482019-04-09 15:16:11 -0400616 // trInfo.tr is set when created (if EnableTracing is true),
617 // and cleared when the finish method is called.
Abhilash S.L3b494632019-07-16 15:51:09 +0530618 trInfo *traceInfo
William Kurkianea869482019-04-09 15:16:11 -0400619
bseeniva0b9cbcb2026-02-12 19:11:11 +0530620 statsHandler stats.Handler
621 beginTime time.Time
Abhay Kumara61c5222025-11-10 07:32:50 +0000622
623 // set for newStream errors that may be transparently retried
624 allowTransparentRetry bool
625 // set for pick errors that are returned as a status
626 drop bool
William Kurkianea869482019-04-09 15:16:11 -0400627}
628
629func (cs *clientStream) commitAttemptLocked() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000630 if !cs.committed && cs.onCommit != nil {
631 cs.onCommit()
632 }
William Kurkianea869482019-04-09 15:16:11 -0400633 cs.committed = true
Abhay Kumara61c5222025-11-10 07:32:50 +0000634 for _, op := range cs.replayBuffer {
635 if op.cleanup != nil {
636 op.cleanup()
637 }
638 }
639 cs.replayBuffer = nil
William Kurkianea869482019-04-09 15:16:11 -0400640}
641
642func (cs *clientStream) commitAttempt() {
643 cs.mu.Lock()
644 cs.commitAttemptLocked()
645 cs.mu.Unlock()
646}
647
648// shouldRetry returns nil if the RPC should be retried; otherwise it returns
Abhay Kumara61c5222025-11-10 07:32:50 +0000649// the error that should be returned by the operation. If the RPC should be
650// retried, the bool indicates whether it is being retried transparently.
// shouldRetry decides whether the RPC may be retried after this attempt
// failed with err.
//
// It returns:
//   - (true, nil) when the attempt should be retried transparently;
//   - (false, nil) when a retry allowed by the method's retry policy should
//     proceed. This is returned only after the backoff (or server pushback)
//     delay has elapsed, and cs.numRetries has been incremented;
//   - (false, non-nil) when no retry should be made. The error is err as
//     passed in, or a status error derived from the stream context if that
//     context ended while waiting out the delay.
//
// The function blocks while waiting for the transport stream to be done and
// while waiting for the retry delay.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
	cs := a.cs

	if cs.finished || cs.committed || a.drop {
		// RPC is finished or committed or was dropped by the picker; cannot retry.
		return false, err
	}
	if a.transportStream == nil && a.allowTransparentRetry {
		// No stream was created for this attempt and a transparent retry is
		// permitted.
		return true, nil
	}
	// Wait for the trailers.
	unprocessed := false
	if a.transportStream != nil {
		<-a.transportStream.Done()
		unprocessed = a.transportStream.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		return true, nil
	}
	if cs.cc.dopts.disableRetry {
		// Everything below is policy-driven retry, which is disabled.
		return false, err
	}

	pushback := 0
	hasPushback := false
	if a.transportStream != nil {
		if !a.transportStream.TrailersOnly() {
			return false, err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			// An unparsable or negative pushback value means "do not retry".
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return false, err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return false, err
		}
	}

	// Prefer the status reported by the stream; fall back to the status code
	// carried by err when no stream exists.
	var code codes.Code
	if a.transportStream != nil {
		code = a.transportStream.Status().Code()
	} else {
		code = status.Code(err)
	}

	rp := cs.methodConfig.RetryPolicy
	if rp == nil || !rp.RetryableStatusCodes[code] {
		return false, err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return false, err
	}
	if cs.numRetries+1 >= rp.MaxAttempts {
		return false, err
	}

	var dur time.Duration
	if hasPushback {
		// The server dictated the delay; restart the exponential backoff
		// sequence from its beginning.
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		// Exponential backoff: InitialBackoff * BackoffMultiplier^n, capped at
		// MaxBackoff, where n is the number of retries since the last pushback.
		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
		// Apply jitter by multiplying with a random factor between 0.8 and 1.2
		cur *= 0.8 + 0.4*rand.Float64()
		dur = time.Duration(int64(cur))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return false, nil
	case <-cs.ctx.Done():
		// The stream's context ended during the delay: give up and report the
		// context error as a status error.
		t.Stop()
		return false, status.FromContextError(cs.ctx.Err()).Err()
	}
}
746
747// Returns nil if a retry was performed and succeeded; error otherwise.
Abhay Kumara61c5222025-11-10 07:32:50 +0000748func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
William Kurkianea869482019-04-09 15:16:11 -0400749 for {
Abhay Kumara61c5222025-11-10 07:32:50 +0000750 attempt.finish(toRPCErr(lastErr))
751 isTransparent, err := attempt.shouldRetry(lastErr)
752 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400753 cs.commitAttemptLocked()
754 return err
755 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000756 cs.firstAttempt = false
757 attempt, err = cs.newAttemptLocked(isTransparent)
758 if err != nil {
759 // Only returns error if the clientconn is closed or the context of
760 // the stream is canceled.
William Kurkianea869482019-04-09 15:16:11 -0400761 return err
762 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000763 // Note that the first op in replayBuffer always sets cs.attempt
764 // if it is able to pick a transport and create a stream.
765 if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
William Kurkianea869482019-04-09 15:16:11 -0400766 return nil
767 }
768 }
769}
770
771func (cs *clientStream) Context() context.Context {
772 cs.commitAttempt()
773 // No need to lock before using attempt, since we know it is committed and
774 // cannot change.
Abhay Kumara61c5222025-11-10 07:32:50 +0000775 if cs.attempt.transportStream != nil {
776 return cs.attempt.transportStream.Context()
777 }
778 return cs.ctx
William Kurkianea869482019-04-09 15:16:11 -0400779}
780
// withRetry runs op against the stream's current attempt, retrying through
// retryLocked when op fails and the stream has not been committed.
//
// If the stream is already committed, op is run once on cs.attempt and its
// result (converted by toRPCErr) is returned; onSuccess is not called in that
// case. Otherwise, op is run with cs.mu released, and when it succeeds
// (returns nil, or io.EOF with an OK stream status) onSuccess is invoked with
// cs.mu held before the result is returned.
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// toRPCErr is used in case the error from the attempt comes from
			// NewClientStream, which intentionally doesn't return a status
			// error to allow for further inspection; all other errors should
			// already be status errors.
			return toRPCErr(op(cs.attempt))
		}
		if len(cs.replayBuffer) == 0 {
			// For the first op, which controls creation of the stream and
			// assigns cs.attempt, we need to create a new attempt inline
			// before executing the first op. On subsequent ops, the attempt
			// is created immediately before replaying the ops.
			var err error
			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
				cs.mu.Unlock()
				cs.finish(err)
				return err
			}
		}
		// Run op without holding cs.mu; remember which attempt it ran on so a
		// concurrent switch to a newer attempt can be detected afterwards.
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			// Wait for the stream to be done before reading its status below.
			<-a.transportStream.Done()
		}
		if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		// op failed on the current attempt: try to retry. On success the loop
		// runs op again on the new attempt.
		if err := cs.retryLocked(a, err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
826
827func (cs *clientStream) Header() (metadata.MD, error) {
828 var m metadata.MD
829 err := cs.withRetry(func(a *csAttempt) error {
830 var err error
Abhay Kumara61c5222025-11-10 07:32:50 +0000831 m, err = a.transportStream.Header()
William Kurkianea869482019-04-09 15:16:11 -0400832 return toRPCErr(err)
833 }, cs.commitAttemptLocked)
Abhay Kumara61c5222025-11-10 07:32:50 +0000834
835 if m == nil && err == nil {
836 // The stream ended with success. Finish the clientStream.
837 err = io.EOF
838 }
839
William Kurkianea869482019-04-09 15:16:11 -0400840 if err != nil {
841 cs.finish(err)
Abhay Kumara61c5222025-11-10 07:32:50 +0000842 // Do not return the error. The user should get it by calling Recv().
843 return nil, nil
William Kurkianea869482019-04-09 15:16:11 -0400844 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000845
846 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
847 // Only log if binary log is on and header has not been logged, and
848 // there is actually headers to log.
William Kurkianea869482019-04-09 15:16:11 -0400849 logEntry := &binarylog.ServerHeader{
850 OnClientSide: true,
851 Header: m,
852 PeerAddr: nil,
853 }
854 if peer, ok := peer.FromContext(cs.Context()); ok {
855 logEntry.PeerAddr = peer.Addr
856 }
William Kurkianea869482019-04-09 15:16:11 -0400857 cs.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +0000858 for _, binlog := range cs.binlogs {
859 binlog.Log(cs.ctx, logEntry)
860 }
William Kurkianea869482019-04-09 15:16:11 -0400861 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000862
863 return m, nil
William Kurkianea869482019-04-09 15:16:11 -0400864}
865
866func (cs *clientStream) Trailer() metadata.MD {
867 // On RPC failure, we never need to retry, because usage requires that
868 // RecvMsg() returned a non-nil error before calling this function is valid.
869 // We would have retried earlier if necessary.
870 //
871 // Commit the attempt anyway, just in case users are not following those
872 // directions -- it will prevent races and should not meaningfully impact
873 // performance.
874 cs.commitAttempt()
Abhay Kumara61c5222025-11-10 07:32:50 +0000875 if cs.attempt.transportStream == nil {
William Kurkianea869482019-04-09 15:16:11 -0400876 return nil
877 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000878 return cs.attempt.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -0400879}
880
Abhay Kumara61c5222025-11-10 07:32:50 +0000881func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
882 for _, f := range cs.replayBuffer {
883 if err := f.op(attempt); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400884 return err
885 }
886 }
887 return nil
888}
889
Abhay Kumara61c5222025-11-10 07:32:50 +0000890func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
William Kurkianea869482019-04-09 15:16:11 -0400891 // Note: we still will buffer if retry is disabled (for transparent retries).
892 if cs.committed {
893 return
894 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000895 cs.replayBufferSize += sz
896 if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
William Kurkianea869482019-04-09 15:16:11 -0400897 cs.commitAttemptLocked()
Abhay Kumara61c5222025-11-10 07:32:50 +0000898 cleanup()
William Kurkianea869482019-04-09 15:16:11 -0400899 return
900 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000901 cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
William Kurkianea869482019-04-09 15:16:11 -0400902}
903
// SendMsg encodes m and sends it on the stream, retrying the attempt if
// needed.
//
// It returns an Internal status error if called after the last message was
// already sent, and a ResourceExhausted status error if the encoded payload
// exceeds callInfo.maxSendMessageSize. Any non-nil error other than io.EOF
// finishes the stream. When binary logging is enabled, a successfully sent
// message is logged.
func (cs *clientStream) SendMsg(m any) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		// Not client-streaming: this message is the only one that will be sent.
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
	if err != nil {
		return err
	}

	defer func() {
		data.Free()
		// only free payload if compression was made, and therefore it is a different set
		// of buffers from data.
		if pf.isCompressed() {
			payload.Free()
		}
	}()

	dataLen := data.Len()
	payloadLen := payload.Len()
	// TODO(dfawley): should we be checking len(data) instead?
	if payloadLen > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
	}

	// always take an extra ref in case data == payload (i.e. when the data isn't
	// compressed). The original ref will always be freed by the deferred free above.
	payload.Ref()
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
	}

	// onSuccess is invoked when the op is captured for a subsequent retry. If the
	// stream was established by a previous message and therefore retries are
	// disabled, onSuccess will not be invoked, and the extra payload reference
	// taken above can be freed immediately.
	onSuccessCalled := false
	err = cs.withRetry(op, func() {
		// Ownership of the extra reference passes to the replay buffer, which
		// releases it through payload.Free.
		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
		onSuccessCalled = true
	})
	if !onSuccessCalled {
		payload.Free()
	}
	if len(cs.binlogs) != 0 && err == nil {
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data.Materialize(),
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, cm)
		}
	}
	return err
}
974
Abhay Kumara61c5222025-11-10 07:32:50 +0000975func (cs *clientStream) RecvMsg(m any) error {
976 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
William Kurkianea869482019-04-09 15:16:11 -0400977 // Call Header() to binary log header if it's not already logged.
978 cs.Header()
979 }
980 var recvInfo *payloadInfo
Abhay Kumara61c5222025-11-10 07:32:50 +0000981 if len(cs.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -0400982 recvInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +0000983 defer recvInfo.free()
William Kurkianea869482019-04-09 15:16:11 -0400984 }
985 err := cs.withRetry(func(a *csAttempt) error {
986 return a.recvMsg(m, recvInfo)
987 }, cs.commitAttemptLocked)
Abhay Kumara61c5222025-11-10 07:32:50 +0000988 if len(cs.binlogs) != 0 && err == nil {
989 sm := &binarylog.ServerMessage{
William Kurkianea869482019-04-09 15:16:11 -0400990 OnClientSide: true,
Abhay Kumara61c5222025-11-10 07:32:50 +0000991 Message: recvInfo.uncompressedBytes.Materialize(),
992 }
993 for _, binlog := range cs.binlogs {
994 binlog.Log(cs.ctx, sm)
995 }
William Kurkianea869482019-04-09 15:16:11 -0400996 }
997 if err != nil || !cs.desc.ServerStreams {
998 // err != nil or non-server-streaming indicates end of stream.
999 cs.finish(err)
William Kurkianea869482019-04-09 15:16:11 -04001000 }
1001 return err
1002}
1003
1004func (cs *clientStream) CloseSend() error {
1005 if cs.sentLast {
Abhay Kumara61c5222025-11-10 07:32:50 +00001006 // Return a nil error on repeated calls to this method.
William Kurkianea869482019-04-09 15:16:11 -04001007 return nil
1008 }
1009 cs.sentLast = true
1010 op := func(a *csAttempt) error {
Abhay Kumara61c5222025-11-10 07:32:50 +00001011 a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
William Kurkianea869482019-04-09 15:16:11 -04001012 // Always return nil; io.EOF is the only error that might make sense
1013 // instead, but there is no need to signal the client to call RecvMsg
1014 // as the only use left for the stream after CloseSend is to call
1015 // RecvMsg. This also matches historical behavior.
1016 return nil
1017 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001018 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
1019 if len(cs.binlogs) != 0 {
1020 chc := &binarylog.ClientHalfClose{
William Kurkianea869482019-04-09 15:16:11 -04001021 OnClientSide: true,
Abhay Kumara61c5222025-11-10 07:32:50 +00001022 }
1023 for _, binlog := range cs.binlogs {
1024 binlog.Log(cs.ctx, chc)
1025 }
William Kurkianea869482019-04-09 15:16:11 -04001026 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001027 // We don't return an error here as we expect users to read all messages
1028 // from the stream and get the RPC status from RecvMsg(). Note that
1029 // SendMsg() must return an error when one occurs so the application
1030 // knows to stop sending messages, but that does not apply here.
William Kurkianea869482019-04-09 15:16:11 -04001031 return nil
1032}
1033
// finish ends the client stream with err; io.EOF is treated as success (nil).
//
// Only the first call has any effect. It runs the callInfo.onFinish callbacks,
// commits and finishes the current attempt, runs the CallOptions' after hooks
// (only when the attempt has a transport stream), writes the final binary log
// entry if binary logging is enabled, updates the retry throttler and channelz
// call counters, and finally cancels the stream's context.
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	for _, onFinish := range cs.callInfo.onFinish {
		onFinish(err)
	}
	cs.commitAttemptLocked()
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.transportStream != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo, cs.attempt)
			}
		}
	}

	cs.mu.Unlock()
	// Only one of cancel or trailer needs to be logged.
	if len(cs.binlogs) != 0 {
		switch err {
		case errContextCanceled, errContextDeadline, ErrClientConnClosing:
			c := &binarylog.Cancel{
				OnClientSide: true,
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(cs.ctx, c)
			}
		default:
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(cs.ctx, logEntry)
			}
		}
	}
	if err == nil {
		// Report the success to the retry throttler.
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	cs.cancel()
}
1096
Abhay Kumara61c5222025-11-10 07:32:50 +00001097func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
William Kurkianea869482019-04-09 15:16:11 -04001098 cs := a.cs
Abhilash S.L3b494632019-07-16 15:51:09 +05301099 if a.trInfo != nil {
William Kurkianea869482019-04-09 15:16:11 -04001100 a.mu.Lock()
1101 if a.trInfo.tr != nil {
1102 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1103 }
1104 a.mu.Unlock()
1105 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001106 if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001107 if !cs.desc.ClientStreams {
1108 // For non-client-streaming RPCs, we return nil instead of EOF on error
1109 // because the generated code requires it. finish is not called; RecvMsg()
1110 // will call it with the stream's status independently.
1111 return nil
1112 }
1113 return io.EOF
1114 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301115 if a.statsHandler != nil {
1116 a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
William Kurkianea869482019-04-09 15:16:11 -04001117 }
1118 return nil
1119}
1120
// recvMsg receives one message from this attempt's transport stream into m.
//
// payInfo, when non-nil, is passed to recv for the first message; if it is nil
// and a stats handler is set, a temporary one is allocated for the stats
// report and freed on return.
//
// For server-streaming RPCs it returns nil after each message and io.EOF at a
// successful end of stream. For non-server-streaming RPCs it also reads past
// the first message and returns the stream's final status error (nil on
// success); receiving no message, or more than one, yields an Internal
// "cardinality violation" status error. Any non-OK stream status at end of
// stream is returned as that status error.
func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
		defer payInfo.free()
	}

	if !a.decompressorSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.decompressorV0 = nil
				a.decompressorV1 = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.decompressorV0 = nil
		}
		// Only initialize this state once per stream.
		a.decompressorSet = true
	}
	if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
		if err == io.EOF {
			// End of stream: a non-OK status takes precedence over EOF.
			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
				return statusErr
			}
			// Received no msg and status OK for non-server streaming rpcs.
			if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
				return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
			}
			return io.EOF // indicates successful end of stream.
		}

		return toRPCErr(err)
	}
	cs.receivedFirstMsg = true
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		// payInfo is non-nil here: it was allocated above if the caller
		// passed nil while a stats handler is set.
		a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
			Client:           true,
			RecvTime:         time.Now(),
			Payload:          m,
			WireLength:       payInfo.compressedLength + headerLen,
			CompressedLength: payInfo.compressedLength,
			Length:           payInfo.uncompressedBytes.Len(),
		})
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
	} else if err != nil {
		return toRPCErr(err)
	}
	return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
1189
1190func (a *csAttempt) finish(err error) {
1191 a.mu.Lock()
1192 if a.finished {
1193 a.mu.Unlock()
1194 return
1195 }
1196 a.finished = true
1197 if err == io.EOF {
1198 // Ending a stream with EOF indicates a success.
1199 err = nil
1200 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301201 var tr metadata.MD
Abhay Kumara61c5222025-11-10 07:32:50 +00001202 if a.transportStream != nil {
1203 a.transportStream.Close(err)
1204 tr = a.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -04001205 }
1206
Abhay Kumara61c5222025-11-10 07:32:50 +00001207 if a.pickResult.Done != nil {
William Kurkianea869482019-04-09 15:16:11 -04001208 br := false
Abhay Kumara61c5222025-11-10 07:32:50 +00001209 if a.transportStream != nil {
1210 br = a.transportStream.BytesReceived()
William Kurkianea869482019-04-09 15:16:11 -04001211 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001212 a.pickResult.Done(balancer.DoneInfo{
William Kurkianea869482019-04-09 15:16:11 -04001213 Err: err,
1214 Trailer: tr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001215 BytesSent: a.transportStream != nil,
William Kurkianea869482019-04-09 15:16:11 -04001216 BytesReceived: br,
Abhilash S.L3b494632019-07-16 15:51:09 +05301217 ServerLoad: balancerload.Parse(tr),
William Kurkianea869482019-04-09 15:16:11 -04001218 })
1219 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301220 if a.statsHandler != nil {
1221 a.statsHandler.HandleRPC(a.ctx, &stats.End{
William Kurkianea869482019-04-09 15:16:11 -04001222 Client: true,
Abhay Kumara61c5222025-11-10 07:32:50 +00001223 BeginTime: a.beginTime,
William Kurkianea869482019-04-09 15:16:11 -04001224 EndTime: time.Now(),
Abhilash S.L3b494632019-07-16 15:51:09 +05301225 Trailer: tr,
William Kurkianea869482019-04-09 15:16:11 -04001226 Error: err,
bseeniva0b9cbcb2026-02-12 19:11:11 +05301227 })
William Kurkianea869482019-04-09 15:16:11 -04001228 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301229 if a.trInfo != nil && a.trInfo.tr != nil {
William Kurkianea869482019-04-09 15:16:11 -04001230 if err == nil {
1231 a.trInfo.tr.LazyPrintf("RPC: [OK]")
1232 } else {
1233 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1234 a.trInfo.tr.SetError()
1235 }
1236 a.trInfo.tr.Finish()
1237 a.trInfo.tr = nil
1238 }
1239 a.mu.Unlock()
1240}
1241
// newNonRetryClientStream creates a ClientStream with the specified transport, on the
// given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
// - no service config (or wait for service config)
// - no tracing or stats
//
// It returns an error if t is nil, if a CallOption's before hook fails, if the
// codec cannot be set up, if a requested compressor is not registered, or if
// the transport fails to create the stream. On any error the derived context
// is canceled before returning.
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// err is the named result, so this observes every error return below.
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	// No service config is consulted here (nil is passed), so only the
	// CallOption values and the defaults determine the message size limits.
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorName; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.compressorV0 != nil {
		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
		cp = ac.cc.dopts.compressorV0
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:          callHdr,
		ac:               ac,
		ctx:              ctx,
		cancel:           cancel,
		opts:             opts,
		callInfo:         c,
		desc:             desc,
		codec:            c.codec,
		sendCompressorV0: cp,
		sendCompressorV1: comp,
		transport:        t,
	}

	s, err := as.transport.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.transportStream = s
	as.parser = parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on stream context to cleanup when the stream context is
		// canceled. Also listen for the addrConn's context in case the
		// addrConn is closed or reconnects to a different address. In all
		// other cases, an error should already be injected into the recv
		// buffer by the transport, which the client will eventually receive,
		// and then we will cancel the stream's context in
		// addrConnStream.finish.
		go func() {
			// Read ac.ctx under ac.mu, then wait without holding the lock.
			ac.mu.Lock()
			acCtx := ac.ctx
			ac.mu.Unlock()
			select {
			case <-acCtx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}
1357
// addrConnStream is the stream type returned by newNonRetryClientStream. It is
// bound to one transport of one addrConn and does not retry.
type addrConnStream struct {
	transportStream  *transport.ClientStream   // stream created on transport by NewStream
	ac               *addrConn                 // addrConn this stream was created on
	callHdr          *transport.CallHdr        // call header passed to NewStream
	cancel           context.CancelFunc        // cancels ctx
	opts             []CallOption              // call options supplied by the caller
	callInfo         *callInfo                 // per-call settings populated by opts
	transport        transport.ClientTransport // transport the stream was created on
	ctx              context.Context           // cancelable context derived from the caller's context
	sentLast         bool                      // set once the last message or half-close has been sent
	receivedFirstMsg bool
	desc             *StreamDesc         // description of the RPC's streaming shape
	codec            baseCodec           // codec taken from callInfo
	sendCompressorV0 Compressor          // outgoing compressor from the dial options, if used
	sendCompressorV1 encoding.Compressor // outgoing compressor looked up by name, if used
	decompressorSet  bool
	decompressorV0   Decompressor
	decompressorV1   encoding.Compressor
	parser           parser // message parser reading from transportStream

	// mu guards finished and is held for the entire finish method.
	mu       sync.Mutex
	finished bool
}
1382
1383func (as *addrConnStream) Header() (metadata.MD, error) {
Abhay Kumara61c5222025-11-10 07:32:50 +00001384 m, err := as.transportStream.Header()
William Kurkianea869482019-04-09 15:16:11 -04001385 if err != nil {
1386 as.finish(toRPCErr(err))
1387 }
1388 return m, err
1389}
1390
1391func (as *addrConnStream) Trailer() metadata.MD {
Abhay Kumara61c5222025-11-10 07:32:50 +00001392 return as.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -04001393}
1394
1395func (as *addrConnStream) CloseSend() error {
1396 if as.sentLast {
Abhay Kumara61c5222025-11-10 07:32:50 +00001397 // Return a nil error on repeated calls to this method.
William Kurkianea869482019-04-09 15:16:11 -04001398 return nil
1399 }
1400 as.sentLast = true
1401
Abhay Kumara61c5222025-11-10 07:32:50 +00001402 as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
William Kurkianea869482019-04-09 15:16:11 -04001403 // Always return nil; io.EOF is the only error that might make sense
1404 // instead, but there is no need to signal the client to call RecvMsg
1405 // as the only use left for the stream after CloseSend is to call
1406 // RecvMsg. This also matches historical behavior.
1407 return nil
1408}
1409
1410func (as *addrConnStream) Context() context.Context {
Abhay Kumara61c5222025-11-10 07:32:50 +00001411 return as.transportStream.Context()
William Kurkianea869482019-04-09 15:16:11 -04001412}
1413
Abhay Kumara61c5222025-11-10 07:32:50 +00001414func (as *addrConnStream) SendMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001415 defer func() {
1416 if err != nil && err != io.EOF {
1417 // Call finish on the client stream for errors generated by this SendMsg
1418 // call, as these indicate problems created by this client. (Transport
1419 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1420 // error will be returned from RecvMsg eventually in that case, or be
1421 // retried.)
1422 as.finish(err)
1423 }
1424 }()
1425 if as.sentLast {
1426 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1427 }
1428 if !as.desc.ClientStreams {
1429 as.sentLast = true
1430 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301431
1432 // load hdr, payload, data
Abhay Kumara61c5222025-11-10 07:32:50 +00001433 hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001434 if err != nil {
1435 return err
1436 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301437
Abhay Kumara61c5222025-11-10 07:32:50 +00001438 defer func() {
1439 data.Free()
1440 // only free payload if compression was made, and therefore it is a different set
1441 // of buffers from data.
1442 if pf.isCompressed() {
1443 payload.Free()
1444 }
1445 }()
1446
William Kurkianea869482019-04-09 15:16:11 -04001447 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +00001448 if payload.Len() > *as.callInfo.maxSendMessageSize {
1449 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001450 }
1451
Abhay Kumara61c5222025-11-10 07:32:50 +00001452 if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001453 if !as.desc.ClientStreams {
1454 // For non-client-streaming RPCs, we return nil instead of EOF on error
1455 // because the generated code requires it. finish is not called; RecvMsg()
1456 // will call it with the stream's status independently.
1457 return nil
1458 }
1459 return io.EOF
1460 }
1461
William Kurkianea869482019-04-09 15:16:11 -04001462 return nil
1463}
1464
Abhay Kumara61c5222025-11-10 07:32:50 +00001465func (as *addrConnStream) RecvMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001466 defer func() {
1467 if err != nil || !as.desc.ServerStreams {
1468 // err != nil or non-server-streaming indicates end of stream.
1469 as.finish(err)
1470 }
1471 }()
1472
Abhay Kumara61c5222025-11-10 07:32:50 +00001473 if !as.decompressorSet {
William Kurkianea869482019-04-09 15:16:11 -04001474 // Block until we receive headers containing received message encoding.
Abhay Kumara61c5222025-11-10 07:32:50 +00001475 if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1476 if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
William Kurkianea869482019-04-09 15:16:11 -04001477 // No configured decompressor, or it does not match the incoming
1478 // message encoding; attempt to find a registered compressor that does.
Abhay Kumara61c5222025-11-10 07:32:50 +00001479 as.decompressorV0 = nil
1480 as.decompressorV1 = encoding.GetCompressor(ct)
William Kurkianea869482019-04-09 15:16:11 -04001481 }
1482 } else {
1483 // No compression is used; disable our decompressor.
Abhay Kumara61c5222025-11-10 07:32:50 +00001484 as.decompressorV0 = nil
William Kurkianea869482019-04-09 15:16:11 -04001485 }
1486 // Only initialize this state once per stream.
Abhay Kumara61c5222025-11-10 07:32:50 +00001487 as.decompressorSet = true
William Kurkianea869482019-04-09 15:16:11 -04001488 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301489 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001490 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001491 if statusErr := as.transportStream.Status().Err(); statusErr != nil {
William Kurkianea869482019-04-09 15:16:11 -04001492 return statusErr
1493 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001494 // Received no msg and status OK for non-server streaming rpcs.
1495 if !as.desc.ServerStreams && !as.receivedFirstMsg {
1496 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1497 }
William Kurkianea869482019-04-09 15:16:11 -04001498 return io.EOF // indicates successful end of stream.
1499 }
1500 return toRPCErr(err)
1501 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001502 as.receivedFirstMsg = true
William Kurkianea869482019-04-09 15:16:11 -04001503
William Kurkianea869482019-04-09 15:16:11 -04001504 if as.desc.ServerStreams {
1505 // Subsequent messages should be received by subsequent RecvMsg calls.
1506 return nil
1507 }
1508
1509 // Special handling for non-server-stream rpcs.
1510 // This recv expects EOF or errors, so we don't collect inPayload.
bseeniva0b9cbcb2026-02-12 19:11:11 +05301511 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001512 return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1513 } else if err != nil {
1514 return toRPCErr(err)
William Kurkianea869482019-04-09 15:16:11 -04001515 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001516 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
William Kurkianea869482019-04-09 15:16:11 -04001517}
1518
1519func (as *addrConnStream) finish(err error) {
1520 as.mu.Lock()
1521 if as.finished {
1522 as.mu.Unlock()
1523 return
1524 }
1525 as.finished = true
1526 if err == io.EOF {
1527 // Ending a stream with EOF indicates a success.
1528 err = nil
1529 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001530 if as.transportStream != nil {
1531 as.transportStream.Close(err)
William Kurkianea869482019-04-09 15:16:11 -04001532 }
1533
1534 if err != nil {
1535 as.ac.incrCallsFailed()
1536 } else {
1537 as.ac.incrCallsSucceeded()
1538 }
1539 as.cancel()
1540 as.mu.Unlock()
1541}
1542
1543// ServerStream defines the server-side behavior of a streaming RPC.
1544//
Abhay Kumara61c5222025-11-10 07:32:50 +00001545// Errors returned from ServerStream methods are compatible with the status
1546// package. However, the status code will often not match the RPC status as
1547// seen by the client application, and therefore, should not be relied upon for
1548// this purpose.
William Kurkianea869482019-04-09 15:16:11 -04001549type ServerStream interface {
1550 // SetHeader sets the header metadata. It may be called multiple times.
1551 // When call multiple times, all the provided metadata will be merged.
1552 // All the metadata will be sent out when one of the following happens:
1553 // - ServerStream.SendHeader() is called;
1554 // - The first response is sent out;
1555 // - An RPC status is sent out (error or success).
1556 SetHeader(metadata.MD) error
1557 // SendHeader sends the header metadata.
1558 // The provided md and headers set by SetHeader() will be sent.
1559 // It fails if called multiple times.
1560 SendHeader(metadata.MD) error
1561 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1562 // When called more than once, all the provided metadata will be merged.
1563 SetTrailer(metadata.MD)
1564 // Context returns the context for this stream.
1565 Context() context.Context
1566 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1567 // error is returned directly.
1568 //
1569 // SendMsg blocks until:
1570 // - There is sufficient flow control to schedule m with the transport, or
1571 // - The stream is done, or
1572 // - The stream breaks.
1573 //
1574 // SendMsg does not wait until the message is received by the client. An
1575 // untimely stream closure may result in lost messages.
1576 //
1577 // It is safe to have a goroutine calling SendMsg and another goroutine
1578 // calling RecvMsg on the same stream at the same time, but it is not safe
1579 // to call SendMsg on the same stream in different goroutines.
Abhay Kumara61c5222025-11-10 07:32:50 +00001580 //
1581 // It is not safe to modify the message after calling SendMsg. Tracing
1582 // libraries and stats handlers may use the message lazily.
1583 SendMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -04001584 // RecvMsg blocks until it receives a message into m or the stream is
1585 // done. It returns io.EOF when the client has performed a CloseSend. On
1586 // any non-EOF error, the stream is aborted and the error contains the
1587 // RPC status.
1588 //
1589 // It is safe to have a goroutine calling SendMsg and another goroutine
1590 // calling RecvMsg on the same stream at the same time, but it is not
1591 // safe to call RecvMsg on the same stream in different goroutines.
Abhay Kumara61c5222025-11-10 07:32:50 +00001592 RecvMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -04001593}
1594
1595// serverStream implements a server side Stream.
1596type serverStream struct {
1597 ctx context.Context
Abhay Kumara61c5222025-11-10 07:32:50 +00001598 s *transport.ServerStream
bseeniva0b9cbcb2026-02-12 19:11:11 +05301599 p parser
William Kurkianea869482019-04-09 15:16:11 -04001600 codec baseCodec
Abhay Kumara61c5222025-11-10 07:32:50 +00001601 desc *StreamDesc
William Kurkianea869482019-04-09 15:16:11 -04001602
Abhay Kumara61c5222025-11-10 07:32:50 +00001603 compressorV0 Compressor
1604 compressorV1 encoding.Compressor
1605 decompressorV0 Decompressor
1606 decompressorV1 encoding.Compressor
1607
1608 sendCompressorName string
1609
1610 recvFirstMsg bool // set after the first message is received
William Kurkianea869482019-04-09 15:16:11 -04001611
1612 maxReceiveMessageSize int
1613 maxSendMessageSize int
1614 trInfo *traceInfo
1615
bseeniva0b9cbcb2026-02-12 19:11:11 +05301616 statsHandler stats.Handler
William Kurkianea869482019-04-09 15:16:11 -04001617
Abhay Kumara61c5222025-11-10 07:32:50 +00001618 binlogs []binarylog.MethodLogger
William Kurkianea869482019-04-09 15:16:11 -04001619 // serverHeaderBinlogged indicates whether server header has been logged. It
1620 // will happen when one of the following two happens: stream.SendHeader(),
1621 // stream.Send().
1622 //
1623 // It's only checked in send and sendHeader, doesn't need to be
1624 // synchronized.
1625 serverHeaderBinlogged bool
1626
1627 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1628}
1629
1630func (ss *serverStream) Context() context.Context {
1631 return ss.ctx
1632}
1633
1634func (ss *serverStream) SetHeader(md metadata.MD) error {
1635 if md.Len() == 0 {
1636 return nil
1637 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001638 err := imetadata.Validate(md)
1639 if err != nil {
1640 return status.Error(codes.Internal, err.Error())
1641 }
William Kurkianea869482019-04-09 15:16:11 -04001642 return ss.s.SetHeader(md)
1643}
1644
1645func (ss *serverStream) SendHeader(md metadata.MD) error {
Abhay Kumara61c5222025-11-10 07:32:50 +00001646 err := imetadata.Validate(md)
1647 if err != nil {
1648 return status.Error(codes.Internal, err.Error())
1649 }
1650
1651 err = ss.s.SendHeader(md)
1652 if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
William Kurkianea869482019-04-09 15:16:11 -04001653 h, _ := ss.s.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001654 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001655 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001656 }
William Kurkianea869482019-04-09 15:16:11 -04001657 ss.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +00001658 for _, binlog := range ss.binlogs {
1659 binlog.Log(ss.ctx, sh)
1660 }
William Kurkianea869482019-04-09 15:16:11 -04001661 }
1662 return err
1663}
1664
1665func (ss *serverStream) SetTrailer(md metadata.MD) {
1666 if md.Len() == 0 {
1667 return
1668 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001669 if err := imetadata.Validate(md); err != nil {
1670 logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1671 }
William Kurkianea869482019-04-09 15:16:11 -04001672 ss.s.SetTrailer(md)
1673}
1674
Abhay Kumara61c5222025-11-10 07:32:50 +00001675func (ss *serverStream) SendMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001676 defer func() {
1677 if ss.trInfo != nil {
1678 ss.mu.Lock()
1679 if ss.trInfo.tr != nil {
1680 if err == nil {
1681 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1682 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +00001683 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
William Kurkianea869482019-04-09 15:16:11 -04001684 ss.trInfo.tr.SetError()
1685 }
1686 }
1687 ss.mu.Unlock()
1688 }
1689 if err != nil && err != io.EOF {
1690 st, _ := status.FromError(toRPCErr(err))
Abhay Kumara61c5222025-11-10 07:32:50 +00001691 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001692 // Non-user specified status was sent out. This should be an error
1693 // case (as a server side Cancel maybe).
1694 //
1695 // This is not handled specifically now. User will return a final
1696 // status from the service handler, we will log that error instead.
1697 // This behavior is similar to an interceptor.
1698 }
William Kurkianea869482019-04-09 15:16:11 -04001699 }()
Abhilash S.L3b494632019-07-16 15:51:09 +05301700
Abhay Kumara61c5222025-11-10 07:32:50 +00001701 // Server handler could have set new compressor by calling SetSendCompressor.
1702 // In case it is set, we need to use it for compressing outbound message.
1703 if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1704 ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
1705 ss.sendCompressorName = sendCompressorsName
1706 }
1707
Abhilash S.L3b494632019-07-16 15:51:09 +05301708 // load hdr, payload, data
Abhay Kumara61c5222025-11-10 07:32:50 +00001709 hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001710 if err != nil {
1711 return err
1712 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301713
Abhay Kumara61c5222025-11-10 07:32:50 +00001714 defer func() {
1715 data.Free()
1716 // only free payload if compression was made, and therefore it is a different set
1717 // of buffers from data.
1718 if pf.isCompressed() {
1719 payload.Free()
1720 }
1721 }()
1722
1723 dataLen := data.Len()
1724 payloadLen := payload.Len()
1725
William Kurkianea869482019-04-09 15:16:11 -04001726 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +00001727 if payloadLen > ss.maxSendMessageSize {
1728 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001729 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001730 if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001731 return toRPCErr(err)
1732 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001733
1734 if len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001735 if !ss.serverHeaderBinlogged {
1736 h, _ := ss.s.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001737 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001738 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001739 }
William Kurkianea869482019-04-09 15:16:11 -04001740 ss.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +00001741 for _, binlog := range ss.binlogs {
1742 binlog.Log(ss.ctx, sh)
1743 }
William Kurkianea869482019-04-09 15:16:11 -04001744 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001745 sm := &binarylog.ServerMessage{
1746 Message: data.Materialize(),
1747 }
1748 for _, binlog := range ss.binlogs {
1749 binlog.Log(ss.ctx, sm)
1750 }
William Kurkianea869482019-04-09 15:16:11 -04001751 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301752 if ss.statsHandler != nil {
1753 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
William Kurkianea869482019-04-09 15:16:11 -04001754 }
1755 return nil
1756}
1757
Abhay Kumara61c5222025-11-10 07:32:50 +00001758func (ss *serverStream) RecvMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001759 defer func() {
1760 if ss.trInfo != nil {
1761 ss.mu.Lock()
1762 if ss.trInfo.tr != nil {
1763 if err == nil {
1764 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1765 } else if err != io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001766 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
William Kurkianea869482019-04-09 15:16:11 -04001767 ss.trInfo.tr.SetError()
1768 }
1769 }
1770 ss.mu.Unlock()
1771 }
1772 if err != nil && err != io.EOF {
1773 st, _ := status.FromError(toRPCErr(err))
Abhay Kumara61c5222025-11-10 07:32:50 +00001774 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001775 // Non-user specified status was sent out. This should be an error
1776 // case (as a server side Cancel maybe).
1777 //
1778 // This is not handled specifically now. User will return a final
1779 // status from the service handler, we will log that error instead.
1780 // This behavior is similar to an interceptor.
1781 }
William Kurkianea869482019-04-09 15:16:11 -04001782 }()
1783 var payInfo *payloadInfo
bseeniva0b9cbcb2026-02-12 19:11:11 +05301784 if ss.statsHandler != nil || len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001785 payInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001786 defer payInfo.free()
William Kurkianea869482019-04-09 15:16:11 -04001787 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301788 if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001789 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001790 if len(ss.binlogs) != 0 {
1791 chc := &binarylog.ClientHalfClose{}
1792 for _, binlog := range ss.binlogs {
1793 binlog.Log(ss.ctx, chc)
1794 }
1795 }
1796 // Received no request msg for non-client streaming rpcs.
1797 if !ss.desc.ClientStreams && !ss.recvFirstMsg {
1798 return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
William Kurkianea869482019-04-09 15:16:11 -04001799 }
1800 return err
1801 }
1802 if err == io.ErrUnexpectedEOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001803 err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
William Kurkianea869482019-04-09 15:16:11 -04001804 }
1805 return toRPCErr(err)
1806 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001807 ss.recvFirstMsg = true
bseeniva0b9cbcb2026-02-12 19:11:11 +05301808 if ss.statsHandler != nil {
1809 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1810 RecvTime: time.Now(),
1811 Payload: m,
1812 Length: payInfo.uncompressedBytes.Len(),
1813 WireLength: payInfo.compressedLength + headerLen,
1814 CompressedLength: payInfo.compressedLength,
1815 })
William Kurkianea869482019-04-09 15:16:11 -04001816 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001817 if len(ss.binlogs) != 0 {
1818 cm := &binarylog.ClientMessage{
1819 Message: payInfo.uncompressedBytes.Materialize(),
1820 }
1821 for _, binlog := range ss.binlogs {
1822 binlog.Log(ss.ctx, cm)
1823 }
William Kurkianea869482019-04-09 15:16:11 -04001824 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001825
1826 if ss.desc.ClientStreams {
1827 // Subsequent messages should be received by subsequent RecvMsg calls.
1828 return nil
1829 }
1830 // Special handling for non-client-stream rpcs.
1831 // This recv expects EOF or errors, so we don't collect inPayload.
bseeniva0b9cbcb2026-02-12 19:11:11 +05301832 if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001833 return nil
1834 } else if err != nil {
1835 return err
1836 }
1837 return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
William Kurkianea869482019-04-09 15:16:11 -04001838}
1839
1840// MethodFromServerStream returns the method string for the input stream.
1841// The returned string is in the format of "/service/method".
1842func MethodFromServerStream(stream ServerStream) (string, bool) {
1843 return Method(stream.Context())
1844}
Abhilash S.L3b494632019-07-16 15:51:09 +05301845
Abhay Kumara61c5222025-11-10 07:32:50 +00001846// prepareMsg returns the hdr, payload and data using the compressors passed or
1847// using the passed preparedmsg. The returned boolean indicates whether
1848// compression was made and therefore whether the payload needs to be freed in
1849// addition to the returned data. Freeing the payload if the returned boolean is
1850// false can lead to undefined behavior.
1851func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301852 if preparedMsg, ok := m.(*PreparedMsg); ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001853 return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301854 }
1855 // The input interface is not a prepared msg.
1856 // Marshal and Compress the data at this point
1857 data, err = encode(codec, m)
1858 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001859 return nil, nil, nil, 0, err
Abhilash S.L3b494632019-07-16 15:51:09 +05301860 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001861 compData, pf, err := compress(data, cp, comp, pool)
Abhilash S.L3b494632019-07-16 15:51:09 +05301862 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001863 data.Free()
1864 return nil, nil, nil, 0, err
Abhilash S.L3b494632019-07-16 15:51:09 +05301865 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001866 hdr, payload = msgHeader(data, compData, pf)
1867 return hdr, data, payload, pf, nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301868}