blob: 0a0af8961f05673211b135a1e64c9db23b3af36b [file] [log] [blame]
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
Abhay Kumara61c5222025-11-10 07:32:50 +000026 rand "math/rand/v2"
William Kurkianea869482019-04-09 15:16:11 -040027 "strconv"
28 "sync"
29 "time"
30
William Kurkianea869482019-04-09 15:16:11 -040031 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
William Kurkianea869482019-04-09 15:16:11 -040033 "google.golang.org/grpc/encoding"
Abhay Kumara61c5222025-11-10 07:32:50 +000034 "google.golang.org/grpc/internal"
Abhilash S.L3b494632019-07-16 15:51:09 +053035 "google.golang.org/grpc/internal/balancerload"
William Kurkianea869482019-04-09 15:16:11 -040036 "google.golang.org/grpc/internal/binarylog"
37 "google.golang.org/grpc/internal/channelz"
Abhay Kumara61c5222025-11-10 07:32:50 +000038 "google.golang.org/grpc/internal/grpcutil"
39 imetadata "google.golang.org/grpc/internal/metadata"
40 iresolver "google.golang.org/grpc/internal/resolver"
41 "google.golang.org/grpc/internal/serviceconfig"
42 istatus "google.golang.org/grpc/internal/status"
William Kurkianea869482019-04-09 15:16:11 -040043 "google.golang.org/grpc/internal/transport"
Abhay Kumara61c5222025-11-10 07:32:50 +000044 "google.golang.org/grpc/mem"
William Kurkianea869482019-04-09 15:16:11 -040045 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49)
50
Abhay Kumara61c5222025-11-10 07:32:50 +000051var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
52
William Kurkianea869482019-04-09 15:16:11 -040053// StreamHandler defines the handler called by gRPC server to complete the
Abhay Kumara61c5222025-11-10 07:32:50 +000054// execution of a streaming RPC.
55//
56// If a StreamHandler returns an error, it should either be produced by the
57// status package, or be one of the context errors. Otherwise, gRPC will use
58// codes.Unknown as the status code and err.Error() as the status message of the
59// RPC.
60type StreamHandler func(srv any, stream ServerStream) error
William Kurkianea869482019-04-09 15:16:11 -040061
Abhay Kumara61c5222025-11-10 07:32:50 +000062// StreamDesc represents a streaming RPC service's method specification. Used
63// on the server when registering services and on the client when initiating
64// new streams.
William Kurkianea869482019-04-09 15:16:11 -040065type StreamDesc struct {
Abhay Kumara61c5222025-11-10 07:32:50 +000066 // StreamName and Handler are only used when registering handlers on a
67 // server.
68 StreamName string // the name of the method excluding the service
69 Handler StreamHandler // the handler called for the method
William Kurkianea869482019-04-09 15:16:11 -040070
Abhay Kumara61c5222025-11-10 07:32:50 +000071 // ServerStreams and ClientStreams are used for registering handlers on a
72 // server as well as defining RPC behavior when passed to NewClientStream
73 // and ClientConn.NewStream. At least one must be true.
74 ServerStreams bool // indicates the server can perform streaming sends
75 ClientStreams bool // indicates the client can perform streaming sends
William Kurkianea869482019-04-09 15:16:11 -040076}
77
// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m any) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m any) error
}
89
90// ClientStream defines the client-side behavior of a streaming RPC.
91//
92// All errors returned from ClientStream methods are compatible with the
93// status package.
94type ClientStream interface {
95 // Header returns the header metadata received from the server if there
Abhay Kumara61c5222025-11-10 07:32:50 +000096 // is any. It blocks if the metadata is not ready to read. If the metadata
97 // is nil and the error is also nil, then the stream was terminated without
98 // headers, and the status can be discovered by calling RecvMsg.
William Kurkianea869482019-04-09 15:16:11 -040099 Header() (metadata.MD, error)
100 // Trailer returns the trailer metadata from the server, if there is any.
101 // It must only be called after stream.CloseAndRecv has returned, or
102 // stream.Recv has returned a non-nil error (including io.EOF).
103 Trailer() metadata.MD
Abhay Kumara61c5222025-11-10 07:32:50 +0000104 // CloseSend closes the send direction of the stream. This method always
105 // returns a nil error. The status of the stream may be discovered using
106 // RecvMsg. It is also not safe to call CloseSend concurrently with SendMsg.
William Kurkianea869482019-04-09 15:16:11 -0400107 CloseSend() error
108 // Context returns the context for this stream.
109 //
110 // It should not be called until after Header or RecvMsg has returned. Once
111 // called, subsequent client-side retries are disabled.
112 Context() context.Context
113 // SendMsg is generally called by generated code. On error, SendMsg aborts
114 // the stream. If the error was generated by the client, the status is
115 // returned directly; otherwise, io.EOF is returned and the status of
Abhay Kumara61c5222025-11-10 07:32:50 +0000116 // the stream may be discovered using RecvMsg. For unary or server-streaming
117 // RPCs (StreamDesc.ClientStreams is false), a nil error is returned
118 // unconditionally.
William Kurkianea869482019-04-09 15:16:11 -0400119 //
120 // SendMsg blocks until:
121 // - There is sufficient flow control to schedule m with the transport, or
122 // - The stream is done, or
123 // - The stream breaks.
124 //
125 // SendMsg does not wait until the message is received by the server. An
126 // untimely stream closure may result in lost messages. To ensure delivery,
127 // users should ensure the RPC completed successfully using RecvMsg.
128 //
129 // It is safe to have a goroutine calling SendMsg and another goroutine
130 // calling RecvMsg on the same stream at the same time, but it is not safe
131 // to call SendMsg on the same stream in different goroutines. It is also
132 // not safe to call CloseSend concurrently with SendMsg.
Abhay Kumara61c5222025-11-10 07:32:50 +0000133 //
134 // It is not safe to modify the message after calling SendMsg. Tracing
135 // libraries and stats handlers may use the message lazily.
136 SendMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -0400137 // RecvMsg blocks until it receives a message into m or the stream is
138 // done. It returns io.EOF when the stream completes successfully. On
139 // any other error, the stream is aborted and the error contains the RPC
140 // status.
141 //
142 // It is safe to have a goroutine calling SendMsg and another goroutine
143 // calling RecvMsg on the same stream at the same time, but it is not
144 // safe to call RecvMsg on the same stream in different goroutines.
Abhay Kumara61c5222025-11-10 07:32:50 +0000145 RecvMsg(m any) error
William Kurkianea869482019-04-09 15:16:11 -0400146}
147
148// NewStream creates a new Stream for the client side. This is typically
149// called by generated code. ctx is used for the lifetime of the stream.
150//
151// To ensure resources are not leaked due to the stream returned, one of the following
152// actions must be performed:
153//
Abhay Kumara61c5222025-11-10 07:32:50 +0000154// 1. Call Close on the ClientConn.
155// 2. Cancel the context provided.
156// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
157// client-streaming RPC, for instance, might use the helper function
158// CloseAndRecv (note that CloseSend does not Recv, therefore is not
159// guaranteed to release all resources).
160// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
William Kurkianea869482019-04-09 15:16:11 -0400161//
162// If none of the above happen, a goroutine and a context will be leaked, and grpc
163// will not call the optionally-configured stats handler with a stats.End message.
164func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
165 // allow interceptor to see all applicable call options, which means those
166 // configured as defaults from dial option as well as per-call options
167 opts = combine(cc.dopts.callOptions, opts)
168
169 if cc.dopts.streamInt != nil {
170 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
171 }
172 return newClientStream(ctx, desc, cc, method, opts...)
173}
174
175// NewClientStream is a wrapper for ClientConn.NewStream.
176func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
177 return cc.NewStream(ctx, desc, method, opts...)
178}
179
180func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000181 // Start tracking the RPC for idleness purposes. This is where a stream is
182 // created for both streaming and unary RPCs, and hence is a good place to
183 // track active RPC count.
184 if err := cc.idlenessMgr.OnCallBegin(); err != nil {
185 return nil, err
186 }
187 // Add a calloption, to decrement the active call count, that gets executed
188 // when the RPC completes.
189 opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
190
191 if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
192 // validate md
193 if err := imetadata.Validate(md); err != nil {
194 return nil, status.Error(codes.Internal, err.Error())
195 }
196 // validate added
197 for _, kvs := range added {
198 for i := 0; i < len(kvs); i += 2 {
199 if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
200 return nil, status.Error(codes.Internal, err.Error())
201 }
202 }
203 }
204 }
William Kurkianea869482019-04-09 15:16:11 -0400205 if channelz.IsOn() {
206 cc.incrCallsStarted()
207 defer func() {
208 if err != nil {
209 cc.incrCallsFailed()
210 }
211 }()
212 }
William Kurkianea869482019-04-09 15:16:11 -0400213 // Provide an opportunity for the first RPC to see the first service config
214 // provided by the resolver.
Abhay Kumara61c5222025-11-10 07:32:50 +0000215 nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
216 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400217 return nil, err
218 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000219
220 var mc serviceconfig.MethodConfig
221 var onCommit func()
222 newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
223 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
224 }
225
226 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
227 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
228 if err != nil {
229 if st, ok := status.FromError(err); ok {
230 // Restrict the code to the list allowed by gRFC A54.
231 if istatus.IsRestrictedControlPlaneCode(st) {
232 err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
233 }
234 return nil, err
235 }
236 return nil, toRPCErr(err)
237 }
238
239 if rpcConfig != nil {
240 if rpcConfig.Context != nil {
241 ctx = rpcConfig.Context
242 }
243 mc = rpcConfig.MethodConfig
244 onCommit = rpcConfig.OnCommitted
245 if rpcConfig.Interceptor != nil {
246 rpcInfo.Context = nil
247 ns := newStream
248 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
249 cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
250 if err != nil {
251 return nil, toRPCErr(err)
252 }
253 return cs, nil
254 }
255 }
256 }
257
258 return newStream(ctx, func() {})
259}
260
261func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
262 callInfo := defaultCallInfo()
William Kurkianea869482019-04-09 15:16:11 -0400263 if mc.WaitForReady != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000264 callInfo.failFast = !*mc.WaitForReady
William Kurkianea869482019-04-09 15:16:11 -0400265 }
266
267 // Possible context leak:
268 // The cancel function for the child context we create will only be called
269 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
270 // an error is generated by SendMsg.
271 // https://github.com/grpc/grpc-go/issues/1818.
272 var cancel context.CancelFunc
273 if mc.Timeout != nil && *mc.Timeout >= 0 {
274 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
275 } else {
276 ctx, cancel = context.WithCancel(ctx)
277 }
278 defer func() {
279 if err != nil {
280 cancel()
281 }
282 }()
283
284 for _, o := range opts {
Abhay Kumara61c5222025-11-10 07:32:50 +0000285 if err := o.before(callInfo); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400286 return nil, toRPCErr(err)
287 }
288 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000289 callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
290 callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
291 if err := setCallInfoCodec(callInfo); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400292 return nil, err
293 }
294
295 callHdr := &transport.CallHdr{
296 Host: cc.authority,
297 Method: method,
Abhay Kumara61c5222025-11-10 07:32:50 +0000298 ContentSubtype: callInfo.contentSubtype,
299 DoneFunc: doneFunc,
300 Authority: callInfo.authority,
William Kurkianea869482019-04-09 15:16:11 -0400301 }
302
303 // Set our outgoing compression according to the UseCompressor CallOption, if
304 // set. In that case, also find the compressor from the encoding package.
305 // Otherwise, use the compressor configured by the WithCompressor DialOption,
306 // if set.
Abhay Kumara61c5222025-11-10 07:32:50 +0000307 var compressorV0 Compressor
308 var compressorV1 encoding.Compressor
309 if ct := callInfo.compressorName; ct != "" {
William Kurkianea869482019-04-09 15:16:11 -0400310 callHdr.SendCompress = ct
311 if ct != encoding.Identity {
Abhay Kumara61c5222025-11-10 07:32:50 +0000312 compressorV1 = encoding.GetCompressor(ct)
313 if compressorV1 == nil {
William Kurkianea869482019-04-09 15:16:11 -0400314 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
315 }
316 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000317 } else if cc.dopts.compressorV0 != nil {
318 callHdr.SendCompress = cc.dopts.compressorV0.Type()
319 compressorV0 = cc.dopts.compressorV0
William Kurkianea869482019-04-09 15:16:11 -0400320 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000321 if callInfo.creds != nil {
322 callHdr.Creds = callInfo.creds
William Kurkianea869482019-04-09 15:16:11 -0400323 }
324
325 cs := &clientStream{
Abhay Kumara61c5222025-11-10 07:32:50 +0000326 callHdr: callHdr,
327 ctx: ctx,
328 methodConfig: &mc,
329 opts: opts,
330 callInfo: callInfo,
331 cc: cc,
332 desc: desc,
333 codec: callInfo.codec,
334 compressorV0: compressorV0,
335 compressorV1: compressorV1,
336 cancel: cancel,
337 firstAttempt: true,
338 onCommit: onCommit,
339 nameResolutionDelay: nameResolutionDelayed,
William Kurkianea869482019-04-09 15:16:11 -0400340 }
341 if !cc.dopts.disableRetry {
342 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
343 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000344 if ml := binarylog.GetMethodLogger(method); ml != nil {
345 cs.binlogs = append(cs.binlogs, ml)
346 }
347 if cc.dopts.binaryLogger != nil {
348 if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
349 cs.binlogs = append(cs.binlogs, ml)
350 }
351 }
William Kurkianea869482019-04-09 15:16:11 -0400352
Abhay Kumara61c5222025-11-10 07:32:50 +0000353 // Pick the transport to use and create a new stream on the transport.
354 // Assign cs.attempt upon success.
355 op := func(a *csAttempt) error {
356 if err := a.getTransport(); err != nil {
357 return err
358 }
359 if err := a.newStream(); err != nil {
360 return err
361 }
362 // Because this operation is always called either here (while creating
363 // the clientStream) or by the retry code while locked when replaying
364 // the operation, it is safe to access cs.attempt directly.
365 cs.attempt = a
366 return nil
367 }
368 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400369 return nil, err
370 }
371
Abhay Kumara61c5222025-11-10 07:32:50 +0000372 if len(cs.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -0400373 md, _ := metadata.FromOutgoingContext(ctx)
374 logEntry := &binarylog.ClientHeader{
375 OnClientSide: true,
376 Header: md,
377 MethodName: method,
378 Authority: cs.cc.authority,
379 }
380 if deadline, ok := ctx.Deadline(); ok {
381 logEntry.Timeout = time.Until(deadline)
382 if logEntry.Timeout < 0 {
383 logEntry.Timeout = 0
384 }
385 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000386 for _, binlog := range cs.binlogs {
387 binlog.Log(cs.ctx, logEntry)
388 }
William Kurkianea869482019-04-09 15:16:11 -0400389 }
390
391 if desc != unaryStreamDesc {
392 // Listen on cc and stream contexts to cleanup when the user closes the
393 // ClientConn or cancels the stream context. In all other cases, an error
394 // should already be injected into the recv buffer by the transport, which
395 // the client will eventually receive, and then we will cancel the stream's
396 // context in clientStream.finish.
397 go func() {
398 select {
399 case <-cc.ctx.Done():
400 cs.finish(ErrClientConnClosing)
401 case <-ctx.Done():
402 cs.finish(toRPCErr(ctx.Err()))
403 }
404 }()
405 }
406 return cs, nil
407}
408
Abhay Kumara61c5222025-11-10 07:32:50 +0000409// newAttemptLocked creates a new csAttempt without a transport or stream.
410func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
William Kurkianea869482019-04-09 15:16:11 -0400411 if err := cs.ctx.Err(); err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000412 return nil, toRPCErr(err)
William Kurkianea869482019-04-09 15:16:11 -0400413 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000414 if err := cs.cc.ctx.Err(); err != nil {
415 return nil, ErrClientConnClosing
416 }
417
418 ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
419 method := cs.callHdr.Method
420 var beginTime time.Time
421 shs := cs.cc.dopts.copts.StatsHandlers
422 for _, sh := range shs {
423 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.nameResolutionDelay})
424 beginTime = time.Now()
425 begin := &stats.Begin{
426 Client: true,
427 BeginTime: beginTime,
428 FailFast: cs.callInfo.failFast,
429 IsClientStream: cs.desc.ClientStreams,
430 IsServerStream: cs.desc.ServerStreams,
431 IsTransparentRetryAttempt: isTransparent,
432 }
433 sh.HandleRPC(ctx, begin)
434 }
435
436 var trInfo *traceInfo
437 if EnableTracing {
438 trInfo = &traceInfo{
439 tr: newTrace("grpc.Sent."+methodFamily(method), method),
440 firstLine: firstLine{
441 client: true,
442 },
443 }
444 if deadline, ok := ctx.Deadline(); ok {
445 trInfo.firstLine.deadline = time.Until(deadline)
446 }
447 trInfo.tr.LazyLog(&trInfo.firstLine, false)
448 ctx = newTraceContext(ctx, trInfo.tr)
449 }
450
451 if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
452 // Add extra metadata (metadata that will be added by transport) to context
453 // so the balancer can see them.
454 ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
455 "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
456 ))
457 }
458
459 return &csAttempt{
460 ctx: ctx,
461 beginTime: beginTime,
462 cs: cs,
463 decompressorV0: cs.cc.dopts.dc,
464 statsHandlers: shs,
465 trInfo: trInfo,
466 }, nil
467}
468
469func (a *csAttempt) getTransport() error {
470 cs := a.cs
471
472 pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
473 pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
474 a.transport, a.pickResult = pick.transport, pick.result
William Kurkianea869482019-04-09 15:16:11 -0400475 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +0000476 if de, ok := err.(dropError); ok {
477 err = de.error
478 a.drop = true
479 }
William Kurkianea869482019-04-09 15:16:11 -0400480 return err
481 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000482 if a.trInfo != nil {
483 a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
Abhilash S.L3b494632019-07-16 15:51:09 +0530484 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000485 if pick.blocked {
486 for _, sh := range a.statsHandlers {
487 sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
488 }
489 }
William Kurkianea869482019-04-09 15:16:11 -0400490 return nil
491}
492
493func (a *csAttempt) newStream() error {
494 cs := a.cs
495 cs.callHdr.PreviousAttempts = cs.numRetries
Abhay Kumara61c5222025-11-10 07:32:50 +0000496
497 // Merge metadata stored in PickResult, if any, with existing call metadata.
498 // It is safe to overwrite the csAttempt's context here, since all state
499 // maintained in it are local to the attempt. When the attempt has to be
500 // retried, a new instance of csAttempt will be created.
501 if a.pickResult.Metadata != nil {
502 // We currently do not have a function it the metadata package which
503 // merges given metadata with existing metadata in a context. Existing
504 // function `AppendToOutgoingContext()` takes a variadic argument of key
505 // value pairs.
506 //
507 // TODO: Make it possible to retrieve key value pairs from metadata.MD
508 // in a form passable to AppendToOutgoingContext(), or create a version
509 // of AppendToOutgoingContext() that accepts a metadata.MD.
510 md, _ := metadata.FromOutgoingContext(a.ctx)
511 md = metadata.Join(md, a.pickResult.Metadata)
512 a.ctx = metadata.NewOutgoingContext(a.ctx, md)
William Kurkianea869482019-04-09 15:16:11 -0400513 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000514
515 s, err := a.transport.NewStream(a.ctx, cs.callHdr)
516 if err != nil {
517 nse, ok := err.(*transport.NewStreamError)
518 if !ok {
519 // Unexpected.
520 return err
521 }
522
523 if nse.AllowTransparentRetry {
524 a.allowTransparentRetry = true
525 }
526
527 // Unwrap and convert error.
528 return toRPCErr(nse.Err)
529 }
530 a.transportStream = s
531 a.ctx = s.Context()
532 a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
William Kurkianea869482019-04-09 15:16:11 -0400533 return nil
534}
535
536// clientStream implements a client side Stream.
537type clientStream struct {
538 callHdr *transport.CallHdr
539 opts []CallOption
540 callInfo *callInfo
541 cc *ClientConn
542 desc *StreamDesc
543
Abhay Kumara61c5222025-11-10 07:32:50 +0000544 codec baseCodec
545 compressorV0 Compressor
546 compressorV1 encoding.Compressor
William Kurkianea869482019-04-09 15:16:11 -0400547
548 cancel context.CancelFunc // cancels all attempts
549
Abhay Kumara61c5222025-11-10 07:32:50 +0000550 sentLast bool // sent an end stream
551
552 receivedFirstMsg bool // set after the first message is received
William Kurkianea869482019-04-09 15:16:11 -0400553
554 methodConfig *MethodConfig
555
556 ctx context.Context // the application's context, wrapped by stats/tracing
557
558 retryThrottler *retryThrottler // The throttler active when the RPC began.
559
Abhay Kumara61c5222025-11-10 07:32:50 +0000560 binlogs []binarylog.MethodLogger
William Kurkianea869482019-04-09 15:16:11 -0400561 // serverHeaderBinlogged is a boolean for whether server header has been
562 // logged. Server header will be logged when the first time one of those
563 // happens: stream.Header(), stream.Recv().
564 //
565 // It's only read and used by Recv() and Header(), so it doesn't need to be
566 // synchronized.
567 serverHeaderBinlogged bool
568
569 mu sync.Mutex
David Bainbridge788e5202019-10-21 18:49:40 +0000570 firstAttempt bool // if true, transparent retry is valid
571 numRetries int // exclusive of transparent retry attempt(s)
572 numRetriesSincePushback int // retries since pushback; to reset backoff
573 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
574 // attempt is the active client stream attempt.
575 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
576 // So, attempt can be nil only inside newClientStream function when clientStream is first created.
577 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
578 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
579 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
580 // place where we need to check if the attempt is nil.
581 attempt *csAttempt
William Kurkianea869482019-04-09 15:16:11 -0400582 // TODO(hedging): hedging will have multiple attempts simultaneously.
Abhay Kumara61c5222025-11-10 07:32:50 +0000583 committed bool // active attempt committed for retry?
584 onCommit func()
585 replayBuffer []replayOp // operations to replay on retry
586 replayBufferSize int // current size of replayBuffer
587 // nameResolutionDelay indicates if there was a delay in the name resolution.
588 // This field is only valid on client side, it's always false on server side.
589 nameResolutionDelay bool
590}
591
592type replayOp struct {
593 op func(a *csAttempt) error
594 cleanup func()
William Kurkianea869482019-04-09 15:16:11 -0400595}
596
597// csAttempt implements a single transport stream attempt within a
598// clientStream.
599type csAttempt struct {
Abhay Kumara61c5222025-11-10 07:32:50 +0000600 ctx context.Context
601 cs *clientStream
602 transport transport.ClientTransport
603 transportStream *transport.ClientStream
604 parser *parser
605 pickResult balancer.PickResult
William Kurkianea869482019-04-09 15:16:11 -0400606
Abhay Kumara61c5222025-11-10 07:32:50 +0000607 finished bool
608 decompressorV0 Decompressor
609 decompressorV1 encoding.Compressor
610 decompressorSet bool
William Kurkianea869482019-04-09 15:16:11 -0400611
612 mu sync.Mutex // guards trInfo.tr
Abhilash S.L3b494632019-07-16 15:51:09 +0530613 // trInfo may be nil (if EnableTracing is false).
William Kurkianea869482019-04-09 15:16:11 -0400614 // trInfo.tr is set when created (if EnableTracing is true),
615 // and cleared when the finish method is called.
Abhilash S.L3b494632019-07-16 15:51:09 +0530616 trInfo *traceInfo
William Kurkianea869482019-04-09 15:16:11 -0400617
Abhay Kumara61c5222025-11-10 07:32:50 +0000618 statsHandlers []stats.Handler
619 beginTime time.Time
620
621 // set for newStream errors that may be transparently retried
622 allowTransparentRetry bool
623 // set for pick errors that are returned as a status
624 drop bool
William Kurkianea869482019-04-09 15:16:11 -0400625}
626
627func (cs *clientStream) commitAttemptLocked() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000628 if !cs.committed && cs.onCommit != nil {
629 cs.onCommit()
630 }
William Kurkianea869482019-04-09 15:16:11 -0400631 cs.committed = true
Abhay Kumara61c5222025-11-10 07:32:50 +0000632 for _, op := range cs.replayBuffer {
633 if op.cleanup != nil {
634 op.cleanup()
635 }
636 }
637 cs.replayBuffer = nil
William Kurkianea869482019-04-09 15:16:11 -0400638}
639
640func (cs *clientStream) commitAttempt() {
641 cs.mu.Lock()
642 cs.commitAttemptLocked()
643 cs.mu.Unlock()
644}
645
646// shouldRetry returns nil if the RPC should be retried; otherwise it returns
Abhay Kumara61c5222025-11-10 07:32:50 +0000647// the error that should be returned by the operation. If the RPC should be
648// retried, the bool indicates whether it is being retried transparently.
649func (a *csAttempt) shouldRetry(err error) (bool, error) {
650 cs := a.cs
651
652 if cs.finished || cs.committed || a.drop {
653 // RPC is finished or committed or was dropped by the picker; cannot retry.
654 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400655 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000656 if a.transportStream == nil && a.allowTransparentRetry {
657 return true, nil
William Kurkianea869482019-04-09 15:16:11 -0400658 }
659 // Wait for the trailers.
Abhay Kumara61c5222025-11-10 07:32:50 +0000660 unprocessed := false
661 if a.transportStream != nil {
662 <-a.transportStream.Done()
663 unprocessed = a.transportStream.Unprocessed()
William Kurkianea869482019-04-09 15:16:11 -0400664 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000665 if cs.firstAttempt && unprocessed {
David Bainbridge788e5202019-10-21 18:49:40 +0000666 // First attempt, stream unprocessed: transparently retry.
Abhay Kumara61c5222025-11-10 07:32:50 +0000667 return true, nil
William Kurkianea869482019-04-09 15:16:11 -0400668 }
William Kurkianea869482019-04-09 15:16:11 -0400669 if cs.cc.dopts.disableRetry {
Abhay Kumara61c5222025-11-10 07:32:50 +0000670 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400671 }
672
673 pushback := 0
674 hasPushback := false
Abhay Kumara61c5222025-11-10 07:32:50 +0000675 if a.transportStream != nil {
676 if !a.transportStream.TrailersOnly() {
677 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400678 }
679
680 // TODO(retry): Move down if the spec changes to not check server pushback
681 // before considering this a failure for throttling.
Abhay Kumara61c5222025-11-10 07:32:50 +0000682 sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
William Kurkianea869482019-04-09 15:16:11 -0400683 if len(sps) == 1 {
684 var e error
685 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
Abhay Kumara61c5222025-11-10 07:32:50 +0000686 channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
William Kurkianea869482019-04-09 15:16:11 -0400687 cs.retryThrottler.throttle() // This counts as a failure for throttling.
Abhay Kumara61c5222025-11-10 07:32:50 +0000688 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400689 }
690 hasPushback = true
691 } else if len(sps) > 1 {
Abhay Kumara61c5222025-11-10 07:32:50 +0000692 channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
William Kurkianea869482019-04-09 15:16:11 -0400693 cs.retryThrottler.throttle() // This counts as a failure for throttling.
Abhay Kumara61c5222025-11-10 07:32:50 +0000694 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400695 }
696 }
697
698 var code codes.Code
Abhay Kumara61c5222025-11-10 07:32:50 +0000699 if a.transportStream != nil {
700 code = a.transportStream.Status().Code()
William Kurkianea869482019-04-09 15:16:11 -0400701 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +0000702 code = status.Code(err)
William Kurkianea869482019-04-09 15:16:11 -0400703 }
704
Abhay Kumara61c5222025-11-10 07:32:50 +0000705 rp := cs.methodConfig.RetryPolicy
706 if rp == nil || !rp.RetryableStatusCodes[code] {
707 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400708 }
709
710 // Note: the ordering here is important; we count this as a failure
711 // only if the code matched a retryable code.
712 if cs.retryThrottler.throttle() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000713 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400714 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000715 if cs.numRetries+1 >= rp.MaxAttempts {
716 return false, err
William Kurkianea869482019-04-09 15:16:11 -0400717 }
718
719 var dur time.Duration
720 if hasPushback {
721 dur = time.Millisecond * time.Duration(pushback)
722 cs.numRetriesSincePushback = 0
723 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +0000724 fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
725 cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
726 // Apply jitter by multiplying with a random factor between 0.8 and 1.2
727 cur *= 0.8 + 0.4*rand.Float64()
728 dur = time.Duration(int64(cur))
William Kurkianea869482019-04-09 15:16:11 -0400729 cs.numRetriesSincePushback++
730 }
731
732 // TODO(dfawley): we could eagerly fail here if dur puts us past the
733 // deadline, but unsure if it is worth doing.
734 t := time.NewTimer(dur)
735 select {
736 case <-t.C:
737 cs.numRetries++
Abhay Kumara61c5222025-11-10 07:32:50 +0000738 return false, nil
William Kurkianea869482019-04-09 15:16:11 -0400739 case <-cs.ctx.Done():
740 t.Stop()
Abhay Kumara61c5222025-11-10 07:32:50 +0000741 return false, status.FromContextError(cs.ctx.Err()).Err()
William Kurkianea869482019-04-09 15:16:11 -0400742 }
743}
744
745// Returns nil if a retry was performed and succeeded; error otherwise.
Abhay Kumara61c5222025-11-10 07:32:50 +0000746func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
William Kurkianea869482019-04-09 15:16:11 -0400747 for {
Abhay Kumara61c5222025-11-10 07:32:50 +0000748 attempt.finish(toRPCErr(lastErr))
749 isTransparent, err := attempt.shouldRetry(lastErr)
750 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400751 cs.commitAttemptLocked()
752 return err
753 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000754 cs.firstAttempt = false
755 attempt, err = cs.newAttemptLocked(isTransparent)
756 if err != nil {
757 // Only returns error if the clientconn is closed or the context of
758 // the stream is canceled.
William Kurkianea869482019-04-09 15:16:11 -0400759 return err
760 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000761 // Note that the first op in replayBuffer always sets cs.attempt
762 // if it is able to pick a transport and create a stream.
763 if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
William Kurkianea869482019-04-09 15:16:11 -0400764 return nil
765 }
766 }
767}
768
769func (cs *clientStream) Context() context.Context {
770 cs.commitAttempt()
771 // No need to lock before using attempt, since we know it is committed and
772 // cannot change.
Abhay Kumara61c5222025-11-10 07:32:50 +0000773 if cs.attempt.transportStream != nil {
774 return cs.attempt.transportStream.Context()
775 }
776 return cs.ctx
William Kurkianea869482019-04-09 15:16:11 -0400777}
778
779func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
780 cs.mu.Lock()
781 for {
782 if cs.committed {
783 cs.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000784 // toRPCErr is used in case the error from the attempt comes from
785 // NewClientStream, which intentionally doesn't return a status
786 // error to allow for further inspection; all other errors should
787 // already be status errors.
788 return toRPCErr(op(cs.attempt))
789 }
790 if len(cs.replayBuffer) == 0 {
791 // For the first op, which controls creation of the stream and
792 // assigns cs.attempt, we need to create a new attempt inline
793 // before executing the first op. On subsequent ops, the attempt
794 // is created immediately before replaying the ops.
795 var err error
796 if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
797 cs.mu.Unlock()
798 cs.finish(err)
799 return err
800 }
William Kurkianea869482019-04-09 15:16:11 -0400801 }
802 a := cs.attempt
803 cs.mu.Unlock()
804 err := op(a)
805 cs.mu.Lock()
806 if a != cs.attempt {
807 // We started another attempt already.
808 continue
809 }
810 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +0000811 <-a.transportStream.Done()
William Kurkianea869482019-04-09 15:16:11 -0400812 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000813 if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
William Kurkianea869482019-04-09 15:16:11 -0400814 onSuccess()
815 cs.mu.Unlock()
816 return err
817 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000818 if err := cs.retryLocked(a, err); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400819 cs.mu.Unlock()
820 return err
821 }
822 }
823}
824
825func (cs *clientStream) Header() (metadata.MD, error) {
826 var m metadata.MD
827 err := cs.withRetry(func(a *csAttempt) error {
828 var err error
Abhay Kumara61c5222025-11-10 07:32:50 +0000829 m, err = a.transportStream.Header()
William Kurkianea869482019-04-09 15:16:11 -0400830 return toRPCErr(err)
831 }, cs.commitAttemptLocked)
Abhay Kumara61c5222025-11-10 07:32:50 +0000832
833 if m == nil && err == nil {
834 // The stream ended with success. Finish the clientStream.
835 err = io.EOF
836 }
837
William Kurkianea869482019-04-09 15:16:11 -0400838 if err != nil {
839 cs.finish(err)
Abhay Kumara61c5222025-11-10 07:32:50 +0000840 // Do not return the error. The user should get it by calling Recv().
841 return nil, nil
William Kurkianea869482019-04-09 15:16:11 -0400842 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000843
844 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
845 // Only log if binary log is on and header has not been logged, and
846 // there is actually headers to log.
William Kurkianea869482019-04-09 15:16:11 -0400847 logEntry := &binarylog.ServerHeader{
848 OnClientSide: true,
849 Header: m,
850 PeerAddr: nil,
851 }
852 if peer, ok := peer.FromContext(cs.Context()); ok {
853 logEntry.PeerAddr = peer.Addr
854 }
William Kurkianea869482019-04-09 15:16:11 -0400855 cs.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +0000856 for _, binlog := range cs.binlogs {
857 binlog.Log(cs.ctx, logEntry)
858 }
William Kurkianea869482019-04-09 15:16:11 -0400859 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000860
861 return m, nil
William Kurkianea869482019-04-09 15:16:11 -0400862}
863
864func (cs *clientStream) Trailer() metadata.MD {
865 // On RPC failure, we never need to retry, because usage requires that
866 // RecvMsg() returned a non-nil error before calling this function is valid.
867 // We would have retried earlier if necessary.
868 //
869 // Commit the attempt anyway, just in case users are not following those
870 // directions -- it will prevent races and should not meaningfully impact
871 // performance.
872 cs.commitAttempt()
Abhay Kumara61c5222025-11-10 07:32:50 +0000873 if cs.attempt.transportStream == nil {
William Kurkianea869482019-04-09 15:16:11 -0400874 return nil
875 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000876 return cs.attempt.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -0400877}
878
Abhay Kumara61c5222025-11-10 07:32:50 +0000879func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
880 for _, f := range cs.replayBuffer {
881 if err := f.op(attempt); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400882 return err
883 }
884 }
885 return nil
886}
887
Abhay Kumara61c5222025-11-10 07:32:50 +0000888func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
William Kurkianea869482019-04-09 15:16:11 -0400889 // Note: we still will buffer if retry is disabled (for transparent retries).
890 if cs.committed {
891 return
892 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000893 cs.replayBufferSize += sz
894 if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
William Kurkianea869482019-04-09 15:16:11 -0400895 cs.commitAttemptLocked()
Abhay Kumara61c5222025-11-10 07:32:50 +0000896 cleanup()
William Kurkianea869482019-04-09 15:16:11 -0400897 return
898 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000899 cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
William Kurkianea869482019-04-09 15:16:11 -0400900}
901
Abhay Kumara61c5222025-11-10 07:32:50 +0000902func (cs *clientStream) SendMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -0400903 defer func() {
904 if err != nil && err != io.EOF {
905 // Call finish on the client stream for errors generated by this SendMsg
906 // call, as these indicate problems created by this client. (Transport
907 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
908 // error will be returned from RecvMsg eventually in that case, or be
909 // retried.)
910 cs.finish(err)
911 }
912 }()
913 if cs.sentLast {
914 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
915 }
916 if !cs.desc.ClientStreams {
917 cs.sentLast = true
918 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530919
920 // load hdr, payload, data
Abhay Kumara61c5222025-11-10 07:32:50 +0000921 hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
William Kurkianea869482019-04-09 15:16:11 -0400922 if err != nil {
923 return err
924 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530925
Abhay Kumara61c5222025-11-10 07:32:50 +0000926 defer func() {
927 data.Free()
928 // only free payload if compression was made, and therefore it is a different set
929 // of buffers from data.
930 if pf.isCompressed() {
931 payload.Free()
932 }
933 }()
934
935 dataLen := data.Len()
936 payloadLen := payload.Len()
William Kurkianea869482019-04-09 15:16:11 -0400937 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +0000938 if payloadLen > *cs.callInfo.maxSendMessageSize {
939 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -0400940 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000941
942 // always take an extra ref in case data == payload (i.e. when the data isn't
943 // compressed). The original ref will always be freed by the deferred free above.
944 payload.Ref()
William Kurkianea869482019-04-09 15:16:11 -0400945 op := func(a *csAttempt) error {
Abhay Kumara61c5222025-11-10 07:32:50 +0000946 return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
William Kurkianea869482019-04-09 15:16:11 -0400947 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000948
949 // onSuccess is invoked when the op is captured for a subsequent retry. If the
950 // stream was established by a previous message and therefore retries are
951 // disabled, onSuccess will not be invoked, and payloadRef can be freed
952 // immediately.
953 onSuccessCalled := false
954 err = cs.withRetry(op, func() {
955 cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
956 onSuccessCalled = true
957 })
958 if !onSuccessCalled {
959 payload.Free()
960 }
961 if len(cs.binlogs) != 0 && err == nil {
962 cm := &binarylog.ClientMessage{
William Kurkianea869482019-04-09 15:16:11 -0400963 OnClientSide: true,
Abhay Kumara61c5222025-11-10 07:32:50 +0000964 Message: data.Materialize(),
965 }
966 for _, binlog := range cs.binlogs {
967 binlog.Log(cs.ctx, cm)
968 }
William Kurkianea869482019-04-09 15:16:11 -0400969 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000970 return err
William Kurkianea869482019-04-09 15:16:11 -0400971}
972
Abhay Kumara61c5222025-11-10 07:32:50 +0000973func (cs *clientStream) RecvMsg(m any) error {
974 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
William Kurkianea869482019-04-09 15:16:11 -0400975 // Call Header() to binary log header if it's not already logged.
976 cs.Header()
977 }
978 var recvInfo *payloadInfo
Abhay Kumara61c5222025-11-10 07:32:50 +0000979 if len(cs.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -0400980 recvInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +0000981 defer recvInfo.free()
William Kurkianea869482019-04-09 15:16:11 -0400982 }
983 err := cs.withRetry(func(a *csAttempt) error {
984 return a.recvMsg(m, recvInfo)
985 }, cs.commitAttemptLocked)
Abhay Kumara61c5222025-11-10 07:32:50 +0000986 if len(cs.binlogs) != 0 && err == nil {
987 sm := &binarylog.ServerMessage{
William Kurkianea869482019-04-09 15:16:11 -0400988 OnClientSide: true,
Abhay Kumara61c5222025-11-10 07:32:50 +0000989 Message: recvInfo.uncompressedBytes.Materialize(),
990 }
991 for _, binlog := range cs.binlogs {
992 binlog.Log(cs.ctx, sm)
993 }
William Kurkianea869482019-04-09 15:16:11 -0400994 }
995 if err != nil || !cs.desc.ServerStreams {
996 // err != nil or non-server-streaming indicates end of stream.
997 cs.finish(err)
William Kurkianea869482019-04-09 15:16:11 -0400998 }
999 return err
1000}
1001
1002func (cs *clientStream) CloseSend() error {
1003 if cs.sentLast {
Abhay Kumara61c5222025-11-10 07:32:50 +00001004 // Return a nil error on repeated calls to this method.
William Kurkianea869482019-04-09 15:16:11 -04001005 return nil
1006 }
1007 cs.sentLast = true
1008 op := func(a *csAttempt) error {
Abhay Kumara61c5222025-11-10 07:32:50 +00001009 a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
William Kurkianea869482019-04-09 15:16:11 -04001010 // Always return nil; io.EOF is the only error that might make sense
1011 // instead, but there is no need to signal the client to call RecvMsg
1012 // as the only use left for the stream after CloseSend is to call
1013 // RecvMsg. This also matches historical behavior.
1014 return nil
1015 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001016 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
1017 if len(cs.binlogs) != 0 {
1018 chc := &binarylog.ClientHalfClose{
William Kurkianea869482019-04-09 15:16:11 -04001019 OnClientSide: true,
Abhay Kumara61c5222025-11-10 07:32:50 +00001020 }
1021 for _, binlog := range cs.binlogs {
1022 binlog.Log(cs.ctx, chc)
1023 }
William Kurkianea869482019-04-09 15:16:11 -04001024 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001025 // We don't return an error here as we expect users to read all messages
1026 // from the stream and get the RPC status from RecvMsg(). Note that
1027 // SendMsg() must return an error when one occurs so the application
1028 // knows to stop sending messages, but that does not apply here.
William Kurkianea869482019-04-09 15:16:11 -04001029 return nil
1030}
1031
1032func (cs *clientStream) finish(err error) {
1033 if err == io.EOF {
1034 // Ending a stream with EOF indicates a success.
1035 err = nil
1036 }
1037 cs.mu.Lock()
1038 if cs.finished {
1039 cs.mu.Unlock()
1040 return
1041 }
1042 cs.finished = true
Abhay Kumara61c5222025-11-10 07:32:50 +00001043 for _, onFinish := range cs.callInfo.onFinish {
1044 onFinish(err)
1045 }
William Kurkianea869482019-04-09 15:16:11 -04001046 cs.commitAttemptLocked()
Abhay Kumara61c5222025-11-10 07:32:50 +00001047 if cs.attempt != nil {
1048 cs.attempt.finish(err)
1049 // after functions all rely upon having a stream.
1050 if cs.attempt.transportStream != nil {
1051 for _, o := range cs.opts {
1052 o.after(cs.callInfo, cs.attempt)
1053 }
1054 }
1055 }
1056
William Kurkianea869482019-04-09 15:16:11 -04001057 cs.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001058 // Only one of cancel or trailer needs to be logged.
1059 if len(cs.binlogs) != 0 {
1060 switch err {
1061 case errContextCanceled, errContextDeadline, ErrClientConnClosing:
1062 c := &binarylog.Cancel{
1063 OnClientSide: true,
1064 }
1065 for _, binlog := range cs.binlogs {
1066 binlog.Log(cs.ctx, c)
1067 }
1068 default:
1069 logEntry := &binarylog.ServerTrailer{
1070 OnClientSide: true,
1071 Trailer: cs.Trailer(),
1072 Err: err,
1073 }
1074 if peer, ok := peer.FromContext(cs.Context()); ok {
1075 logEntry.PeerAddr = peer.Addr
1076 }
1077 for _, binlog := range cs.binlogs {
1078 binlog.Log(cs.ctx, logEntry)
1079 }
1080 }
William Kurkianea869482019-04-09 15:16:11 -04001081 }
1082 if err == nil {
1083 cs.retryThrottler.successfulRPC()
1084 }
1085 if channelz.IsOn() {
1086 if err != nil {
1087 cs.cc.incrCallsFailed()
1088 } else {
1089 cs.cc.incrCallsSucceeded()
1090 }
1091 }
William Kurkianea869482019-04-09 15:16:11 -04001092 cs.cancel()
1093}
1094
Abhay Kumara61c5222025-11-10 07:32:50 +00001095func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
William Kurkianea869482019-04-09 15:16:11 -04001096 cs := a.cs
Abhilash S.L3b494632019-07-16 15:51:09 +05301097 if a.trInfo != nil {
William Kurkianea869482019-04-09 15:16:11 -04001098 a.mu.Lock()
1099 if a.trInfo.tr != nil {
1100 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1101 }
1102 a.mu.Unlock()
1103 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001104 if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001105 if !cs.desc.ClientStreams {
1106 // For non-client-streaming RPCs, we return nil instead of EOF on error
1107 // because the generated code requires it. finish is not called; RecvMsg()
1108 // will call it with the stream's status independently.
1109 return nil
1110 }
1111 return io.EOF
1112 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001113 if len(a.statsHandlers) != 0 {
1114 for _, sh := range a.statsHandlers {
1115 sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
1116 }
William Kurkianea869482019-04-09 15:16:11 -04001117 }
1118 return nil
1119}
1120
Abhay Kumara61c5222025-11-10 07:32:50 +00001121func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001122 cs := a.cs
Abhay Kumara61c5222025-11-10 07:32:50 +00001123 if len(a.statsHandlers) != 0 && payInfo == nil {
William Kurkianea869482019-04-09 15:16:11 -04001124 payInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001125 defer payInfo.free()
William Kurkianea869482019-04-09 15:16:11 -04001126 }
1127
Abhay Kumara61c5222025-11-10 07:32:50 +00001128 if !a.decompressorSet {
William Kurkianea869482019-04-09 15:16:11 -04001129 // Block until we receive headers containing received message encoding.
Abhay Kumara61c5222025-11-10 07:32:50 +00001130 if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1131 if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
William Kurkianea869482019-04-09 15:16:11 -04001132 // No configured decompressor, or it does not match the incoming
1133 // message encoding; attempt to find a registered compressor that does.
Abhay Kumara61c5222025-11-10 07:32:50 +00001134 a.decompressorV0 = nil
1135 a.decompressorV1 = encoding.GetCompressor(ct)
William Kurkianea869482019-04-09 15:16:11 -04001136 }
1137 } else {
1138 // No compression is used; disable our decompressor.
Abhay Kumara61c5222025-11-10 07:32:50 +00001139 a.decompressorV0 = nil
William Kurkianea869482019-04-09 15:16:11 -04001140 }
1141 // Only initialize this state once per stream.
Abhay Kumara61c5222025-11-10 07:32:50 +00001142 a.decompressorSet = true
William Kurkianea869482019-04-09 15:16:11 -04001143 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001144 if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001145 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001146 if statusErr := a.transportStream.Status().Err(); statusErr != nil {
William Kurkianea869482019-04-09 15:16:11 -04001147 return statusErr
1148 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001149 // Received no msg and status OK for non-server streaming rpcs.
1150 if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
1151 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1152 }
William Kurkianea869482019-04-09 15:16:11 -04001153 return io.EOF // indicates successful end of stream.
1154 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001155
William Kurkianea869482019-04-09 15:16:11 -04001156 return toRPCErr(err)
1157 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001158 cs.receivedFirstMsg = true
Abhilash S.L3b494632019-07-16 15:51:09 +05301159 if a.trInfo != nil {
William Kurkianea869482019-04-09 15:16:11 -04001160 a.mu.Lock()
1161 if a.trInfo.tr != nil {
1162 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1163 }
1164 a.mu.Unlock()
1165 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001166 for _, sh := range a.statsHandlers {
1167 sh.HandleRPC(a.ctx, &stats.InPayload{
1168 Client: true,
1169 RecvTime: time.Now(),
1170 Payload: m,
1171 WireLength: payInfo.compressedLength + headerLen,
1172 CompressedLength: payInfo.compressedLength,
1173 Length: payInfo.uncompressedBytes.Len(),
William Kurkianea869482019-04-09 15:16:11 -04001174 })
1175 }
William Kurkianea869482019-04-09 15:16:11 -04001176 if cs.desc.ServerStreams {
1177 // Subsequent messages should be received by subsequent RecvMsg calls.
1178 return nil
1179 }
1180 // Special handling for non-server-stream rpcs.
1181 // This recv expects EOF or errors, so we don't collect inPayload.
Abhay Kumara61c5222025-11-10 07:32:50 +00001182 if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
1183 return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1184 } else if err != nil {
1185 return toRPCErr(err)
William Kurkianea869482019-04-09 15:16:11 -04001186 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001187 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
William Kurkianea869482019-04-09 15:16:11 -04001188}
1189
1190func (a *csAttempt) finish(err error) {
1191 a.mu.Lock()
1192 if a.finished {
1193 a.mu.Unlock()
1194 return
1195 }
1196 a.finished = true
1197 if err == io.EOF {
1198 // Ending a stream with EOF indicates a success.
1199 err = nil
1200 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301201 var tr metadata.MD
Abhay Kumara61c5222025-11-10 07:32:50 +00001202 if a.transportStream != nil {
1203 a.transportStream.Close(err)
1204 tr = a.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -04001205 }
1206
Abhay Kumara61c5222025-11-10 07:32:50 +00001207 if a.pickResult.Done != nil {
William Kurkianea869482019-04-09 15:16:11 -04001208 br := false
Abhay Kumara61c5222025-11-10 07:32:50 +00001209 if a.transportStream != nil {
1210 br = a.transportStream.BytesReceived()
William Kurkianea869482019-04-09 15:16:11 -04001211 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001212 a.pickResult.Done(balancer.DoneInfo{
William Kurkianea869482019-04-09 15:16:11 -04001213 Err: err,
1214 Trailer: tr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001215 BytesSent: a.transportStream != nil,
William Kurkianea869482019-04-09 15:16:11 -04001216 BytesReceived: br,
Abhilash S.L3b494632019-07-16 15:51:09 +05301217 ServerLoad: balancerload.Parse(tr),
William Kurkianea869482019-04-09 15:16:11 -04001218 })
1219 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001220 for _, sh := range a.statsHandlers {
William Kurkianea869482019-04-09 15:16:11 -04001221 end := &stats.End{
1222 Client: true,
Abhay Kumara61c5222025-11-10 07:32:50 +00001223 BeginTime: a.beginTime,
William Kurkianea869482019-04-09 15:16:11 -04001224 EndTime: time.Now(),
Abhilash S.L3b494632019-07-16 15:51:09 +05301225 Trailer: tr,
William Kurkianea869482019-04-09 15:16:11 -04001226 Error: err,
1227 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001228 sh.HandleRPC(a.ctx, end)
William Kurkianea869482019-04-09 15:16:11 -04001229 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301230 if a.trInfo != nil && a.trInfo.tr != nil {
William Kurkianea869482019-04-09 15:16:11 -04001231 if err == nil {
1232 a.trInfo.tr.LazyPrintf("RPC: [OK]")
1233 } else {
1234 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1235 a.trInfo.tr.SetError()
1236 }
1237 a.trInfo.tr.Finish()
1238 a.trInfo.tr = nil
1239 }
1240 a.mu.Unlock()
1241}
1242
Abhay Kumara61c5222025-11-10 07:32:50 +00001243// newNonRetryClientStream creates a ClientStream with the specified transport, on the
Abhilash S.L3b494632019-07-16 15:51:09 +05301244// given addrConn.
1245//
1246// It's expected that the given transport is either the same one in addrConn, or
1247// is already closed. To avoid race, transport is specified separately, instead
Abhay Kumara61c5222025-11-10 07:32:50 +00001248// of using ac.transport.
Abhilash S.L3b494632019-07-16 15:51:09 +05301249//
1250// Main difference between this and ClientConn.NewStream:
1251// - no retry
1252// - no service config (or wait for service config)
1253// - no tracing or stats
1254func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
William Kurkianea869482019-04-09 15:16:11 -04001255 if t == nil {
1256 // TODO: return RPC error here?
1257 return nil, errors.New("transport provided is nil")
1258 }
1259 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1260 c := &callInfo{}
1261
William Kurkianea869482019-04-09 15:16:11 -04001262 // Possible context leak:
1263 // The cancel function for the child context we create will only be called
1264 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1265 // an error is generated by SendMsg.
1266 // https://github.com/grpc/grpc-go/issues/1818.
1267 ctx, cancel := context.WithCancel(ctx)
1268 defer func() {
1269 if err != nil {
1270 cancel()
1271 }
1272 }()
1273
Abhilash S.L3b494632019-07-16 15:51:09 +05301274 for _, o := range opts {
1275 if err := o.before(c); err != nil {
1276 return nil, toRPCErr(err)
1277 }
1278 }
1279 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1280 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001281 if err := setCallInfoCodec(c); err != nil {
1282 return nil, err
1283 }
1284
1285 callHdr := &transport.CallHdr{
1286 Host: ac.cc.authority,
1287 Method: method,
1288 ContentSubtype: c.contentSubtype,
1289 }
1290
1291 // Set our outgoing compression according to the UseCompressor CallOption, if
1292 // set. In that case, also find the compressor from the encoding package.
1293 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1294 // if set.
1295 var cp Compressor
1296 var comp encoding.Compressor
Abhay Kumara61c5222025-11-10 07:32:50 +00001297 if ct := c.compressorName; ct != "" {
William Kurkianea869482019-04-09 15:16:11 -04001298 callHdr.SendCompress = ct
1299 if ct != encoding.Identity {
1300 comp = encoding.GetCompressor(ct)
1301 if comp == nil {
1302 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1303 }
1304 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001305 } else if ac.cc.dopts.compressorV0 != nil {
1306 callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
1307 cp = ac.cc.dopts.compressorV0
William Kurkianea869482019-04-09 15:16:11 -04001308 }
1309 if c.creds != nil {
1310 callHdr.Creds = c.creds
1311 }
1312
Abhilash S.L3b494632019-07-16 15:51:09 +05301313 // Use a special addrConnStream to avoid retry.
William Kurkianea869482019-04-09 15:16:11 -04001314 as := &addrConnStream{
Abhay Kumara61c5222025-11-10 07:32:50 +00001315 callHdr: callHdr,
1316 ac: ac,
1317 ctx: ctx,
1318 cancel: cancel,
1319 opts: opts,
1320 callInfo: c,
1321 desc: desc,
1322 codec: c.codec,
1323 sendCompressorV0: cp,
1324 sendCompressorV1: comp,
1325 transport: t,
William Kurkianea869482019-04-09 15:16:11 -04001326 }
1327
Abhay Kumara61c5222025-11-10 07:32:50 +00001328 s, err := as.transport.NewStream(as.ctx, as.callHdr)
William Kurkianea869482019-04-09 15:16:11 -04001329 if err != nil {
1330 err = toRPCErr(err)
1331 return nil, err
1332 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001333 as.transportStream = s
1334 as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
William Kurkianea869482019-04-09 15:16:11 -04001335 ac.incrCallsStarted()
1336 if desc != unaryStreamDesc {
Abhay Kumara61c5222025-11-10 07:32:50 +00001337 // Listen on stream context to cleanup when the stream context is
1338 // canceled. Also listen for the addrConn's context in case the
1339 // addrConn is closed or reconnects to a different address. In all
1340 // other cases, an error should already be injected into the recv
1341 // buffer by the transport, which the client will eventually receive,
1342 // and then we will cancel the stream's context in
1343 // addrConnStream.finish.
William Kurkianea869482019-04-09 15:16:11 -04001344 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001345 ac.mu.Lock()
1346 acCtx := ac.ctx
1347 ac.mu.Unlock()
William Kurkianea869482019-04-09 15:16:11 -04001348 select {
Abhay Kumara61c5222025-11-10 07:32:50 +00001349 case <-acCtx.Done():
William Kurkianea869482019-04-09 15:16:11 -04001350 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1351 case <-ctx.Done():
1352 as.finish(toRPCErr(ctx.Err()))
1353 }
1354 }()
1355 }
1356 return as, nil
1357}
1358
1359type addrConnStream struct {
Abhay Kumara61c5222025-11-10 07:32:50 +00001360 transportStream *transport.ClientStream
1361 ac *addrConn
1362 callHdr *transport.CallHdr
1363 cancel context.CancelFunc
1364 opts []CallOption
1365 callInfo *callInfo
1366 transport transport.ClientTransport
1367 ctx context.Context
1368 sentLast bool
1369 receivedFirstMsg bool
1370 desc *StreamDesc
1371 codec baseCodec
1372 sendCompressorV0 Compressor
1373 sendCompressorV1 encoding.Compressor
1374 decompressorSet bool
1375 decompressorV0 Decompressor
1376 decompressorV1 encoding.Compressor
1377 parser *parser
1378
1379 // mu guards finished and is held for the entire finish method.
1380 mu sync.Mutex
1381 finished bool
William Kurkianea869482019-04-09 15:16:11 -04001382}
1383
1384func (as *addrConnStream) Header() (metadata.MD, error) {
Abhay Kumara61c5222025-11-10 07:32:50 +00001385 m, err := as.transportStream.Header()
William Kurkianea869482019-04-09 15:16:11 -04001386 if err != nil {
1387 as.finish(toRPCErr(err))
1388 }
1389 return m, err
1390}
1391
1392func (as *addrConnStream) Trailer() metadata.MD {
Abhay Kumara61c5222025-11-10 07:32:50 +00001393 return as.transportStream.Trailer()
William Kurkianea869482019-04-09 15:16:11 -04001394}
1395
1396func (as *addrConnStream) CloseSend() error {
1397 if as.sentLast {
Abhay Kumara61c5222025-11-10 07:32:50 +00001398 // Return a nil error on repeated calls to this method.
William Kurkianea869482019-04-09 15:16:11 -04001399 return nil
1400 }
1401 as.sentLast = true
1402
Abhay Kumara61c5222025-11-10 07:32:50 +00001403 as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
William Kurkianea869482019-04-09 15:16:11 -04001404 // Always return nil; io.EOF is the only error that might make sense
1405 // instead, but there is no need to signal the client to call RecvMsg
1406 // as the only use left for the stream after CloseSend is to call
1407 // RecvMsg. This also matches historical behavior.
1408 return nil
1409}
1410
1411func (as *addrConnStream) Context() context.Context {
Abhay Kumara61c5222025-11-10 07:32:50 +00001412 return as.transportStream.Context()
William Kurkianea869482019-04-09 15:16:11 -04001413}
1414
Abhay Kumara61c5222025-11-10 07:32:50 +00001415func (as *addrConnStream) SendMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001416 defer func() {
1417 if err != nil && err != io.EOF {
1418 // Call finish on the client stream for errors generated by this SendMsg
1419 // call, as these indicate problems created by this client. (Transport
1420 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1421 // error will be returned from RecvMsg eventually in that case, or be
1422 // retried.)
1423 as.finish(err)
1424 }
1425 }()
1426 if as.sentLast {
1427 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1428 }
1429 if !as.desc.ClientStreams {
1430 as.sentLast = true
1431 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301432
1433 // load hdr, payload, data
Abhay Kumara61c5222025-11-10 07:32:50 +00001434 hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001435 if err != nil {
1436 return err
1437 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301438
Abhay Kumara61c5222025-11-10 07:32:50 +00001439 defer func() {
1440 data.Free()
1441 // only free payload if compression was made, and therefore it is a different set
1442 // of buffers from data.
1443 if pf.isCompressed() {
1444 payload.Free()
1445 }
1446 }()
1447
William Kurkianea869482019-04-09 15:16:11 -04001448 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +00001449 if payload.Len() > *as.callInfo.maxSendMessageSize {
1450 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001451 }
1452
Abhay Kumara61c5222025-11-10 07:32:50 +00001453 if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001454 if !as.desc.ClientStreams {
1455 // For non-client-streaming RPCs, we return nil instead of EOF on error
1456 // because the generated code requires it. finish is not called; RecvMsg()
1457 // will call it with the stream's status independently.
1458 return nil
1459 }
1460 return io.EOF
1461 }
1462
William Kurkianea869482019-04-09 15:16:11 -04001463 return nil
1464}
1465
Abhay Kumara61c5222025-11-10 07:32:50 +00001466func (as *addrConnStream) RecvMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001467 defer func() {
1468 if err != nil || !as.desc.ServerStreams {
1469 // err != nil or non-server-streaming indicates end of stream.
1470 as.finish(err)
1471 }
1472 }()
1473
Abhay Kumara61c5222025-11-10 07:32:50 +00001474 if !as.decompressorSet {
William Kurkianea869482019-04-09 15:16:11 -04001475 // Block until we receive headers containing received message encoding.
Abhay Kumara61c5222025-11-10 07:32:50 +00001476 if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1477 if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
William Kurkianea869482019-04-09 15:16:11 -04001478 // No configured decompressor, or it does not match the incoming
1479 // message encoding; attempt to find a registered compressor that does.
Abhay Kumara61c5222025-11-10 07:32:50 +00001480 as.decompressorV0 = nil
1481 as.decompressorV1 = encoding.GetCompressor(ct)
William Kurkianea869482019-04-09 15:16:11 -04001482 }
1483 } else {
1484 // No compression is used; disable our decompressor.
Abhay Kumara61c5222025-11-10 07:32:50 +00001485 as.decompressorV0 = nil
William Kurkianea869482019-04-09 15:16:11 -04001486 }
1487 // Only initialize this state once per stream.
Abhay Kumara61c5222025-11-10 07:32:50 +00001488 as.decompressorSet = true
William Kurkianea869482019-04-09 15:16:11 -04001489 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001490 if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001491 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001492 if statusErr := as.transportStream.Status().Err(); statusErr != nil {
William Kurkianea869482019-04-09 15:16:11 -04001493 return statusErr
1494 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001495 // Received no msg and status OK for non-server streaming rpcs.
1496 if !as.desc.ServerStreams && !as.receivedFirstMsg {
1497 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1498 }
William Kurkianea869482019-04-09 15:16:11 -04001499 return io.EOF // indicates successful end of stream.
1500 }
1501 return toRPCErr(err)
1502 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001503 as.receivedFirstMsg = true
William Kurkianea869482019-04-09 15:16:11 -04001504
William Kurkianea869482019-04-09 15:16:11 -04001505 if as.desc.ServerStreams {
1506 // Subsequent messages should be received by subsequent RecvMsg calls.
1507 return nil
1508 }
1509
1510 // Special handling for non-server-stream rpcs.
1511 // This recv expects EOF or errors, so we don't collect inPayload.
Abhay Kumara61c5222025-11-10 07:32:50 +00001512 if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
1513 return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1514 } else if err != nil {
1515 return toRPCErr(err)
William Kurkianea869482019-04-09 15:16:11 -04001516 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001517 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
William Kurkianea869482019-04-09 15:16:11 -04001518}
1519
1520func (as *addrConnStream) finish(err error) {
1521 as.mu.Lock()
1522 if as.finished {
1523 as.mu.Unlock()
1524 return
1525 }
1526 as.finished = true
1527 if err == io.EOF {
1528 // Ending a stream with EOF indicates a success.
1529 err = nil
1530 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001531 if as.transportStream != nil {
1532 as.transportStream.Close(err)
William Kurkianea869482019-04-09 15:16:11 -04001533 }
1534
1535 if err != nil {
1536 as.ac.incrCallsFailed()
1537 } else {
1538 as.ac.incrCallsSucceeded()
1539 }
1540 as.cancel()
1541 as.mu.Unlock()
1542}
1543
// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package.  However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
1595
// serverStream implements a server side Stream.
//
// It wraps a transport.ServerStream and adds message encoding, compression,
// size limits, tracing, stats reporting and binary logging on top of it.
type serverStream struct {
	// ctx is the context returned by Context and passed to the binary loggers.
	// s is the underlying transport stream that headers, trailers, messages and
	// the final status are written to. p is handed to recv, and its bufferPool
	// is used when preparing outgoing messages. desc supplies ClientStreams,
	// which RecvMsg uses to enforce request cardinality.
	ctx   context.Context
	s     *transport.ServerStream
	p     *parser
	codec baseCodec
	desc  *StreamDesc

	// Compressors passed to prepareMsg and decompressors passed to recv.
	// compressorV1 is replaced in SendMsg when the transport stream reports a
	// different send compressor name.
	compressorV0   Compressor
	compressorV1   encoding.Compressor
	decompressorV0 Decompressor
	decompressorV1 encoding.Compressor

	// sendCompressorName is the send compressor name last observed on the
	// transport stream by SendMsg.
	sendCompressorName string

	recvFirstMsg bool // set after the first message is received

	// Size limits applied to received and sent messages, and the optional
	// trace information (nil when there is nothing to log to).
	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	// statsHandler holds the handlers notified of each sent and received
	// payload.
	statsHandler []stats.Handler

	// binlogs holds the binary loggers that headers, messages and client
	// half-close events are logged to.
	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
1630
1631func (ss *serverStream) Context() context.Context {
1632 return ss.ctx
1633}
1634
1635func (ss *serverStream) SetHeader(md metadata.MD) error {
1636 if md.Len() == 0 {
1637 return nil
1638 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001639 err := imetadata.Validate(md)
1640 if err != nil {
1641 return status.Error(codes.Internal, err.Error())
1642 }
William Kurkianea869482019-04-09 15:16:11 -04001643 return ss.s.SetHeader(md)
1644}
1645
1646func (ss *serverStream) SendHeader(md metadata.MD) error {
Abhay Kumara61c5222025-11-10 07:32:50 +00001647 err := imetadata.Validate(md)
1648 if err != nil {
1649 return status.Error(codes.Internal, err.Error())
1650 }
1651
1652 err = ss.s.SendHeader(md)
1653 if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
William Kurkianea869482019-04-09 15:16:11 -04001654 h, _ := ss.s.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001655 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001656 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001657 }
William Kurkianea869482019-04-09 15:16:11 -04001658 ss.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +00001659 for _, binlog := range ss.binlogs {
1660 binlog.Log(ss.ctx, sh)
1661 }
William Kurkianea869482019-04-09 15:16:11 -04001662 }
1663 return err
1664}
1665
1666func (ss *serverStream) SetTrailer(md metadata.MD) {
1667 if md.Len() == 0 {
1668 return
1669 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001670 if err := imetadata.Validate(md); err != nil {
1671 logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1672 }
William Kurkianea869482019-04-09 15:16:11 -04001673 ss.s.SetTrailer(md)
1674}
1675
Abhay Kumara61c5222025-11-10 07:32:50 +00001676func (ss *serverStream) SendMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001677 defer func() {
1678 if ss.trInfo != nil {
1679 ss.mu.Lock()
1680 if ss.trInfo.tr != nil {
1681 if err == nil {
1682 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1683 } else {
Abhay Kumara61c5222025-11-10 07:32:50 +00001684 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
William Kurkianea869482019-04-09 15:16:11 -04001685 ss.trInfo.tr.SetError()
1686 }
1687 }
1688 ss.mu.Unlock()
1689 }
1690 if err != nil && err != io.EOF {
1691 st, _ := status.FromError(toRPCErr(err))
Abhay Kumara61c5222025-11-10 07:32:50 +00001692 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001693 // Non-user specified status was sent out. This should be an error
1694 // case (as a server side Cancel maybe).
1695 //
1696 // This is not handled specifically now. User will return a final
1697 // status from the service handler, we will log that error instead.
1698 // This behavior is similar to an interceptor.
1699 }
William Kurkianea869482019-04-09 15:16:11 -04001700 }()
Abhilash S.L3b494632019-07-16 15:51:09 +05301701
Abhay Kumara61c5222025-11-10 07:32:50 +00001702 // Server handler could have set new compressor by calling SetSendCompressor.
1703 // In case it is set, we need to use it for compressing outbound message.
1704 if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1705 ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
1706 ss.sendCompressorName = sendCompressorsName
1707 }
1708
Abhilash S.L3b494632019-07-16 15:51:09 +05301709 // load hdr, payload, data
Abhay Kumara61c5222025-11-10 07:32:50 +00001710 hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001711 if err != nil {
1712 return err
1713 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301714
Abhay Kumara61c5222025-11-10 07:32:50 +00001715 defer func() {
1716 data.Free()
1717 // only free payload if compression was made, and therefore it is a different set
1718 // of buffers from data.
1719 if pf.isCompressed() {
1720 payload.Free()
1721 }
1722 }()
1723
1724 dataLen := data.Len()
1725 payloadLen := payload.Len()
1726
William Kurkianea869482019-04-09 15:16:11 -04001727 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +00001728 if payloadLen > ss.maxSendMessageSize {
1729 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001730 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001731 if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001732 return toRPCErr(err)
1733 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001734
1735 if len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001736 if !ss.serverHeaderBinlogged {
1737 h, _ := ss.s.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001738 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001739 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001740 }
William Kurkianea869482019-04-09 15:16:11 -04001741 ss.serverHeaderBinlogged = true
Abhay Kumara61c5222025-11-10 07:32:50 +00001742 for _, binlog := range ss.binlogs {
1743 binlog.Log(ss.ctx, sh)
1744 }
William Kurkianea869482019-04-09 15:16:11 -04001745 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001746 sm := &binarylog.ServerMessage{
1747 Message: data.Materialize(),
1748 }
1749 for _, binlog := range ss.binlogs {
1750 binlog.Log(ss.ctx, sm)
1751 }
William Kurkianea869482019-04-09 15:16:11 -04001752 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001753 if len(ss.statsHandler) != 0 {
1754 for _, sh := range ss.statsHandler {
1755 sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
1756 }
William Kurkianea869482019-04-09 15:16:11 -04001757 }
1758 return nil
1759}
1760
Abhay Kumara61c5222025-11-10 07:32:50 +00001761func (ss *serverStream) RecvMsg(m any) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001762 defer func() {
1763 if ss.trInfo != nil {
1764 ss.mu.Lock()
1765 if ss.trInfo.tr != nil {
1766 if err == nil {
1767 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1768 } else if err != io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001769 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
William Kurkianea869482019-04-09 15:16:11 -04001770 ss.trInfo.tr.SetError()
1771 }
1772 }
1773 ss.mu.Unlock()
1774 }
1775 if err != nil && err != io.EOF {
1776 st, _ := status.FromError(toRPCErr(err))
Abhay Kumara61c5222025-11-10 07:32:50 +00001777 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001778 // Non-user specified status was sent out. This should be an error
1779 // case (as a server side Cancel maybe).
1780 //
1781 // This is not handled specifically now. User will return a final
1782 // status from the service handler, we will log that error instead.
1783 // This behavior is similar to an interceptor.
1784 }
William Kurkianea869482019-04-09 15:16:11 -04001785 }()
1786 var payInfo *payloadInfo
Abhay Kumara61c5222025-11-10 07:32:50 +00001787 if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001788 payInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001789 defer payInfo.free()
William Kurkianea869482019-04-09 15:16:11 -04001790 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001791 if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001792 if err == io.EOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001793 if len(ss.binlogs) != 0 {
1794 chc := &binarylog.ClientHalfClose{}
1795 for _, binlog := range ss.binlogs {
1796 binlog.Log(ss.ctx, chc)
1797 }
1798 }
1799 // Received no request msg for non-client streaming rpcs.
1800 if !ss.desc.ClientStreams && !ss.recvFirstMsg {
1801 return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
William Kurkianea869482019-04-09 15:16:11 -04001802 }
1803 return err
1804 }
1805 if err == io.ErrUnexpectedEOF {
Abhay Kumara61c5222025-11-10 07:32:50 +00001806 err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
William Kurkianea869482019-04-09 15:16:11 -04001807 }
1808 return toRPCErr(err)
1809 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001810 ss.recvFirstMsg = true
1811 if len(ss.statsHandler) != 0 {
1812 for _, sh := range ss.statsHandler {
1813 sh.HandleRPC(ss.s.Context(), &stats.InPayload{
1814 RecvTime: time.Now(),
1815 Payload: m,
1816 Length: payInfo.uncompressedBytes.Len(),
1817 WireLength: payInfo.compressedLength + headerLen,
1818 CompressedLength: payInfo.compressedLength,
1819 })
1820 }
William Kurkianea869482019-04-09 15:16:11 -04001821 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001822 if len(ss.binlogs) != 0 {
1823 cm := &binarylog.ClientMessage{
1824 Message: payInfo.uncompressedBytes.Materialize(),
1825 }
1826 for _, binlog := range ss.binlogs {
1827 binlog.Log(ss.ctx, cm)
1828 }
William Kurkianea869482019-04-09 15:16:11 -04001829 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001830
1831 if ss.desc.ClientStreams {
1832 // Subsequent messages should be received by subsequent RecvMsg calls.
1833 return nil
1834 }
1835 // Special handling for non-client-stream rpcs.
1836 // This recv expects EOF or errors, so we don't collect inPayload.
1837 if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
1838 return nil
1839 } else if err != nil {
1840 return err
1841 }
1842 return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
William Kurkianea869482019-04-09 15:16:11 -04001843}
1844
1845// MethodFromServerStream returns the method string for the input stream.
1846// The returned string is in the format of "/service/method".
1847func MethodFromServerStream(stream ServerStream) (string, bool) {
1848 return Method(stream.Context())
1849}
Abhilash S.L3b494632019-07-16 15:51:09 +05301850
Abhay Kumara61c5222025-11-10 07:32:50 +00001851// prepareMsg returns the hdr, payload and data using the compressors passed or
1852// using the passed preparedmsg. The returned boolean indicates whether
1853// compression was made and therefore whether the payload needs to be freed in
1854// addition to the returned data. Freeing the payload if the returned boolean is
1855// false can lead to undefined behavior.
1856func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301857 if preparedMsg, ok := m.(*PreparedMsg); ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001858 return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301859 }
1860 // The input interface is not a prepared msg.
1861 // Marshal and Compress the data at this point
1862 data, err = encode(codec, m)
1863 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001864 return nil, nil, nil, 0, err
Abhilash S.L3b494632019-07-16 15:51:09 +05301865 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001866 compData, pf, err := compress(data, cp, comp, pool)
Abhilash S.L3b494632019-07-16 15:51:09 +05301867 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001868 data.Free()
1869 return nil, nil, nil, 0, err
Abhilash S.L3b494632019-07-16 15:51:09 +05301870 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001871 hdr, payload = msgHeader(data, compData, pf)
1872 return hdr, data, payload, pf, nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301873}