blob: ddd377341191d05dbde6543be2f73710b6df6700 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
William Kurkianea869482019-04-09 15:16:11 -040036 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/encoding"
39 "google.golang.org/grpc/encoding/proto"
Abhay Kumara61c5222025-11-10 07:32:50 +000040 estats "google.golang.org/grpc/experimental/stats"
William Kurkianea869482019-04-09 15:16:11 -040041 "google.golang.org/grpc/grpclog"
Abhay Kumara61c5222025-11-10 07:32:50 +000042 "google.golang.org/grpc/internal"
William Kurkianea869482019-04-09 15:16:11 -040043 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
David Bainbridge788e5202019-10-21 18:49:40 +000045 "google.golang.org/grpc/internal/grpcsync"
Abhay Kumara61c5222025-11-10 07:32:50 +000046 "google.golang.org/grpc/internal/grpcutil"
47 istats "google.golang.org/grpc/internal/stats"
William Kurkianea869482019-04-09 15:16:11 -040048 "google.golang.org/grpc/internal/transport"
49 "google.golang.org/grpc/keepalive"
Abhay Kumara61c5222025-11-10 07:32:50 +000050 "google.golang.org/grpc/mem"
William Kurkianea869482019-04-09 15:16:11 -040051 "google.golang.org/grpc/metadata"
52 "google.golang.org/grpc/peer"
53 "google.golang.org/grpc/stats"
54 "google.golang.org/grpc/status"
55 "google.golang.org/grpc/tap"
56)
57
const (
	// defaultServerMaxReceiveMessageSize is the default upper bound, in
	// bytes, on a message the server will receive (4 MiB).
	defaultServerMaxReceiveMessageSize = 4 << 20
	// defaultServerMaxSendMessageSize is the default upper bound, in bytes,
	// on a message the server will send.
	defaultServerMaxSendMessageSize = math.MaxInt32

	// listenerAddressForServeHTTP is the placeholder key used for transports
	// that arrive through ServeHTTP(). Server transports are tracked in a map
	// keyed on listener address: connections accepted in Serve() through
	// Accept() are stored under the real listener address, whereas
	// connections handed over by ServeHTTP() have no listener, so this dummy
	// value is used in its place.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
69
Abhay Kumara61c5222025-11-10 07:32:50 +000070func init() {
71 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
72 return srv.opts.creds
73 }
74 internal.IsRegisteredMethod = func(srv *Server, method string) bool {
75 return srv.isRegisteredMethod(method)
76 }
77 internal.ServerFromContext = serverFromContext
78 internal.AddGlobalServerOptions = func(opt ...ServerOption) {
79 globalServerOptions = append(globalServerOptions, opt...)
80 }
81 internal.ClearGlobalServerOptions = func() {
82 globalServerOptions = nil
83 }
84 internal.BinaryLogger = binaryLogger
85 internal.JoinServerOptions = newJoinServerOption
86 internal.BufferPool = bufferPool
87 internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
88 return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
89 }
90}
David Bainbridge788e5202019-10-21 18:49:40 +000091
Abhay Kumara61c5222025-11-10 07:32:50 +000092var statusOK = status.New(codes.OK, "")
93var logger = grpclog.Component("core")
94
95// MethodHandler is a function type that processes a unary RPC method call.
96type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
William Kurkianea869482019-04-09 15:16:11 -040097
98// MethodDesc represents an RPC service's method specification.
99type MethodDesc struct {
100 MethodName string
Abhay Kumara61c5222025-11-10 07:32:50 +0000101 Handler MethodHandler
William Kurkianea869482019-04-09 15:16:11 -0400102}
103
104// ServiceDesc represents an RPC service's specification.
105type ServiceDesc struct {
106 ServiceName string
107 // The pointer to the service interface. Used to check whether the user
108 // provided implementation satisfies the interface requirements.
Abhay Kumara61c5222025-11-10 07:32:50 +0000109 HandlerType any
William Kurkianea869482019-04-09 15:16:11 -0400110 Methods []MethodDesc
111 Streams []StreamDesc
Abhay Kumara61c5222025-11-10 07:32:50 +0000112 Metadata any
William Kurkianea869482019-04-09 15:16:11 -0400113}
114
Abhay Kumara61c5222025-11-10 07:32:50 +0000115// serviceInfo wraps information about a service. It is very similar to
116// ServiceDesc and is constructed from it for internal purposes.
117type serviceInfo struct {
118 // Contains the implementation for the methods in this service.
119 serviceImpl any
120 methods map[string]*MethodDesc
121 streams map[string]*StreamDesc
122 mdata any
William Kurkianea869482019-04-09 15:16:11 -0400123}
124
125// Server is a gRPC server to serve RPC requests.
126type Server struct {
bseeniva0b9cbcb2026-02-12 19:11:11 +0530127 opts serverOptions
128 statsHandler stats.Handler
William Kurkianea869482019-04-09 15:16:11 -0400129
Abhay Kumara61c5222025-11-10 07:32:50 +0000130 mu sync.Mutex // guards following
131 lis map[net.Listener]bool
132 // conns contains all active server transports. It is a map keyed on a
133 // listener address with the value being the set of active transports
134 // belonging to that listener.
135 conns map[string]map[transport.ServerTransport]bool
136 serve bool
137 drain bool
138 cv *sync.Cond // signaled when connections close for GracefulStop
139 services map[string]*serviceInfo // service name -> service info
140 events traceEventLog
William Kurkianea869482019-04-09 15:16:11 -0400141
David Bainbridge788e5202019-10-21 18:49:40 +0000142 quit *grpcsync.Event
143 done *grpcsync.Event
William Kurkianea869482019-04-09 15:16:11 -0400144 channelzRemoveOnce sync.Once
Abhay Kumara61c5222025-11-10 07:32:50 +0000145 serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
146 handlersWG sync.WaitGroup // counts active method handler goroutines
William Kurkianea869482019-04-09 15:16:11 -0400147
Abhay Kumara61c5222025-11-10 07:32:50 +0000148 channelz *channelz.Server
149
150 serverWorkerChannel chan func()
151 serverWorkerChannelClose func()
William Kurkianea869482019-04-09 15:16:11 -0400152}
153
Abhilash S.L3b494632019-07-16 15:51:09 +0530154type serverOptions struct {
William Kurkianea869482019-04-09 15:16:11 -0400155 creds credentials.TransportCredentials
156 codec baseCodec
157 cp Compressor
158 dc Decompressor
159 unaryInt UnaryServerInterceptor
160 streamInt StreamServerInterceptor
Abhay Kumara61c5222025-11-10 07:32:50 +0000161 chainUnaryInts []UnaryServerInterceptor
162 chainStreamInts []StreamServerInterceptor
163 binaryLogger binarylog.Logger
William Kurkianea869482019-04-09 15:16:11 -0400164 inTapHandle tap.ServerInHandle
Abhay Kumara61c5222025-11-10 07:32:50 +0000165 statsHandlers []stats.Handler
William Kurkianea869482019-04-09 15:16:11 -0400166 maxConcurrentStreams uint32
167 maxReceiveMessageSize int
168 maxSendMessageSize int
169 unknownStreamDesc *StreamDesc
170 keepaliveParams keepalive.ServerParameters
171 keepalivePolicy keepalive.EnforcementPolicy
172 initialWindowSize int32
173 initialConnWindowSize int32
174 writeBufferSize int
175 readBufferSize int
Abhay Kumara61c5222025-11-10 07:32:50 +0000176 sharedWriteBuffer bool
William Kurkianea869482019-04-09 15:16:11 -0400177 connectionTimeout time.Duration
178 maxHeaderListSize *uint32
Devmalya Pauldd23a992019-11-14 07:06:31 +0000179 headerTableSize *uint32
Abhay Kumara61c5222025-11-10 07:32:50 +0000180 numServerWorkers uint32
181 bufferPool mem.BufferPool
182 waitForHandlers bool
183 staticWindowSize bool
William Kurkianea869482019-04-09 15:16:11 -0400184}
185
Abhilash S.L3b494632019-07-16 15:51:09 +0530186var defaultServerOptions = serverOptions{
Abhay Kumara61c5222025-11-10 07:32:50 +0000187 maxConcurrentStreams: math.MaxUint32,
William Kurkianea869482019-04-09 15:16:11 -0400188 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
189 maxSendMessageSize: defaultServerMaxSendMessageSize,
190 connectionTimeout: 120 * time.Second,
191 writeBufferSize: defaultWriteBufSize,
192 readBufferSize: defaultReadBufSize,
Abhay Kumara61c5222025-11-10 07:32:50 +0000193 bufferPool: mem.DefaultBufferPool(),
William Kurkianea869482019-04-09 15:16:11 -0400194}
Abhay Kumara61c5222025-11-10 07:32:50 +0000195var globalServerOptions []ServerOption
William Kurkianea869482019-04-09 15:16:11 -0400196
197// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
Abhilash S.L3b494632019-07-16 15:51:09 +0530198type ServerOption interface {
199 apply(*serverOptions)
200}
201
202// EmptyServerOption does not alter the server configuration. It can be embedded
203// in another structure to build custom server options.
204//
Abhay Kumara61c5222025-11-10 07:32:50 +0000205// # Experimental
206//
207// Notice: This type is EXPERIMENTAL and may be changed or removed in a
208// later release.
Abhilash S.L3b494632019-07-16 15:51:09 +0530209type EmptyServerOption struct{}
210
211func (EmptyServerOption) apply(*serverOptions) {}
212
213// funcServerOption wraps a function that modifies serverOptions into an
214// implementation of the ServerOption interface.
215type funcServerOption struct {
216 f func(*serverOptions)
217}
218
219func (fdo *funcServerOption) apply(do *serverOptions) {
220 fdo.f(do)
221}
222
223func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
224 return &funcServerOption{
225 f: f,
226 }
227}
William Kurkianea869482019-04-09 15:16:11 -0400228
Abhay Kumara61c5222025-11-10 07:32:50 +0000229// joinServerOption provides a way to combine arbitrary number of server
230// options into one.
231type joinServerOption struct {
232 opts []ServerOption
233}
234
235func (mdo *joinServerOption) apply(do *serverOptions) {
236 for _, opt := range mdo.opts {
237 opt.apply(do)
238 }
239}
240
241func newJoinServerOption(opts ...ServerOption) ServerOption {
242 return &joinServerOption{opts: opts}
243}
244
245// SharedWriteBuffer allows reusing per-connection transport write buffer.
246// If this option is set to true every connection will release the buffer after
247// flushing the data on the wire.
248//
249// # Experimental
250//
251// Notice: This API is EXPERIMENTAL and may be changed or removed in a
252// later release.
253func SharedWriteBuffer(val bool) ServerOption {
254 return newFuncServerOption(func(o *serverOptions) {
255 o.sharedWriteBuffer = val
256 })
257}
258
259// WriteBufferSize determines how much data can be batched before doing a write
260// on the wire. The default value for this buffer is 32KB. Zero or negative
261// values will disable the write buffer such that each write will be on underlying
262// connection. Note: A Send call may not directly translate to a write.
William Kurkianea869482019-04-09 15:16:11 -0400263func WriteBufferSize(s int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530264 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400265 o.writeBufferSize = s
Abhilash S.L3b494632019-07-16 15:51:09 +0530266 })
William Kurkianea869482019-04-09 15:16:11 -0400267}
268
Abhay Kumara61c5222025-11-10 07:32:50 +0000269// ReadBufferSize lets you set the size of read buffer, this determines how much
270// data can be read at most for one read syscall. The default value for this
271// buffer is 32KB. Zero or negative values will disable read buffer for a
272// connection so data framer can access the underlying conn directly.
William Kurkianea869482019-04-09 15:16:11 -0400273func ReadBufferSize(s int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530274 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400275 o.readBufferSize = s
Abhilash S.L3b494632019-07-16 15:51:09 +0530276 })
William Kurkianea869482019-04-09 15:16:11 -0400277}
278
279// InitialWindowSize returns a ServerOption that sets window size for stream.
280// The lower bound for window size is 64K and any value smaller than that will be ignored.
281func InitialWindowSize(s int32) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530282 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400283 o.initialWindowSize = s
Abhay Kumara61c5222025-11-10 07:32:50 +0000284 o.staticWindowSize = true
Abhilash S.L3b494632019-07-16 15:51:09 +0530285 })
William Kurkianea869482019-04-09 15:16:11 -0400286}
287
288// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
289// The lower bound for window size is 64K and any value smaller than that will be ignored.
290func InitialConnWindowSize(s int32) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530291 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400292 o.initialConnWindowSize = s
Abhay Kumara61c5222025-11-10 07:32:50 +0000293 o.staticWindowSize = true
294 })
295}
296
297// StaticStreamWindowSize returns a ServerOption to set the initial stream
298// window size to the value provided and disables dynamic flow control.
299// The lower bound for window size is 64K and any value smaller than that
300// will be ignored.
301func StaticStreamWindowSize(s int32) ServerOption {
302 return newFuncServerOption(func(o *serverOptions) {
303 o.initialWindowSize = s
304 o.staticWindowSize = true
305 })
306}
307
308// StaticConnWindowSize returns a ServerOption to set the initial connection
309// window size to the value provided and disables dynamic flow control.
310// The lower bound for window size is 64K and any value smaller than that
311// will be ignored.
312func StaticConnWindowSize(s int32) ServerOption {
313 return newFuncServerOption(func(o *serverOptions) {
314 o.initialConnWindowSize = s
315 o.staticWindowSize = true
Abhilash S.L3b494632019-07-16 15:51:09 +0530316 })
William Kurkianea869482019-04-09 15:16:11 -0400317}
318
319// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
320func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000321 if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
322 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
323 kp.Time = internal.KeepaliveMinServerPingTime
William Kurkianea869482019-04-09 15:16:11 -0400324 }
325
Abhilash S.L3b494632019-07-16 15:51:09 +0530326 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400327 o.keepaliveParams = kp
Abhilash S.L3b494632019-07-16 15:51:09 +0530328 })
William Kurkianea869482019-04-09 15:16:11 -0400329}
330
331// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
332func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530333 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400334 o.keepalivePolicy = kep
Abhilash S.L3b494632019-07-16 15:51:09 +0530335 })
William Kurkianea869482019-04-09 15:16:11 -0400336}
337
338// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
339//
340// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
Abhay Kumara61c5222025-11-10 07:32:50 +0000341//
342// Deprecated: register codecs using encoding.RegisterCodec. The server will
343// automatically use registered codecs based on the incoming requests' headers.
344// See also
345// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
346// Will be supported throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400347func CustomCodec(codec Codec) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530348 return newFuncServerOption(func(o *serverOptions) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000349 o.codec = newCodecV0Bridge(codec)
350 })
351}
352
353// ForceServerCodec returns a ServerOption that sets a codec for message
354// marshaling and unmarshaling.
355//
356// This will override any lookups by content-subtype for Codecs registered
357// with RegisterCodec.
358//
359// See Content-Type on
360// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
361// more details. Also see the documentation on RegisterCodec and
362// CallContentSubtype for more details on the interaction between encoding.Codec
363// and content-subtype.
364//
365// This function is provided for advanced users; prefer to register codecs
366// using encoding.RegisterCodec.
367// The server will automatically use registered codecs based on the incoming
368// requests' headers. See also
369// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
370// Will be supported throughout 1.x.
371//
372// # Experimental
373//
374// Notice: This API is EXPERIMENTAL and may be changed or removed in a
375// later release.
376func ForceServerCodec(codec encoding.Codec) ServerOption {
377 return newFuncServerOption(func(o *serverOptions) {
378 o.codec = newCodecV1Bridge(codec)
379 })
380}
381
382// ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
383// CodecV2 interface.
384//
385// Will be supported throughout 1.x.
386//
387// # Experimental
388//
389// Notice: This API is EXPERIMENTAL and may be changed or removed in a
390// later release.
391func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
392 return newFuncServerOption(func(o *serverOptions) {
393 o.codec = codecV2
Abhilash S.L3b494632019-07-16 15:51:09 +0530394 })
William Kurkianea869482019-04-09 15:16:11 -0400395}
396
397// RPCCompressor returns a ServerOption that sets a compressor for outbound
398// messages. For backward compatibility, all outbound messages will be sent
399// using this compressor, regardless of incoming message compression. By
400// default, server messages will be sent using the same compressor with which
401// request messages were sent.
402//
Abhay Kumara61c5222025-11-10 07:32:50 +0000403// Deprecated: use encoding.RegisterCompressor instead. Will be supported
404// throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400405func RPCCompressor(cp Compressor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530406 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400407 o.cp = cp
Abhilash S.L3b494632019-07-16 15:51:09 +0530408 })
William Kurkianea869482019-04-09 15:16:11 -0400409}
410
411// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
412// messages. It has higher priority than decompressors registered via
413// encoding.RegisterCompressor.
414//
Abhay Kumara61c5222025-11-10 07:32:50 +0000415// Deprecated: use encoding.RegisterCompressor instead. Will be supported
416// throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400417func RPCDecompressor(dc Decompressor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530418 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400419 o.dc = dc
Abhilash S.L3b494632019-07-16 15:51:09 +0530420 })
William Kurkianea869482019-04-09 15:16:11 -0400421}
422
423// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
424// If this is not set, gRPC uses the default limit.
425//
Abhay Kumara61c5222025-11-10 07:32:50 +0000426// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400427func MaxMsgSize(m int) ServerOption {
428 return MaxRecvMsgSize(m)
429}
430
431// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
432// If this is not set, gRPC uses the default 4MB.
433func MaxRecvMsgSize(m int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530434 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400435 o.maxReceiveMessageSize = m
Abhilash S.L3b494632019-07-16 15:51:09 +0530436 })
William Kurkianea869482019-04-09 15:16:11 -0400437}
438
439// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
440// If this is not set, gRPC uses the default `math.MaxInt32`.
441func MaxSendMsgSize(m int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530442 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400443 o.maxSendMessageSize = m
Abhilash S.L3b494632019-07-16 15:51:09 +0530444 })
William Kurkianea869482019-04-09 15:16:11 -0400445}
446
447// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
448// of concurrent streams to each ServerTransport.
449func MaxConcurrentStreams(n uint32) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000450 if n == 0 {
451 n = math.MaxUint32
452 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530453 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400454 o.maxConcurrentStreams = n
Abhilash S.L3b494632019-07-16 15:51:09 +0530455 })
William Kurkianea869482019-04-09 15:16:11 -0400456}
457
458// Creds returns a ServerOption that sets credentials for server connections.
459func Creds(c credentials.TransportCredentials) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530460 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400461 o.creds = c
Abhilash S.L3b494632019-07-16 15:51:09 +0530462 })
William Kurkianea869482019-04-09 15:16:11 -0400463}
464
465// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
466// server. Only one unary interceptor can be installed. The construction of multiple
467// interceptors (e.g., chaining) can be implemented at the caller.
468func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530469 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400470 if o.unaryInt != nil {
471 panic("The unary server interceptor was already set and may not be reset.")
472 }
473 o.unaryInt = i
Abhilash S.L3b494632019-07-16 15:51:09 +0530474 })
William Kurkianea869482019-04-09 15:16:11 -0400475}
476
Abhay Kumara61c5222025-11-10 07:32:50 +0000477// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
478// for unary RPCs. The first interceptor will be the outer most,
479// while the last interceptor will be the inner most wrapper around the real call.
480// All unary interceptors added by this method will be chained.
481func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
482 return newFuncServerOption(func(o *serverOptions) {
483 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
484 })
485}
486
William Kurkianea869482019-04-09 15:16:11 -0400487// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
488// server. Only one stream interceptor can be installed.
489func StreamInterceptor(i StreamServerInterceptor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530490 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400491 if o.streamInt != nil {
492 panic("The stream server interceptor was already set and may not be reset.")
493 }
494 o.streamInt = i
Abhilash S.L3b494632019-07-16 15:51:09 +0530495 })
William Kurkianea869482019-04-09 15:16:11 -0400496}
497
Abhay Kumara61c5222025-11-10 07:32:50 +0000498// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
499// for streaming RPCs. The first interceptor will be the outer most,
500// while the last interceptor will be the inner most wrapper around the real call.
501// All stream interceptors added by this method will be chained.
502func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
503 return newFuncServerOption(func(o *serverOptions) {
504 o.chainStreamInts = append(o.chainStreamInts, interceptors...)
505 })
506}
507
William Kurkianea869482019-04-09 15:16:11 -0400508// InTapHandle returns a ServerOption that sets the tap handle for all the server
509// transport to be created. Only one can be installed.
Abhay Kumara61c5222025-11-10 07:32:50 +0000510//
511// # Experimental
512//
513// Notice: This API is EXPERIMENTAL and may be changed or removed in a
514// later release.
William Kurkianea869482019-04-09 15:16:11 -0400515func InTapHandle(h tap.ServerInHandle) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530516 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400517 if o.inTapHandle != nil {
518 panic("The tap handle was already set and may not be reset.")
519 }
520 o.inTapHandle = h
Abhilash S.L3b494632019-07-16 15:51:09 +0530521 })
William Kurkianea869482019-04-09 15:16:11 -0400522}
523
524// StatsHandler returns a ServerOption that sets the stats handler for the server.
525func StatsHandler(h stats.Handler) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530526 return newFuncServerOption(func(o *serverOptions) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000527 if h == nil {
528 logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
529 // Do not allow a nil stats handler, which would otherwise cause
530 // panics.
531 return
532 }
533 o.statsHandlers = append(o.statsHandlers, h)
534 })
535}
536
537// binaryLogger returns a ServerOption that can set the binary logger for the
538// server.
539func binaryLogger(bl binarylog.Logger) ServerOption {
540 return newFuncServerOption(func(o *serverOptions) {
541 o.binaryLogger = bl
Abhilash S.L3b494632019-07-16 15:51:09 +0530542 })
William Kurkianea869482019-04-09 15:16:11 -0400543}
544
545// UnknownServiceHandler returns a ServerOption that allows for adding a custom
546// unknown service handler. The provided method is a bidi-streaming RPC service
547// handler that will be invoked instead of returning the "unimplemented" gRPC
548// error whenever a request is received for an unregistered service or method.
Abhay Kumara61c5222025-11-10 07:32:50 +0000549// The handling function and stream interceptor (if set) have full access to
550// the ServerStream, including its Context.
William Kurkianea869482019-04-09 15:16:11 -0400551func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530552 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400553 o.unknownStreamDesc = &StreamDesc{
554 StreamName: "unknown_service_handler",
555 Handler: streamHandler,
556 // We need to assume that the users of the streamHandler will want to use both.
557 ClientStreams: true,
558 ServerStreams: true,
559 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530560 })
William Kurkianea869482019-04-09 15:16:11 -0400561}
562
563// ConnectionTimeout returns a ServerOption that sets the timeout for
564// connection establishment (up to and including HTTP/2 handshaking) for all
565// new connections. If this is not set, the default is 120 seconds. A zero or
566// negative value will result in an immediate timeout.
567//
Abhay Kumara61c5222025-11-10 07:32:50 +0000568// # Experimental
569//
570// Notice: This API is EXPERIMENTAL and may be changed or removed in a
571// later release.
William Kurkianea869482019-04-09 15:16:11 -0400572func ConnectionTimeout(d time.Duration) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530573 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400574 o.connectionTimeout = d
Abhilash S.L3b494632019-07-16 15:51:09 +0530575 })
William Kurkianea869482019-04-09 15:16:11 -0400576}
577
Abhay Kumara61c5222025-11-10 07:32:50 +0000578// MaxHeaderListSizeServerOption is a ServerOption that sets the max
579// (uncompressed) size of header list that the server is prepared to accept.
580type MaxHeaderListSizeServerOption struct {
581 MaxHeaderListSize uint32
582}
583
584func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
585 so.maxHeaderListSize = &o.MaxHeaderListSize
586}
587
William Kurkianea869482019-04-09 15:16:11 -0400588// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
589// of header list that the server is prepared to accept.
590func MaxHeaderListSize(s uint32) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000591 return MaxHeaderListSizeServerOption{
592 MaxHeaderListSize: s,
593 }
William Kurkianea869482019-04-09 15:16:11 -0400594}
595
Devmalya Pauldd23a992019-11-14 07:06:31 +0000596// HeaderTableSize returns a ServerOption that sets the size of dynamic
597// header table for stream.
598//
Abhay Kumara61c5222025-11-10 07:32:50 +0000599// # Experimental
600//
601// Notice: This API is EXPERIMENTAL and may be changed or removed in a
602// later release.
Devmalya Pauldd23a992019-11-14 07:06:31 +0000603func HeaderTableSize(s uint32) ServerOption {
604 return newFuncServerOption(func(o *serverOptions) {
605 o.headerTableSize = &s
606 })
607}
608
Abhay Kumara61c5222025-11-10 07:32:50 +0000609// NumStreamWorkers returns a ServerOption that sets the number of worker
610// goroutines that should be used to process incoming streams. Setting this to
611// zero (default) will disable workers and spawn a new goroutine for each
612// stream.
613//
614// # Experimental
615//
616// Notice: This API is EXPERIMENTAL and may be changed or removed in a
617// later release.
618func NumStreamWorkers(numServerWorkers uint32) ServerOption {
619 // TODO: If/when this API gets stabilized (i.e. stream workers become the
620 // only way streams are processed), change the behavior of the zero value to
621 // a sane default. Preliminary experiments suggest that a value equal to the
622 // number of CPUs available is most performant; requires thorough testing.
623 return newFuncServerOption(func(o *serverOptions) {
624 o.numServerWorkers = numServerWorkers
625 })
626}
627
628// WaitForHandlers cause Stop to wait until all outstanding method handlers have
629// exited before returning. If false, Stop will return as soon as all
630// connections have closed, but method handlers may still be running. By
631// default, Stop does not wait for method handlers to return.
632//
633// # Experimental
634//
635// Notice: This API is EXPERIMENTAL and may be changed or removed in a
636// later release.
637func WaitForHandlers(w bool) ServerOption {
638 return newFuncServerOption(func(o *serverOptions) {
639 o.waitForHandlers = w
640 })
641}
642
643func bufferPool(bufferPool mem.BufferPool) ServerOption {
644 return newFuncServerOption(func(o *serverOptions) {
645 o.bufferPool = bufferPool
646 })
647}
648
// serverWorkerResetThreshold is the number of requests a worker handles
// before its stack is reset. Every N requests a worker replaces itself with a
// newly spawned goroutine, so that large stacks don't live in memory forever.
// 2^16 should allow each goroutine stack to live for at least a few seconds
// in a typical workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 65536 // 2^16
655
656// serverWorker blocks on a *transport.ServerStream channel forever and waits
657// for data to be fed by serveStreams. This allows multiple requests to be
658// processed by the same goroutine, removing the need for expensive stack
659// re-allocations (see the runtime.morestack problem [1]).
660//
661// [1] https://github.com/golang/go/issues/18138
662func (s *Server) serverWorker() {
663 for completed := 0; completed < serverWorkerResetThreshold; completed++ {
664 f, ok := <-s.serverWorkerChannel
665 if !ok {
666 return
667 }
668 f()
669 }
670 go s.serverWorker()
671}
672
673// initServerWorkers creates worker goroutines and a channel to process incoming
674// connections to reduce the time spent overall on runtime.morestack.
675func (s *Server) initServerWorkers() {
676 s.serverWorkerChannel = make(chan func())
677 s.serverWorkerChannelClose = sync.OnceFunc(func() {
678 close(s.serverWorkerChannel)
679 })
680 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
681 go s.serverWorker()
682 }
683}
684
William Kurkianea869482019-04-09 15:16:11 -0400685// NewServer creates a gRPC server which has no service registered and has not
686// started to accept requests yet.
687func NewServer(opt ...ServerOption) *Server {
688 opts := defaultServerOptions
Abhay Kumara61c5222025-11-10 07:32:50 +0000689 for _, o := range globalServerOptions {
690 o.apply(&opts)
691 }
William Kurkianea869482019-04-09 15:16:11 -0400692 for _, o := range opt {
Abhilash S.L3b494632019-07-16 15:51:09 +0530693 o.apply(&opts)
William Kurkianea869482019-04-09 15:16:11 -0400694 }
695 s := &Server{
bseeniva0b9cbcb2026-02-12 19:11:11 +0530696 lis: make(map[net.Listener]bool),
697 opts: opts,
698 statsHandler: istats.NewCombinedHandler(opts.statsHandlers...),
699 conns: make(map[string]map[transport.ServerTransport]bool),
700 services: make(map[string]*serviceInfo),
701 quit: grpcsync.NewEvent(),
702 done: grpcsync.NewEvent(),
703 channelz: channelz.RegisterServer(""),
William Kurkianea869482019-04-09 15:16:11 -0400704 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000705 chainUnaryServerInterceptors(s)
706 chainStreamServerInterceptors(s)
William Kurkianea869482019-04-09 15:16:11 -0400707 s.cv = sync.NewCond(&s.mu)
708 if EnableTracing {
709 _, file, line, _ := runtime.Caller(1)
Abhay Kumara61c5222025-11-10 07:32:50 +0000710 s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
William Kurkianea869482019-04-09 15:16:11 -0400711 }
712
Abhay Kumara61c5222025-11-10 07:32:50 +0000713 if s.opts.numServerWorkers > 0 {
714 s.initServerWorkers()
William Kurkianea869482019-04-09 15:16:11 -0400715 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000716
717 channelz.Info(logger, s.channelz, "Server created")
William Kurkianea869482019-04-09 15:16:11 -0400718 return s
719}
720
721// printf records an event in s's event log, unless s has been stopped.
722// REQUIRES s.mu is held.
Abhay Kumara61c5222025-11-10 07:32:50 +0000723func (s *Server) printf(format string, a ...any) {
William Kurkianea869482019-04-09 15:16:11 -0400724 if s.events != nil {
725 s.events.Printf(format, a...)
726 }
727}
728
729// errorf records an error in s's event log, unless s has been stopped.
730// REQUIRES s.mu is held.
Abhay Kumara61c5222025-11-10 07:32:50 +0000731func (s *Server) errorf(format string, a ...any) {
William Kurkianea869482019-04-09 15:16:11 -0400732 if s.events != nil {
733 s.events.Errorf(format, a...)
734 }
735}
736
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
//
// *Server provides a RegisterService method with this signature.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers.  impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl any)
}
748
William Kurkianea869482019-04-09 15:16:11 -0400749// RegisterService registers a service and its implementation to the gRPC
750// server. It is called from the IDL generated code. This must be called before
Abhay Kumara61c5222025-11-10 07:32:50 +0000751// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
752// ensure it implements sd.HandlerType.
753func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
754 if ss != nil {
755 ht := reflect.TypeOf(sd.HandlerType).Elem()
756 st := reflect.TypeOf(ss)
757 if !st.Implements(ht) {
758 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
759 }
William Kurkianea869482019-04-09 15:16:11 -0400760 }
761 s.register(sd, ss)
762}
763
Abhay Kumara61c5222025-11-10 07:32:50 +0000764func (s *Server) register(sd *ServiceDesc, ss any) {
William Kurkianea869482019-04-09 15:16:11 -0400765 s.mu.Lock()
766 defer s.mu.Unlock()
767 s.printf("RegisterService(%q)", sd.ServiceName)
768 if s.serve {
Abhay Kumara61c5222025-11-10 07:32:50 +0000769 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
William Kurkianea869482019-04-09 15:16:11 -0400770 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000771 if _, ok := s.services[sd.ServiceName]; ok {
772 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
William Kurkianea869482019-04-09 15:16:11 -0400773 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000774 info := &serviceInfo{
775 serviceImpl: ss,
776 methods: make(map[string]*MethodDesc),
777 streams: make(map[string]*StreamDesc),
778 mdata: sd.Metadata,
William Kurkianea869482019-04-09 15:16:11 -0400779 }
780 for i := range sd.Methods {
781 d := &sd.Methods[i]
Abhay Kumara61c5222025-11-10 07:32:50 +0000782 info.methods[d.MethodName] = d
William Kurkianea869482019-04-09 15:16:11 -0400783 }
784 for i := range sd.Streams {
785 d := &sd.Streams[i]
Abhay Kumara61c5222025-11-10 07:32:50 +0000786 info.streams[d.StreamName] = d
William Kurkianea869482019-04-09 15:16:11 -0400787 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000788 s.services[sd.ServiceName] = info
William Kurkianea869482019-04-09 15:16:11 -0400789}
790
// MethodInfo contains the information of an RPC including its method name and type.
// Both stream flags are false for a unary RPC.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
800
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists both the unary and the streaming methods of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata any
}
807
808// GetServiceInfo returns a map from service names to ServiceInfo.
809// Service names include the package names, in the form of <package>.<service>.
810func (s *Server) GetServiceInfo() map[string]ServiceInfo {
811 ret := make(map[string]ServiceInfo)
Abhay Kumara61c5222025-11-10 07:32:50 +0000812 for n, srv := range s.services {
813 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
814 for m := range srv.methods {
William Kurkianea869482019-04-09 15:16:11 -0400815 methods = append(methods, MethodInfo{
816 Name: m,
817 IsClientStream: false,
818 IsServerStream: false,
819 })
820 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000821 for m, d := range srv.streams {
William Kurkianea869482019-04-09 15:16:11 -0400822 methods = append(methods, MethodInfo{
823 Name: m,
824 IsClientStream: d.ClientStreams,
825 IsServerStream: d.ServerStreams,
826 })
827 }
828
829 ret[n] = ServiceInfo{
830 Methods: methods,
831 Metadata: srv.mdata,
832 }
833 }
834 return ret
835}
836
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called after Stop or
// GracefulStop.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
840
// listenSocket pairs a net.Listener with the channelz socket entry that was
// registered for it.
type listenSocket struct {
	// Listener is the wrapped listener; its methods are promoted.
	net.Listener
	// channelz is the channelz entry removed again by Close.
	channelz *channelz.Socket
}
845
846func (l *listenSocket) Close() error {
847 err := l.Listener.Close()
Abhay Kumara61c5222025-11-10 07:32:50 +0000848 channelz.RemoveEntry(l.channelz.ID)
849 channelz.Info(logger, l.channelz, "ListenSocket deleted")
William Kurkianea869482019-04-09 15:16:11 -0400850 return err
851}
852
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report themselves as temporary are retried with an
// exponential backoff starting at 5ms and capped at 1s.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call; the matching Done runs in the deferred
	// function below.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Wrap the listener so that it carries its channelz socket entry.
	ls := &listenSocket{
		Listener: lis,
		channelz: channelz.RegisterSocket(&channelz.Socket{
			SocketType:    channelz.SocketTypeListen,
			Parent:        s.channelz,
			RefName:       lis.Addr().String(),
			LocalAddr:     lis.Addr(),
			SocketOptions: channelz.GetSocketOption(lis)},
		),
	}
	// s.mu is still held here, so the listener set is updated safely.
	s.lis[ls] = true

	// Deferred functions run last-in-first-out, so on return this cleanup
	// runs before the serveWG.Done above. The listener is closed here only if
	// it is still present in s.lis.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	// Release the lock taken at the top of the function before entering the
	// accept loop.
	s.mu.Unlock()
	channelz.Info(logger, ls.channelz, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				// Temporary failure: back off, doubling the delay on every
				// consecutive failure up to the cap.
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff, but stop waiting as soon as the
				// server is told to quit.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept error observed after quit has fired is reported as a
			// clean shutdown.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
964
965// handleRawConn forks a goroutine to handle a just-accepted connection that
966// has not had any I/O performed on it yet.
Abhay Kumara61c5222025-11-10 07:32:50 +0000967func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
David Bainbridge788e5202019-10-21 18:49:40 +0000968 if s.quit.HasFired() {
969 rawConn.Close()
970 return
971 }
William Kurkianea869482019-04-09 15:16:11 -0400972 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
William Kurkianea869482019-04-09 15:16:11 -0400973
William Kurkianea869482019-04-09 15:16:11 -0400974 // Finish handshaking (HTTP2)
Abhay Kumara61c5222025-11-10 07:32:50 +0000975 st := s.newHTTP2Transport(rawConn)
976 rawConn.SetDeadline(time.Time{})
William Kurkianea869482019-04-09 15:16:11 -0400977 if st == nil {
978 return
979 }
980
Abhay Kumara61c5222025-11-10 07:32:50 +0000981 if cc, ok := rawConn.(interface {
982 PassServerTransport(transport.ServerTransport)
983 }); ok {
984 cc.PassServerTransport(st)
985 }
986
987 if !s.addConn(lisAddr, st) {
William Kurkianea869482019-04-09 15:16:11 -0400988 return
989 }
990 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000991 s.serveStreams(context.Background(), st, rawConn)
992 s.removeConn(lisAddr, st)
William Kurkianea869482019-04-09 15:16:11 -0400993 }()
994}
995
996// newHTTP2Transport sets up a http/2 transport (using the
997// gRPC http2 server transport in transport/http2_server.go).
Abhay Kumara61c5222025-11-10 07:32:50 +0000998func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
William Kurkianea869482019-04-09 15:16:11 -0400999 config := &transport.ServerConfig{
1000 MaxStreams: s.opts.maxConcurrentStreams,
Abhay Kumara61c5222025-11-10 07:32:50 +00001001 ConnectionTimeout: s.opts.connectionTimeout,
1002 Credentials: s.opts.creds,
William Kurkianea869482019-04-09 15:16:11 -04001003 InTapHandle: s.opts.inTapHandle,
bseeniva0b9cbcb2026-02-12 19:11:11 +05301004 StatsHandler: s.statsHandler,
William Kurkianea869482019-04-09 15:16:11 -04001005 KeepaliveParams: s.opts.keepaliveParams,
1006 KeepalivePolicy: s.opts.keepalivePolicy,
1007 InitialWindowSize: s.opts.initialWindowSize,
1008 InitialConnWindowSize: s.opts.initialConnWindowSize,
1009 WriteBufferSize: s.opts.writeBufferSize,
1010 ReadBufferSize: s.opts.readBufferSize,
Abhay Kumara61c5222025-11-10 07:32:50 +00001011 SharedWriteBuffer: s.opts.sharedWriteBuffer,
1012 ChannelzParent: s.channelz,
William Kurkianea869482019-04-09 15:16:11 -04001013 MaxHeaderListSize: s.opts.maxHeaderListSize,
Devmalya Pauldd23a992019-11-14 07:06:31 +00001014 HeaderTableSize: s.opts.headerTableSize,
Abhay Kumara61c5222025-11-10 07:32:50 +00001015 BufferPool: s.opts.bufferPool,
1016 StaticWindowSize: s.opts.staticWindowSize,
William Kurkianea869482019-04-09 15:16:11 -04001017 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001018 st, err := transport.NewServerTransport(c, config)
William Kurkianea869482019-04-09 15:16:11 -04001019 if err != nil {
1020 s.mu.Lock()
1021 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
1022 s.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001023 // ErrConnDispatched means that the connection was dispatched away from
1024 // gRPC; those connections should be left open.
1025 if err != credentials.ErrConnDispatched {
1026 // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
1027 if err != io.EOF {
1028 channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
1029 }
1030 c.Close()
1031 }
William Kurkianea869482019-04-09 15:16:11 -04001032 return nil
1033 }
1034
1035 return st
1036}
1037
// serveStreams serves the streams arriving on st until st.HandleStreams
// returns, then closes st. rawConn is attached to the context handed to the
// transport; ServeHTTP passes nil for it.
//
// When a stats handler is configured, the connection is tagged and a
// ConnBegin event is reported before serving, and a ConnEnd event after st
// has been closed.
func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
	ctx = transport.SetConnection(ctx, rawConn)
	ctx = peer.NewContext(ctx, st.Peer())
	if s.statsHandler != nil {
		ctx = s.statsHandler.TagConn(ctx, &stats.ConnTagInfo{
			RemoteAddr: st.Peer().Addr,
			LocalAddr:  st.Peer().LocalAddr,
		})
		s.statsHandler.HandleConn(ctx, &stats.ConnBegin{})
	}

	defer func() {
		st.Close(errors.New("finished serving streams for the server transport"))
		if s.statsHandler != nil {
			s.statsHandler.HandleConn(ctx, &stats.ConnEnd{})
		}
	}()

	// NOTE(review): the quota presumably limits the number of concurrently
	// running handlers on this transport to maxConcurrentStreams, with acquire
	// blocking when it is exhausted -- confirm against newHandlerQuota.
	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
	st.HandleStreams(ctx, func(stream *transport.ServerStream) {
		// Register the handler and take a quota slot before the handler
		// function is scheduled; both are given back when f finishes.
		s.handlersWG.Add(1)
		streamQuota.acquire()
		f := func() {
			defer streamQuota.release()
			defer s.handlersWG.Done()
			s.handleStream(st, stream)
		}

		if s.opts.numServerWorkers > 0 {
			// Non-blocking hand-off: the send only succeeds if a worker is
			// ready to receive on the unbuffered channel right now.
			select {
			case s.serverWorkerChannel <- f:
				return
			default:
				// If all stream workers are busy, fallback to the default code path.
			}
		}
		go f()
	})
}
1077
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
1079
1080// ServeHTTP implements the Go standard library's http.Handler
1081// interface by responding to the gRPC request r, by looking up
1082// the requested gRPC method in the gRPC server s.
1083//
1084// The provided HTTP request must have arrived on an HTTP/2
1085// connection. When using the Go standard library's server,
1086// practically this means that the Request must also have arrived
1087// over TLS.
1088//
1089// To share one port (such as 443 for https) between gRPC and an
1090// existing http.Handler, use a root http.Handler such as:
1091//
Abhay Kumara61c5222025-11-10 07:32:50 +00001092// if r.ProtoMajor == 2 && strings.HasPrefix(
1093// r.Header.Get("Content-Type"), "application/grpc") {
1094// grpcServer.ServeHTTP(w, r)
1095// } else {
1096// yourMux.ServeHTTP(w, r)
1097// }
William Kurkianea869482019-04-09 15:16:11 -04001098//
1099// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
1100// separate from grpc-go's HTTP/2 server. Performance and features may vary
1101// between the two paths. ServeHTTP does not support some gRPC features
Abhay Kumara61c5222025-11-10 07:32:50 +00001102// available through grpc-go's HTTP/2 server.
1103//
1104// # Experimental
1105//
1106// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1107// later release.
William Kurkianea869482019-04-09 15:16:11 -04001108func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
bseeniva0b9cbcb2026-02-12 19:11:11 +05301109 st, err := transport.NewServerHandlerTransport(w, r, s.statsHandler, s.opts.bufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001110 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001111 // Errors returned from transport.NewServerHandlerTransport have
1112 // already been written to w.
William Kurkianea869482019-04-09 15:16:11 -04001113 return
1114 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001115 if !s.addConn(listenerAddressForServeHTTP, st) {
William Kurkianea869482019-04-09 15:16:11 -04001116 return
1117 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001118 defer s.removeConn(listenerAddressForServeHTTP, st)
1119 s.serveStreams(r.Context(), st, nil)
William Kurkianea869482019-04-09 15:16:11 -04001120}
1121
Abhay Kumara61c5222025-11-10 07:32:50 +00001122func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
William Kurkianea869482019-04-09 15:16:11 -04001123 s.mu.Lock()
1124 defer s.mu.Unlock()
1125 if s.conns == nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001126 st.Close(errors.New("Server.addConn called when server has already been stopped"))
William Kurkianea869482019-04-09 15:16:11 -04001127 return false
1128 }
1129 if s.drain {
1130 // Transport added after we drained our existing conns: drain it
1131 // immediately.
Abhay Kumara61c5222025-11-10 07:32:50 +00001132 st.Drain("")
William Kurkianea869482019-04-09 15:16:11 -04001133 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001134
1135 if s.conns[addr] == nil {
1136 // Create a map entry if this is the first connection on this listener.
1137 s.conns[addr] = make(map[transport.ServerTransport]bool)
1138 }
1139 s.conns[addr][st] = true
William Kurkianea869482019-04-09 15:16:11 -04001140 return true
1141}
1142
Abhay Kumara61c5222025-11-10 07:32:50 +00001143func (s *Server) removeConn(addr string, st transport.ServerTransport) {
William Kurkianea869482019-04-09 15:16:11 -04001144 s.mu.Lock()
1145 defer s.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001146
1147 conns := s.conns[addr]
1148 if conns != nil {
1149 delete(conns, st)
1150 if len(conns) == 0 {
1151 // If the last connection for this address is being removed, also
1152 // remove the map entry corresponding to the address. This is used
1153 // in GracefulStop() when waiting for all connections to be closed.
1154 delete(s.conns, addr)
1155 }
William Kurkianea869482019-04-09 15:16:11 -04001156 s.cv.Broadcast()
1157 }
1158}
1159
// incrCallsStarted increments the channelz counter of started calls and
// records the current time, in Unix nanoseconds, as the timestamp of the last
// started call.
func (s *Server) incrCallsStarted() {
	s.channelz.ServerMetrics.CallsStarted.Add(1)
	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
1164
// incrCallsSucceeded increments the channelz counter of succeeded calls.
func (s *Server) incrCallsSucceeded() {
	s.channelz.ServerMetrics.CallsSucceeded.Add(1)
}
1168
// incrCallsFailed increments the channelz counter of failed calls.
func (s *Server) incrCallsFailed() {
	s.channelz.ServerMetrics.CallsFailed.Add(1)
}
1172
// sendResponse encodes msg with the codec selected by the stream's content
// subtype, compresses it using cp or comp, and writes the resulting header and
// payload to stream with the given write options.
//
// It returns the encode, compress or write error, or a ResourceExhausted
// status error when the payload is larger than s.opts.maxSendMessageSize. On
// a successful write, an outgoing-payload event is reported to the stats
// handler if one is configured.
func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
		return err
	}

	compData, pf, err := compress(data, cp, comp, s.opts.bufferPool)
	if err != nil {
		// The deferred cleanup below is not registered yet, so the encoded
		// data has to be released here.
		data.Free()
		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
		return err
	}

	hdr, payload := msgHeader(data, compData, pf)

	defer func() {
		compData.Free()
		data.Free()
		// payload does not need to be freed here, it is either data or compData, both of
		// which are already freed.
	}()

	// Capture both lengths up front; they are used for the size check and for
	// the stats event after the write.
	dataLen := data.Len()
	payloadLen := payload.Len()
	// TODO(dfawley): should we be checking len(data) instead?
	if payloadLen > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)
	}
	err = stream.Write(hdr, payload, opts)
	if err == nil && s.statsHandler != nil {
		s.statsHandler.HandleRPC(ctx, outPayload(false, msg, dataLen, payloadLen, time.Now()))
	}
	return err
}
1208
Abhay Kumara61c5222025-11-10 07:32:50 +00001209// chainUnaryServerInterceptors chains all unary server interceptors into one.
1210func chainUnaryServerInterceptors(s *Server) {
1211 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1212 // be executed before any other chained interceptors.
1213 interceptors := s.opts.chainUnaryInts
1214 if s.opts.unaryInt != nil {
1215 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
William Kurkianea869482019-04-09 15:16:11 -04001216 }
1217
Abhay Kumara61c5222025-11-10 07:32:50 +00001218 var chainedInt UnaryServerInterceptor
1219 if len(interceptors) == 0 {
1220 chainedInt = nil
1221 } else if len(interceptors) == 1 {
1222 chainedInt = interceptors[0]
1223 } else {
1224 chainedInt = chainUnaryInterceptors(interceptors)
1225 }
1226
1227 s.opts.unaryInt = chainedInt
1228}
1229
1230func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
1231 return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
1232 return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
1233 }
1234}
1235
1236func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1237 if curr == len(interceptors)-1 {
1238 return finalHandler
1239 }
1240 return func(ctx context.Context, req any) (any, error) {
1241 return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
1242 }
1243}
1244
1245func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
bseeniva0b9cbcb2026-02-12 19:11:11 +05301246 sh := s.statsHandler
1247 if sh != nil || trInfo != nil || channelz.IsOn() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001248 if channelz.IsOn() {
1249 s.incrCallsStarted()
1250 }
1251 var statsBegin *stats.Begin
bseeniva0b9cbcb2026-02-12 19:11:11 +05301252 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001253 statsBegin = &stats.Begin{
bseeniva0b9cbcb2026-02-12 19:11:11 +05301254 BeginTime: time.Now(),
Abhay Kumara61c5222025-11-10 07:32:50 +00001255 IsClientStream: false,
1256 IsServerStream: false,
1257 }
1258 sh.HandleRPC(ctx, statsBegin)
1259 }
1260 if trInfo != nil {
1261 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1262 }
1263 // The deferred error handling for tracing, stats handler and channelz are
1264 // combined into one function to reduce stack usage -- a defer takes ~56-64
1265 // bytes on the stack, so overflowing the stack will require a stack
1266 // re-allocation, which is expensive.
1267 //
1268 // To maintain behavior similar to separate deferred statements, statements
1269 // should be executed in the reverse order. That is, tracing first, stats
1270 // handler second, and channelz last. Note that panics *within* defers will
1271 // lead to different behavior, but that's an acceptable compromise; that
1272 // would be undefined behavior territory anyway.
1273 defer func() {
1274 if trInfo != nil {
1275 if err != nil && err != io.EOF {
1276 trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1277 trInfo.tr.SetError()
1278 }
1279 trInfo.tr.Finish()
1280 }
1281
bseeniva0b9cbcb2026-02-12 19:11:11 +05301282 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001283 end := &stats.End{
1284 BeginTime: statsBegin.BeginTime,
1285 EndTime: time.Now(),
1286 }
1287 if err != nil && err != io.EOF {
1288 end.Error = toRPCErr(err)
1289 }
1290 sh.HandleRPC(ctx, end)
1291 }
1292
1293 if channelz.IsOn() {
1294 if err != nil && err != io.EOF {
1295 s.incrCallsFailed()
1296 } else {
1297 s.incrCallsSucceeded()
1298 }
1299 }
1300 }()
1301 }
1302 var binlogs []binarylog.MethodLogger
1303 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1304 binlogs = append(binlogs, ml)
1305 }
1306 if s.opts.binaryLogger != nil {
1307 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1308 binlogs = append(binlogs, ml)
1309 }
1310 }
1311 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001312 md, _ := metadata.FromIncomingContext(ctx)
1313 logEntry := &binarylog.ClientHeader{
1314 Header: md,
1315 MethodName: stream.Method(),
1316 PeerAddr: nil,
1317 }
1318 if deadline, ok := ctx.Deadline(); ok {
1319 logEntry.Timeout = time.Until(deadline)
1320 if logEntry.Timeout < 0 {
1321 logEntry.Timeout = 0
1322 }
1323 }
1324 if a := md[":authority"]; len(a) > 0 {
1325 logEntry.Authority = a[0]
1326 }
1327 if peer, ok := peer.FromContext(ctx); ok {
1328 logEntry.PeerAddr = peer.Addr
1329 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001330 for _, binlog := range binlogs {
1331 binlog.Log(ctx, logEntry)
1332 }
William Kurkianea869482019-04-09 15:16:11 -04001333 }
1334
1335 // comp and cp are used for compression. decomp and dc are used for
1336 // decompression. If comp and decomp are both set, they are the same;
1337 // however they are kept separate to ensure that at most one of the
1338 // compressor/decompressor variable pairs are set for use later.
1339 var comp, decomp encoding.Compressor
1340 var cp Compressor
1341 var dc Decompressor
Abhay Kumara61c5222025-11-10 07:32:50 +00001342 var sendCompressorName string
William Kurkianea869482019-04-09 15:16:11 -04001343
1344 // If dc is set and matches the stream's compression, use it. Otherwise, try
1345 // to find a matching registered compressor for decomp.
1346 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1347 dc = s.opts.dc
1348 } else if rc != "" && rc != encoding.Identity {
1349 decomp = encoding.GetCompressor(rc)
1350 if decomp == nil {
1351 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
Abhay Kumara61c5222025-11-10 07:32:50 +00001352 stream.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001353 return st.Err()
1354 }
1355 }
1356
1357 // If cp is set, use it. Otherwise, attempt to compress the response using
1358 // the incoming message compression method.
1359 //
1360 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1361 if s.opts.cp != nil {
1362 cp = s.opts.cp
Abhay Kumara61c5222025-11-10 07:32:50 +00001363 sendCompressorName = cp.Type()
William Kurkianea869482019-04-09 15:16:11 -04001364 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1365 // Legacy compressor not specified; attempt to respond with same encoding.
1366 comp = encoding.GetCompressor(rc)
1367 if comp != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001368 sendCompressorName = comp.Name()
1369 }
1370 }
1371
1372 if sendCompressorName != "" {
1373 if err := stream.SetSendCompress(sendCompressorName); err != nil {
1374 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001375 }
1376 }
1377
1378 var payInfo *payloadInfo
bseeniva0b9cbcb2026-02-12 19:11:11 +05301379 if sh != nil || len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001380 payInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001381 defer payInfo.free()
William Kurkianea869482019-04-09 15:16:11 -04001382 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001383
1384 d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)
William Kurkianea869482019-04-09 15:16:11 -04001385 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001386 if e := stream.WriteStatus(status.Convert(err)); e != nil {
1387 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001388 }
1389 return err
1390 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001391 freed := false
1392 dataFree := func() {
1393 if !freed {
1394 d.Free()
1395 freed = true
1396 }
William Kurkianea869482019-04-09 15:16:11 -04001397 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001398 defer dataFree()
1399 df := func(v any) error {
1400 defer dataFree()
William Kurkianea869482019-04-09 15:16:11 -04001401 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1402 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1403 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001404
bseeniva0b9cbcb2026-02-12 19:11:11 +05301405 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001406 sh.HandleRPC(ctx, &stats.InPayload{
1407 RecvTime: time.Now(),
1408 Payload: v,
1409 Length: d.Len(),
1410 WireLength: payInfo.compressedLength + headerLen,
1411 CompressedLength: payInfo.compressedLength,
William Kurkianea869482019-04-09 15:16:11 -04001412 })
1413 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001414 if len(binlogs) != 0 {
1415 cm := &binarylog.ClientMessage{
1416 Message: d.Materialize(),
1417 }
1418 for _, binlog := range binlogs {
1419 binlog.Log(ctx, cm)
1420 }
William Kurkianea869482019-04-09 15:16:11 -04001421 }
1422 if trInfo != nil {
1423 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1424 }
1425 return nil
1426 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001427 ctx = NewContextWithServerTransportStream(ctx, stream)
1428 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
William Kurkianea869482019-04-09 15:16:11 -04001429 if appErr != nil {
1430 appStatus, ok := status.FromError(appErr)
1431 if !ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001432 // Convert non-status application error to a status error with code
1433 // Unknown, but handle context errors specifically.
1434 appStatus = status.FromContextError(appErr)
1435 appErr = appStatus.Err()
William Kurkianea869482019-04-09 15:16:11 -04001436 }
1437 if trInfo != nil {
1438 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1439 trInfo.tr.SetError()
1440 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001441 if e := stream.WriteStatus(appStatus); e != nil {
1442 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001443 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001444 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001445 if h, _ := stream.Header(); h.Len() > 0 {
1446 // Only log serverHeader if there was header. Otherwise it can
1447 // be trailer only.
Abhay Kumara61c5222025-11-10 07:32:50 +00001448 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001449 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001450 }
1451 for _, binlog := range binlogs {
1452 binlog.Log(ctx, sh)
1453 }
William Kurkianea869482019-04-09 15:16:11 -04001454 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001455 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001456 Trailer: stream.Trailer(),
1457 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001458 }
1459 for _, binlog := range binlogs {
1460 binlog.Log(ctx, st)
1461 }
William Kurkianea869482019-04-09 15:16:11 -04001462 }
1463 return appErr
1464 }
1465 if trInfo != nil {
1466 trInfo.tr.LazyLog(stringer("OK"), false)
1467 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001468 opts := &transport.WriteOptions{Last: true}
William Kurkianea869482019-04-09 15:16:11 -04001469
Abhay Kumara61c5222025-11-10 07:32:50 +00001470 // Server handler could have set new compressor by calling SetSendCompressor.
1471 // In case it is set, we need to use it for compressing outbound message.
1472 if stream.SendCompress() != sendCompressorName {
1473 comp = encoding.GetCompressor(stream.SendCompress())
1474 }
1475 if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001476 if err == io.EOF {
1477 // The entire stream is done (for unary RPC only).
1478 return err
1479 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001480 if sts, ok := status.FromError(err); ok {
1481 if e := stream.WriteStatus(sts); e != nil {
1482 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001483 }
1484 } else {
1485 switch st := err.(type) {
1486 case transport.ConnectionError:
1487 // Nothing to do here.
1488 default:
1489 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1490 }
1491 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001492 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001493 h, _ := stream.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001494 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001495 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001496 }
1497 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001498 Trailer: stream.Trailer(),
1499 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001500 }
1501 for _, binlog := range binlogs {
1502 binlog.Log(ctx, sh)
1503 binlog.Log(ctx, st)
1504 }
William Kurkianea869482019-04-09 15:16:11 -04001505 }
1506 return err
1507 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001508 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001509 h, _ := stream.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001510 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001511 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001512 }
1513 sm := &binarylog.ServerMessage{
William Kurkianea869482019-04-09 15:16:11 -04001514 Message: reply,
Abhay Kumara61c5222025-11-10 07:32:50 +00001515 }
1516 for _, binlog := range binlogs {
1517 binlog.Log(ctx, sh)
1518 binlog.Log(ctx, sm)
1519 }
William Kurkianea869482019-04-09 15:16:11 -04001520 }
1521 if trInfo != nil {
1522 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1523 }
1524 // TODO: Should we be logging if writing status failed here, like above?
1525 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1526 // error or allow the stats handler to see it?
Abhay Kumara61c5222025-11-10 07:32:50 +00001527 if len(binlogs) != 0 {
1528 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001529 Trailer: stream.Trailer(),
1530 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001531 }
1532 for _, binlog := range binlogs {
1533 binlog.Log(ctx, st)
1534 }
William Kurkianea869482019-04-09 15:16:11 -04001535 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001536 return stream.WriteStatus(statusOK)
William Kurkianea869482019-04-09 15:16:11 -04001537}
1538
Abhay Kumara61c5222025-11-10 07:32:50 +00001539// chainStreamServerInterceptors chains all stream server interceptors into one.
1540func chainStreamServerInterceptors(s *Server) {
1541 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1542 // be executed before any other chained interceptors.
1543 interceptors := s.opts.chainStreamInts
1544 if s.opts.streamInt != nil {
1545 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1546 }
1547
1548 var chainedInt StreamServerInterceptor
1549 if len(interceptors) == 0 {
1550 chainedInt = nil
1551 } else if len(interceptors) == 1 {
1552 chainedInt = interceptors[0]
1553 } else {
1554 chainedInt = chainStreamInterceptors(interceptors)
1555 }
1556
1557 s.opts.streamInt = chainedInt
1558}
1559
1560func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1561 return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1562 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1563 }
1564}
1565
1566func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1567 if curr == len(interceptors)-1 {
1568 return finalHandler
1569 }
1570 return func(srv any, stream ServerStream) error {
1571 return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1572 }
1573}
1574
1575func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001576 if channelz.IsOn() {
1577 s.incrCallsStarted()
William Kurkianea869482019-04-09 15:16:11 -04001578 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301579 sh := s.statsHandler
Abhay Kumara61c5222025-11-10 07:32:50 +00001580 var statsBegin *stats.Begin
bseeniva0b9cbcb2026-02-12 19:11:11 +05301581 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001582 statsBegin = &stats.Begin{
bseeniva0b9cbcb2026-02-12 19:11:11 +05301583 BeginTime: time.Now(),
Abhay Kumara61c5222025-11-10 07:32:50 +00001584 IsClientStream: sd.ClientStreams,
1585 IsServerStream: sd.ServerStreams,
William Kurkianea869482019-04-09 15:16:11 -04001586 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301587 sh.HandleRPC(ctx, statsBegin)
William Kurkianea869482019-04-09 15:16:11 -04001588 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001589 ctx = NewContextWithServerTransportStream(ctx, stream)
William Kurkianea869482019-04-09 15:16:11 -04001590 ss := &serverStream{
1591 ctx: ctx,
William Kurkianea869482019-04-09 15:16:11 -04001592 s: stream,
bseeniva0b9cbcb2026-02-12 19:11:11 +05301593 p: parser{r: stream, bufferPool: s.opts.bufferPool},
William Kurkianea869482019-04-09 15:16:11 -04001594 codec: s.getCodec(stream.ContentSubtype()),
Abhay Kumara61c5222025-11-10 07:32:50 +00001595 desc: sd,
William Kurkianea869482019-04-09 15:16:11 -04001596 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1597 maxSendMessageSize: s.opts.maxSendMessageSize,
1598 trInfo: trInfo,
bseeniva0b9cbcb2026-02-12 19:11:11 +05301599 statsHandler: sh,
William Kurkianea869482019-04-09 15:16:11 -04001600 }
1601
bseeniva0b9cbcb2026-02-12 19:11:11 +05301602 if sh != nil || trInfo != nil || channelz.IsOn() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001603 // See comment in processUnaryRPC on defers.
1604 defer func() {
1605 if trInfo != nil {
1606 ss.mu.Lock()
1607 if err != nil && err != io.EOF {
1608 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1609 ss.trInfo.tr.SetError()
1610 }
1611 ss.trInfo.tr.Finish()
1612 ss.trInfo.tr = nil
1613 ss.mu.Unlock()
1614 }
1615
bseeniva0b9cbcb2026-02-12 19:11:11 +05301616 if sh != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001617 end := &stats.End{
1618 BeginTime: statsBegin.BeginTime,
1619 EndTime: time.Now(),
1620 }
1621 if err != nil && err != io.EOF {
1622 end.Error = toRPCErr(err)
1623 }
bseeniva0b9cbcb2026-02-12 19:11:11 +05301624 sh.HandleRPC(ctx, end)
Abhay Kumara61c5222025-11-10 07:32:50 +00001625 }
1626
1627 if channelz.IsOn() {
1628 if err != nil && err != io.EOF {
1629 s.incrCallsFailed()
1630 } else {
1631 s.incrCallsSucceeded()
1632 }
1633 }
1634 }()
1635 }
1636
1637 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1638 ss.binlogs = append(ss.binlogs, ml)
1639 }
1640 if s.opts.binaryLogger != nil {
1641 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1642 ss.binlogs = append(ss.binlogs, ml)
1643 }
1644 }
1645 if len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001646 md, _ := metadata.FromIncomingContext(ctx)
1647 logEntry := &binarylog.ClientHeader{
1648 Header: md,
1649 MethodName: stream.Method(),
1650 PeerAddr: nil,
1651 }
1652 if deadline, ok := ctx.Deadline(); ok {
1653 logEntry.Timeout = time.Until(deadline)
1654 if logEntry.Timeout < 0 {
1655 logEntry.Timeout = 0
1656 }
1657 }
1658 if a := md[":authority"]; len(a) > 0 {
1659 logEntry.Authority = a[0]
1660 }
1661 if peer, ok := peer.FromContext(ss.Context()); ok {
1662 logEntry.PeerAddr = peer.Addr
1663 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001664 for _, binlog := range ss.binlogs {
1665 binlog.Log(ctx, logEntry)
1666 }
William Kurkianea869482019-04-09 15:16:11 -04001667 }
1668
1669 // If dc is set and matches the stream's compression, use it. Otherwise, try
1670 // to find a matching registered compressor for decomp.
1671 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
Abhay Kumara61c5222025-11-10 07:32:50 +00001672 ss.decompressorV0 = s.opts.dc
William Kurkianea869482019-04-09 15:16:11 -04001673 } else if rc != "" && rc != encoding.Identity {
Abhay Kumara61c5222025-11-10 07:32:50 +00001674 ss.decompressorV1 = encoding.GetCompressor(rc)
1675 if ss.decompressorV1 == nil {
William Kurkianea869482019-04-09 15:16:11 -04001676 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
Abhay Kumara61c5222025-11-10 07:32:50 +00001677 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001678 return st.Err()
1679 }
1680 }
1681
1682 // If cp is set, use it. Otherwise, attempt to compress the response using
1683 // the incoming message compression method.
1684 //
1685 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1686 if s.opts.cp != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001687 ss.compressorV0 = s.opts.cp
1688 ss.sendCompressorName = s.opts.cp.Type()
William Kurkianea869482019-04-09 15:16:11 -04001689 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1690 // Legacy compressor not specified; attempt to respond with same encoding.
Abhay Kumara61c5222025-11-10 07:32:50 +00001691 ss.compressorV1 = encoding.GetCompressor(rc)
1692 if ss.compressorV1 != nil {
1693 ss.sendCompressorName = rc
William Kurkianea869482019-04-09 15:16:11 -04001694 }
1695 }
1696
Abhay Kumara61c5222025-11-10 07:32:50 +00001697 if ss.sendCompressorName != "" {
1698 if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
1699 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
1700 }
1701 }
1702
1703 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)
1704
William Kurkianea869482019-04-09 15:16:11 -04001705 if trInfo != nil {
1706 trInfo.tr.LazyLog(&trInfo.firstLine, false)
William Kurkianea869482019-04-09 15:16:11 -04001707 }
1708 var appErr error
Abhay Kumara61c5222025-11-10 07:32:50 +00001709 var server any
1710 if info != nil {
1711 server = info.serviceImpl
William Kurkianea869482019-04-09 15:16:11 -04001712 }
1713 if s.opts.streamInt == nil {
1714 appErr = sd.Handler(server, ss)
1715 } else {
1716 info := &StreamServerInfo{
1717 FullMethod: stream.Method(),
1718 IsClientStream: sd.ClientStreams,
1719 IsServerStream: sd.ServerStreams,
1720 }
1721 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1722 }
1723 if appErr != nil {
1724 appStatus, ok := status.FromError(appErr)
1725 if !ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001726 // Convert non-status application error to a status error with code
1727 // Unknown, but handle context errors specifically.
1728 appStatus = status.FromContextError(appErr)
William Kurkianea869482019-04-09 15:16:11 -04001729 appErr = appStatus.Err()
1730 }
1731 if trInfo != nil {
1732 ss.mu.Lock()
1733 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1734 ss.trInfo.tr.SetError()
1735 ss.mu.Unlock()
1736 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001737 if len(ss.binlogs) != 0 {
1738 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001739 Trailer: ss.s.Trailer(),
1740 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001741 }
1742 for _, binlog := range ss.binlogs {
1743 binlog.Log(ctx, st)
1744 }
William Kurkianea869482019-04-09 15:16:11 -04001745 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001746 ss.s.WriteStatus(appStatus)
William Kurkianea869482019-04-09 15:16:11 -04001747 // TODO: Should we log an error from WriteStatus here and below?
1748 return appErr
1749 }
1750 if trInfo != nil {
1751 ss.mu.Lock()
1752 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1753 ss.mu.Unlock()
1754 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001755 if len(ss.binlogs) != 0 {
1756 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001757 Trailer: ss.s.Trailer(),
1758 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001759 }
1760 for _, binlog := range ss.binlogs {
1761 binlog.Log(ctx, st)
1762 }
William Kurkianea869482019-04-09 15:16:11 -04001763 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001764 return ss.s.WriteStatus(statusOK)
William Kurkianea869482019-04-09 15:16:11 -04001765}
1766
Abhay Kumara61c5222025-11-10 07:32:50 +00001767func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
1768 ctx := stream.Context()
1769 ctx = contextWithServer(ctx, s)
1770 var ti *traceInfo
1771 if EnableTracing {
1772 tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
1773 ctx = newTraceContext(ctx, tr)
1774 ti = &traceInfo{
1775 tr: tr,
1776 firstLine: firstLine{
1777 client: false,
1778 remoteAddr: t.Peer().Addr,
1779 },
1780 }
1781 if dl, ok := ctx.Deadline(); ok {
1782 ti.firstLine.deadline = time.Until(dl)
1783 }
1784 }
1785
William Kurkianea869482019-04-09 15:16:11 -04001786 sm := stream.Method()
1787 if sm != "" && sm[0] == '/' {
1788 sm = sm[1:]
1789 }
1790 pos := strings.LastIndex(sm, "/")
1791 if pos == -1 {
Abhay Kumara61c5222025-11-10 07:32:50 +00001792 if ti != nil {
1793 ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
1794 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001795 }
1796 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
Abhay Kumara61c5222025-11-10 07:32:50 +00001797 if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1798 if ti != nil {
1799 ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1800 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001801 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001802 channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001803 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001804 if ti != nil {
1805 ti.tr.Finish()
William Kurkianea869482019-04-09 15:16:11 -04001806 }
1807 return
1808 }
1809 service := sm[:pos]
1810 method := sm[pos+1:]
1811
Abhay Kumara61c5222025-11-10 07:32:50 +00001812 // FromIncomingContext is expensive: skip if there are no statsHandlers
bseeniva0b9cbcb2026-02-12 19:11:11 +05301813 if s.statsHandler != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001814 md, _ := metadata.FromIncomingContext(ctx)
bseeniva0b9cbcb2026-02-12 19:11:11 +05301815 ctx = s.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
1816 s.statsHandler.HandleRPC(ctx, &stats.InHeader{
1817 FullMethod: stream.Method(),
1818 RemoteAddr: t.Peer().Addr,
1819 LocalAddr: t.Peer().LocalAddr,
1820 Compression: stream.RecvCompress(),
1821 WireLength: stream.HeaderWireLength(),
1822 Header: md,
1823 })
Abhay Kumara61c5222025-11-10 07:32:50 +00001824 }
1825 // To have calls in stream callouts work. Will delete once all stats handler
1826 // calls come from the gRPC layer.
1827 stream.SetContext(ctx)
1828
1829 srv, knownService := s.services[service]
Abhilash S.L3b494632019-07-16 15:51:09 +05301830 if knownService {
Abhay Kumara61c5222025-11-10 07:32:50 +00001831 if md, ok := srv.methods[method]; ok {
1832 s.processUnaryRPC(ctx, stream, srv, md, ti)
William Kurkianea869482019-04-09 15:16:11 -04001833 return
1834 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001835 if sd, ok := srv.streams[method]; ok {
1836 s.processStreamingRPC(ctx, stream, srv, sd, ti)
William Kurkianea869482019-04-09 15:16:11 -04001837 return
1838 }
1839 }
1840 // Unknown service, or known server unknown method.
1841 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001842 s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)
William Kurkianea869482019-04-09 15:16:11 -04001843 return
1844 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301845 var errDesc string
1846 if !knownService {
1847 errDesc = fmt.Sprintf("unknown service %v", service)
1848 } else {
1849 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1850 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001851 if ti != nil {
1852 ti.tr.LazyPrintf("%s", errDesc)
1853 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001854 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001855 if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1856 if ti != nil {
1857 ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1858 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001859 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001860 channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001861 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001862 if ti != nil {
1863 ti.tr.Finish()
William Kurkianea869482019-04-09 15:16:11 -04001864 }
1865}
1866
// streamKey is the context key under which
// NewContextWithServerTransportStream stores a ServerTransportStream and from
// which ServerTransportStreamFromContext reads it back.
type streamKey struct{}
1869
1870// NewContextWithServerTransportStream creates a new context from ctx and
1871// attaches stream to it.
1872//
Abhay Kumara61c5222025-11-10 07:32:50 +00001873// # Experimental
1874//
1875// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1876// later release.
William Kurkianea869482019-04-09 15:16:11 -04001877func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1878 return context.WithValue(ctx, streamKey{}, stream)
1879}
1880
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	// Method returns the RPC method name for the stream.
	Method() string
	// SetHeader is what the package-level SetHeader delegates to.
	SetHeader(md metadata.MD) error
	// SendHeader is what the package-level SendHeader delegates to.
	SendHeader(md metadata.MD) error
	// SetTrailer records trailer metadata for the stream.
	// NOTE(review): presumably backs the package-level SetTrailer; confirm.
	SetTrailer(md metadata.MD) error
}
1898
1899// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1900// ctx. Returns nil if the given context has no stream associated with it
1901// (which implies it is not an RPC invocation context).
1902//
Abhay Kumara61c5222025-11-10 07:32:50 +00001903// # Experimental
1904//
1905// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1906// later release.
William Kurkianea869482019-04-09 15:16:11 -04001907func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1908 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1909 return s
1910}
1911
1912// Stop stops the gRPC server. It immediately closes all open
1913// connections and listeners.
1914// It cancels all active RPCs on the server side and the corresponding
1915// pending RPCs on the client side will get notified by connection
1916// errors.
1917func (s *Server) Stop() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001918 s.stop(false)
William Kurkianea869482019-04-09 15:16:11 -04001919}
1920
1921// GracefulStop stops the gRPC server gracefully. It stops the server from
1922// accepting new connections and RPCs and blocks until all the pending RPCs are
1923// finished.
1924func (s *Server) GracefulStop() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001925 s.stop(true)
1926}
1927
// stop is the shared implementation behind Stop (graceful == false) and
// GracefulStop (graceful == true).
//
// It fires s.quit up front and s.done on return, closes all listeners, waits
// for s.serveWG, then either drains (graceful) or closes (non-graceful) every
// server transport and blocks until s.conns is empty. After that it closes
// the server worker channel when workers are configured, waits for in-flight
// handlers when graceful or s.opts.waitForHandlers is set, and finishes the
// server's events log.
func (s *Server) stop(graceful bool) {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
	s.mu.Lock()
	s.closeListenersLocked()
	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	// The lock is released first so s.mu is not held while waiting on serveWG.
	s.mu.Unlock()
	s.serveWG.Wait()

	// From here on s.mu stays held until return, apart from what s.cv.Wait
	// does with it internally.
	s.mu.Lock()
	defer s.mu.Unlock()

	if graceful {
		s.drainAllServerTransportsLocked()
	} else {
		s.closeServerTransportsLocked()
	}

	// Block until every connection has been removed from s.conns.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil

	if s.opts.numServerWorkers > 0 {
		// Closing the channel (only once, via sync.OnceFunc) after all the
		// connections have been closed above ensures that there are no
		// goroutines executing the callback passed to st.HandleStreams (where
		// the channel is written to).
		s.serverWorkerChannelClose()
	}

	// A non-graceful stop only waits for handlers when waitForHandlers is set.
	if graceful || s.opts.waitForHandlers {
		s.handlersWG.Wait()
	}

	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
}
1971
1972// s.mu must be held by the caller.
1973func (s *Server) closeServerTransportsLocked() {
1974 for _, conns := range s.conns {
1975 for st := range conns {
1976 st.Close(errors.New("Server.Stop called"))
1977 }
1978 }
1979}
1980
1981// s.mu must be held by the caller.
1982func (s *Server) drainAllServerTransportsLocked() {
1983 if !s.drain {
1984 for _, conns := range s.conns {
1985 for st := range conns {
1986 st.Drain("graceful_stop")
1987 }
1988 }
1989 s.drain = true
1990 }
1991}
1992
1993// s.mu must be held by the caller.
1994func (s *Server) closeListenersLocked() {
1995 for lis := range s.lis {
1996 lis.Close()
1997 }
1998 s.lis = nil
William Kurkianea869482019-04-09 15:16:11 -04001999}
2000
2001// contentSubtype must be lowercase
2002// cannot return nil
2003func (s *Server) getCodec(contentSubtype string) baseCodec {
2004 if s.opts.codec != nil {
2005 return s.opts.codec
2006 }
2007 if contentSubtype == "" {
Abhay Kumara61c5222025-11-10 07:32:50 +00002008 return getCodec(proto.Name)
William Kurkianea869482019-04-09 15:16:11 -04002009 }
Abhay Kumara61c5222025-11-10 07:32:50 +00002010 codec := getCodec(contentSubtype)
William Kurkianea869482019-04-09 15:16:11 -04002011 if codec == nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00002012 logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
2013 return getCodec(proto.Name)
William Kurkianea869482019-04-09 15:16:11 -04002014 }
2015 return codec
2016}
2017
// serverKey is the context key under which contextWithServer stores the
// *Server and from which serverFromContext reads it back.
type serverKey struct{}
2019
2020// serverFromContext gets the Server from the context.
2021func serverFromContext(ctx context.Context) *Server {
2022 s, _ := ctx.Value(serverKey{}).(*Server)
2023 return s
2024}
2025
2026// contextWithServer sets the Server in the context.
2027func contextWithServer(ctx context.Context, server *Server) context.Context {
2028 return context.WithValue(ctx, serverKey{}, server)
2029}
2030
2031// isRegisteredMethod returns whether the passed in method is registered as a
2032// method on the server. /service/method and service/method will match if the
2033// service and method are registered on the server.
2034func (s *Server) isRegisteredMethod(serviceMethod string) bool {
2035 if serviceMethod != "" && serviceMethod[0] == '/' {
2036 serviceMethod = serviceMethod[1:]
2037 }
2038 pos := strings.LastIndex(serviceMethod, "/")
2039 if pos == -1 { // Invalid method name syntax.
2040 return false
2041 }
2042 service := serviceMethod[:pos]
2043 method := serviceMethod[pos+1:]
2044 srv, knownService := s.services[service]
2045 if knownService {
2046 if _, ok := srv.methods[method]; ok {
2047 return true
2048 }
2049 if _, ok := srv.streams[method]; ok {
2050 return true
2051 }
2052 }
2053 return false
2054}
2055
2056// SetHeader sets the header metadata to be sent from the server to the client.
2057// The context provided must be the context passed to the server's handler.
2058//
2059// Streaming RPCs should prefer the SetHeader method of the ServerStream.
2060//
2061// When called multiple times, all the provided metadata will be merged. All
2062// the metadata will be sent out when one of the following happens:
2063//
2064// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
2065// - The first response message is sent. For unary handlers, this occurs when
2066// the handler returns; for streaming handlers, this can happen when stream's
2067// SendMsg method is called.
2068// - An RPC status is sent out (error or success). This occurs when the handler
2069// returns.
2070//
2071// SetHeader will fail if called after any of the events above.
2072//
2073// The error returned is compatible with the status package. However, the
2074// status code will often not match the RPC status as seen by the client
2075// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002076func SetHeader(ctx context.Context, md metadata.MD) error {
2077 if md.Len() == 0 {
2078 return nil
2079 }
2080 stream := ServerTransportStreamFromContext(ctx)
2081 if stream == nil {
2082 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2083 }
2084 return stream.SetHeader(md)
2085}
2086
Abhay Kumara61c5222025-11-10 07:32:50 +00002087// SendHeader sends header metadata. It may be called at most once, and may not
2088// be called after any event that causes headers to be sent (see SetHeader for
2089// a complete list). The provided md and headers set by SetHeader() will be
2090// sent.
2091//
2092// The error returned is compatible with the status package. However, the
2093// status code will often not match the RPC status as seen by the client
2094// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002095func SendHeader(ctx context.Context, md metadata.MD) error {
2096 stream := ServerTransportStreamFromContext(ctx)
2097 if stream == nil {
2098 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2099 }
2100 if err := stream.SendHeader(md); err != nil {
2101 return toRPCErr(err)
2102 }
2103 return nil
2104}
2105
Abhay Kumara61c5222025-11-10 07:32:50 +00002106// SetSendCompressor sets a compressor for outbound messages from the server.
2107// It must not be called after any event that causes headers to be sent
2108// (see ServerStream.SetHeader for the complete list). Provided compressor is
2109// used when below conditions are met:
2110//
2111// - compressor is registered via encoding.RegisterCompressor
2112// - compressor name must exist in the client advertised compressor names
2113// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
2114// get client supported compressor names.
2115//
2116// The context provided must be the context passed to the server's handler.
2117// It must be noted that compressor name encoding.Identity disables the
2118// outbound compression.
2119// By default, server messages will be sent using the same compressor with
2120// which request messages were sent.
2121//
2122// It is not safe to call SetSendCompressor concurrently with SendHeader and
2123// SendMsg.
2124//
2125// # Experimental
2126//
2127// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2128// later release.
2129func SetSendCompressor(ctx context.Context, name string) error {
2130 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2131 if !ok || stream == nil {
2132 return fmt.Errorf("failed to fetch the stream from the given context")
2133 }
2134
2135 if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
2136 return fmt.Errorf("unable to set send compressor: %w", err)
2137 }
2138
2139 return stream.SetSendCompress(name)
2140}
2141
2142// ClientSupportedCompressors returns compressor names advertised by the client
2143// via grpc-accept-encoding header.
2144//
2145// The context provided must be the context passed to the server's handler.
2146//
2147// # Experimental
2148//
2149// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2150// later release.
2151func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
2152 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2153 if !ok || stream == nil {
2154 return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
2155 }
2156
2157 return stream.ClientAdvertisedCompressors(), nil
2158}
2159
William Kurkianea869482019-04-09 15:16:11 -04002160// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
2161// When called more than once, all the provided metadata will be merged.
Abhay Kumara61c5222025-11-10 07:32:50 +00002162//
2163// The error returned is compatible with the status package. However, the
2164// status code will often not match the RPC status as seen by the client
2165// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002166func SetTrailer(ctx context.Context, md metadata.MD) error {
2167 if md.Len() == 0 {
2168 return nil
2169 }
2170 stream := ServerTransportStreamFromContext(ctx)
2171 if stream == nil {
2172 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2173 }
2174 return stream.SetTrailer(md)
2175}
2176
2177// Method returns the method string for the server context. The returned
2178// string is in the format of "/service/method".
2179func Method(ctx context.Context) (string, bool) {
2180 s := ServerTransportStreamFromContext(ctx)
2181 if s == nil {
2182 return "", false
2183 }
2184 return s.Method(), true
2185}
2186
Abhay Kumara61c5222025-11-10 07:32:50 +00002187// validateSendCompressor returns an error when given compressor name cannot be
2188// handled by the server or the client based on the advertised compressors.
2189func validateSendCompressor(name string, clientCompressors []string) error {
2190 if name == encoding.Identity {
2191 return nil
2192 }
2193
2194 if !grpcutil.IsCompressorNameRegistered(name) {
2195 return fmt.Errorf("compressor not registered %q", name)
2196 }
2197
2198 for _, c := range clientCompressors {
2199 if c == name {
2200 return nil // found match
2201 }
2202 }
2203 return fmt.Errorf("client does not support compressor %q", name)
William Kurkianea869482019-04-09 15:16:11 -04002204}
2205
// atomicSemaphore is a blocking, counting semaphore built on an atomic
// counter and a signalling channel. acquire is expected to be called
// synchronously; release may be called asynchronously.
type atomicSemaphore struct {
	// n is the remaining quota; it drops below zero while an acquire waits.
	n atomic.Int64
	// wait hands a wake-up from release to a blocked acquire.
	wait chan struct{}
}

// acquire takes one unit of quota, blocking until a release arrives when
// none is left.
func (s *atomicSemaphore) acquire() {
	if remaining := s.n.Add(-1); remaining >= 0 {
		return
	}
	// Quota exhausted: park until a release signals us.
	<-s.wait
}

// release returns one unit of quota and wakes a waiting acquire, if any.
func (s *atomicSemaphore) release() {
	// N.B. comparing against zero (rather than exactly zero) is meant to keep
	// this working with multiple concurrent acquire calls; with synchronous
	// acquires, as in our system, n never drops below -1. Generalizing this
	// would require thinking about fairness (queuing).
	if s.n.Add(1) > 0 {
		return
	}
	// The counter was negative, so an acquire is waiting on us. Unblock it.
	s.wait <- struct{}{}
}

// newHandlerQuota builds an atomicSemaphore whose initial quota is n.
func newHandlerQuota(n uint32) *atomicSemaphore {
	sem := new(atomicSemaphore)
	sem.wait = make(chan struct{}, 1)
	sem.n.Store(int64(n))
	return sem
}