blob: 1da2a542acde34242957a12ff797e718cde7b70c [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
William Kurkianea869482019-04-09 15:16:11 -040036 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/encoding"
39 "google.golang.org/grpc/encoding/proto"
Abhay Kumara61c5222025-11-10 07:32:50 +000040 estats "google.golang.org/grpc/experimental/stats"
William Kurkianea869482019-04-09 15:16:11 -040041 "google.golang.org/grpc/grpclog"
Abhay Kumara61c5222025-11-10 07:32:50 +000042 "google.golang.org/grpc/internal"
William Kurkianea869482019-04-09 15:16:11 -040043 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
David Bainbridge788e5202019-10-21 18:49:40 +000045 "google.golang.org/grpc/internal/grpcsync"
Abhay Kumara61c5222025-11-10 07:32:50 +000046 "google.golang.org/grpc/internal/grpcutil"
47 istats "google.golang.org/grpc/internal/stats"
William Kurkianea869482019-04-09 15:16:11 -040048 "google.golang.org/grpc/internal/transport"
49 "google.golang.org/grpc/keepalive"
Abhay Kumara61c5222025-11-10 07:32:50 +000050 "google.golang.org/grpc/mem"
William Kurkianea869482019-04-09 15:16:11 -040051 "google.golang.org/grpc/metadata"
52 "google.golang.org/grpc/peer"
53 "google.golang.org/grpc/stats"
54 "google.golang.org/grpc/status"
55 "google.golang.org/grpc/tap"
56)
57
58const (
59 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
60 defaultServerMaxSendMessageSize = math.MaxInt32
Abhay Kumara61c5222025-11-10 07:32:50 +000061
62 // Server transports are tracked in a map which is keyed on listener
63 // address. For regular gRPC traffic, connections are accepted in Serve()
64 // through a call to Accept(), and we use the actual listener address as key
65 // when we add it to the map. But for connections received through
66 // ServeHTTP(), we do not have a listener and hence use this dummy value.
67 listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
William Kurkianea869482019-04-09 15:16:11 -040068)
69
Abhay Kumara61c5222025-11-10 07:32:50 +000070func init() {
71 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
72 return srv.opts.creds
73 }
74 internal.IsRegisteredMethod = func(srv *Server, method string) bool {
75 return srv.isRegisteredMethod(method)
76 }
77 internal.ServerFromContext = serverFromContext
78 internal.AddGlobalServerOptions = func(opt ...ServerOption) {
79 globalServerOptions = append(globalServerOptions, opt...)
80 }
81 internal.ClearGlobalServerOptions = func() {
82 globalServerOptions = nil
83 }
84 internal.BinaryLogger = binaryLogger
85 internal.JoinServerOptions = newJoinServerOption
86 internal.BufferPool = bufferPool
87 internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
88 return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
89 }
90}
David Bainbridge788e5202019-10-21 18:49:40 +000091
Abhay Kumara61c5222025-11-10 07:32:50 +000092var statusOK = status.New(codes.OK, "")
93var logger = grpclog.Component("core")
94
95// MethodHandler is a function type that processes a unary RPC method call.
96type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
William Kurkianea869482019-04-09 15:16:11 -040097
98// MethodDesc represents an RPC service's method specification.
99type MethodDesc struct {
100 MethodName string
Abhay Kumara61c5222025-11-10 07:32:50 +0000101 Handler MethodHandler
William Kurkianea869482019-04-09 15:16:11 -0400102}
103
104// ServiceDesc represents an RPC service's specification.
105type ServiceDesc struct {
106 ServiceName string
107 // The pointer to the service interface. Used to check whether the user
108 // provided implementation satisfies the interface requirements.
Abhay Kumara61c5222025-11-10 07:32:50 +0000109 HandlerType any
William Kurkianea869482019-04-09 15:16:11 -0400110 Methods []MethodDesc
111 Streams []StreamDesc
Abhay Kumara61c5222025-11-10 07:32:50 +0000112 Metadata any
William Kurkianea869482019-04-09 15:16:11 -0400113}
114
Abhay Kumara61c5222025-11-10 07:32:50 +0000115// serviceInfo wraps information about a service. It is very similar to
116// ServiceDesc and is constructed from it for internal purposes.
117type serviceInfo struct {
118 // Contains the implementation for the methods in this service.
119 serviceImpl any
120 methods map[string]*MethodDesc
121 streams map[string]*StreamDesc
122 mdata any
William Kurkianea869482019-04-09 15:16:11 -0400123}
124
125// Server is a gRPC server to serve RPC requests.
126type Server struct {
Abhilash S.L3b494632019-07-16 15:51:09 +0530127 opts serverOptions
William Kurkianea869482019-04-09 15:16:11 -0400128
Abhay Kumara61c5222025-11-10 07:32:50 +0000129 mu sync.Mutex // guards following
130 lis map[net.Listener]bool
131 // conns contains all active server transports. It is a map keyed on a
132 // listener address with the value being the set of active transports
133 // belonging to that listener.
134 conns map[string]map[transport.ServerTransport]bool
135 serve bool
136 drain bool
137 cv *sync.Cond // signaled when connections close for GracefulStop
138 services map[string]*serviceInfo // service name -> service info
139 events traceEventLog
William Kurkianea869482019-04-09 15:16:11 -0400140
David Bainbridge788e5202019-10-21 18:49:40 +0000141 quit *grpcsync.Event
142 done *grpcsync.Event
William Kurkianea869482019-04-09 15:16:11 -0400143 channelzRemoveOnce sync.Once
Abhay Kumara61c5222025-11-10 07:32:50 +0000144 serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
145 handlersWG sync.WaitGroup // counts active method handler goroutines
William Kurkianea869482019-04-09 15:16:11 -0400146
Abhay Kumara61c5222025-11-10 07:32:50 +0000147 channelz *channelz.Server
148
149 serverWorkerChannel chan func()
150 serverWorkerChannelClose func()
William Kurkianea869482019-04-09 15:16:11 -0400151}
152
Abhilash S.L3b494632019-07-16 15:51:09 +0530153type serverOptions struct {
William Kurkianea869482019-04-09 15:16:11 -0400154 creds credentials.TransportCredentials
155 codec baseCodec
156 cp Compressor
157 dc Decompressor
158 unaryInt UnaryServerInterceptor
159 streamInt StreamServerInterceptor
Abhay Kumara61c5222025-11-10 07:32:50 +0000160 chainUnaryInts []UnaryServerInterceptor
161 chainStreamInts []StreamServerInterceptor
162 binaryLogger binarylog.Logger
William Kurkianea869482019-04-09 15:16:11 -0400163 inTapHandle tap.ServerInHandle
Abhay Kumara61c5222025-11-10 07:32:50 +0000164 statsHandlers []stats.Handler
William Kurkianea869482019-04-09 15:16:11 -0400165 maxConcurrentStreams uint32
166 maxReceiveMessageSize int
167 maxSendMessageSize int
168 unknownStreamDesc *StreamDesc
169 keepaliveParams keepalive.ServerParameters
170 keepalivePolicy keepalive.EnforcementPolicy
171 initialWindowSize int32
172 initialConnWindowSize int32
173 writeBufferSize int
174 readBufferSize int
Abhay Kumara61c5222025-11-10 07:32:50 +0000175 sharedWriteBuffer bool
William Kurkianea869482019-04-09 15:16:11 -0400176 connectionTimeout time.Duration
177 maxHeaderListSize *uint32
Devmalya Pauldd23a992019-11-14 07:06:31 +0000178 headerTableSize *uint32
Abhay Kumara61c5222025-11-10 07:32:50 +0000179 numServerWorkers uint32
180 bufferPool mem.BufferPool
181 waitForHandlers bool
182 staticWindowSize bool
William Kurkianea869482019-04-09 15:16:11 -0400183}
184
Abhilash S.L3b494632019-07-16 15:51:09 +0530185var defaultServerOptions = serverOptions{
Abhay Kumara61c5222025-11-10 07:32:50 +0000186 maxConcurrentStreams: math.MaxUint32,
William Kurkianea869482019-04-09 15:16:11 -0400187 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
188 maxSendMessageSize: defaultServerMaxSendMessageSize,
189 connectionTimeout: 120 * time.Second,
190 writeBufferSize: defaultWriteBufSize,
191 readBufferSize: defaultReadBufSize,
Abhay Kumara61c5222025-11-10 07:32:50 +0000192 bufferPool: mem.DefaultBufferPool(),
William Kurkianea869482019-04-09 15:16:11 -0400193}
Abhay Kumara61c5222025-11-10 07:32:50 +0000194var globalServerOptions []ServerOption
William Kurkianea869482019-04-09 15:16:11 -0400195
196// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
Abhilash S.L3b494632019-07-16 15:51:09 +0530197type ServerOption interface {
198 apply(*serverOptions)
199}
200
201// EmptyServerOption does not alter the server configuration. It can be embedded
202// in another structure to build custom server options.
203//
Abhay Kumara61c5222025-11-10 07:32:50 +0000204// # Experimental
205//
206// Notice: This type is EXPERIMENTAL and may be changed or removed in a
207// later release.
Abhilash S.L3b494632019-07-16 15:51:09 +0530208type EmptyServerOption struct{}
209
210func (EmptyServerOption) apply(*serverOptions) {}
211
212// funcServerOption wraps a function that modifies serverOptions into an
213// implementation of the ServerOption interface.
214type funcServerOption struct {
215 f func(*serverOptions)
216}
217
218func (fdo *funcServerOption) apply(do *serverOptions) {
219 fdo.f(do)
220}
221
222func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
223 return &funcServerOption{
224 f: f,
225 }
226}
William Kurkianea869482019-04-09 15:16:11 -0400227
Abhay Kumara61c5222025-11-10 07:32:50 +0000228// joinServerOption provides a way to combine arbitrary number of server
229// options into one.
230type joinServerOption struct {
231 opts []ServerOption
232}
233
234func (mdo *joinServerOption) apply(do *serverOptions) {
235 for _, opt := range mdo.opts {
236 opt.apply(do)
237 }
238}
239
240func newJoinServerOption(opts ...ServerOption) ServerOption {
241 return &joinServerOption{opts: opts}
242}
243
244// SharedWriteBuffer allows reusing per-connection transport write buffer.
245// If this option is set to true every connection will release the buffer after
246// flushing the data on the wire.
247//
248// # Experimental
249//
250// Notice: This API is EXPERIMENTAL and may be changed or removed in a
251// later release.
252func SharedWriteBuffer(val bool) ServerOption {
253 return newFuncServerOption(func(o *serverOptions) {
254 o.sharedWriteBuffer = val
255 })
256}
257
258// WriteBufferSize determines how much data can be batched before doing a write
259// on the wire. The default value for this buffer is 32KB. Zero or negative
260// values will disable the write buffer such that each write will be on underlying
261// connection. Note: A Send call may not directly translate to a write.
William Kurkianea869482019-04-09 15:16:11 -0400262func WriteBufferSize(s int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530263 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400264 o.writeBufferSize = s
Abhilash S.L3b494632019-07-16 15:51:09 +0530265 })
William Kurkianea869482019-04-09 15:16:11 -0400266}
267
Abhay Kumara61c5222025-11-10 07:32:50 +0000268// ReadBufferSize lets you set the size of read buffer, this determines how much
269// data can be read at most for one read syscall. The default value for this
270// buffer is 32KB. Zero or negative values will disable read buffer for a
271// connection so data framer can access the underlying conn directly.
William Kurkianea869482019-04-09 15:16:11 -0400272func ReadBufferSize(s int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530273 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400274 o.readBufferSize = s
Abhilash S.L3b494632019-07-16 15:51:09 +0530275 })
William Kurkianea869482019-04-09 15:16:11 -0400276}
277
278// InitialWindowSize returns a ServerOption that sets window size for stream.
279// The lower bound for window size is 64K and any value smaller than that will be ignored.
280func InitialWindowSize(s int32) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530281 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400282 o.initialWindowSize = s
Abhay Kumara61c5222025-11-10 07:32:50 +0000283 o.staticWindowSize = true
Abhilash S.L3b494632019-07-16 15:51:09 +0530284 })
William Kurkianea869482019-04-09 15:16:11 -0400285}
286
287// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
288// The lower bound for window size is 64K and any value smaller than that will be ignored.
289func InitialConnWindowSize(s int32) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530290 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400291 o.initialConnWindowSize = s
Abhay Kumara61c5222025-11-10 07:32:50 +0000292 o.staticWindowSize = true
293 })
294}
295
296// StaticStreamWindowSize returns a ServerOption to set the initial stream
297// window size to the value provided and disables dynamic flow control.
298// The lower bound for window size is 64K and any value smaller than that
299// will be ignored.
300func StaticStreamWindowSize(s int32) ServerOption {
301 return newFuncServerOption(func(o *serverOptions) {
302 o.initialWindowSize = s
303 o.staticWindowSize = true
304 })
305}
306
307// StaticConnWindowSize returns a ServerOption to set the initial connection
308// window size to the value provided and disables dynamic flow control.
309// The lower bound for window size is 64K and any value smaller than that
310// will be ignored.
311func StaticConnWindowSize(s int32) ServerOption {
312 return newFuncServerOption(func(o *serverOptions) {
313 o.initialConnWindowSize = s
314 o.staticWindowSize = true
Abhilash S.L3b494632019-07-16 15:51:09 +0530315 })
William Kurkianea869482019-04-09 15:16:11 -0400316}
317
318// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
319func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000320 if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
321 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
322 kp.Time = internal.KeepaliveMinServerPingTime
William Kurkianea869482019-04-09 15:16:11 -0400323 }
324
Abhilash S.L3b494632019-07-16 15:51:09 +0530325 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400326 o.keepaliveParams = kp
Abhilash S.L3b494632019-07-16 15:51:09 +0530327 })
William Kurkianea869482019-04-09 15:16:11 -0400328}
329
330// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
331func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530332 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400333 o.keepalivePolicy = kep
Abhilash S.L3b494632019-07-16 15:51:09 +0530334 })
William Kurkianea869482019-04-09 15:16:11 -0400335}
336
337// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
338//
339// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
Abhay Kumara61c5222025-11-10 07:32:50 +0000340//
341// Deprecated: register codecs using encoding.RegisterCodec. The server will
342// automatically use registered codecs based on the incoming requests' headers.
343// See also
344// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
345// Will be supported throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400346func CustomCodec(codec Codec) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530347 return newFuncServerOption(func(o *serverOptions) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000348 o.codec = newCodecV0Bridge(codec)
349 })
350}
351
352// ForceServerCodec returns a ServerOption that sets a codec for message
353// marshaling and unmarshaling.
354//
355// This will override any lookups by content-subtype for Codecs registered
356// with RegisterCodec.
357//
358// See Content-Type on
359// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
360// more details. Also see the documentation on RegisterCodec and
361// CallContentSubtype for more details on the interaction between encoding.Codec
362// and content-subtype.
363//
364// This function is provided for advanced users; prefer to register codecs
365// using encoding.RegisterCodec.
366// The server will automatically use registered codecs based on the incoming
367// requests' headers. See also
368// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
369// Will be supported throughout 1.x.
370//
371// # Experimental
372//
373// Notice: This API is EXPERIMENTAL and may be changed or removed in a
374// later release.
375func ForceServerCodec(codec encoding.Codec) ServerOption {
376 return newFuncServerOption(func(o *serverOptions) {
377 o.codec = newCodecV1Bridge(codec)
378 })
379}
380
381// ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
382// CodecV2 interface.
383//
384// Will be supported throughout 1.x.
385//
386// # Experimental
387//
388// Notice: This API is EXPERIMENTAL and may be changed or removed in a
389// later release.
390func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
391 return newFuncServerOption(func(o *serverOptions) {
392 o.codec = codecV2
Abhilash S.L3b494632019-07-16 15:51:09 +0530393 })
William Kurkianea869482019-04-09 15:16:11 -0400394}
395
396// RPCCompressor returns a ServerOption that sets a compressor for outbound
397// messages. For backward compatibility, all outbound messages will be sent
398// using this compressor, regardless of incoming message compression. By
399// default, server messages will be sent using the same compressor with which
400// request messages were sent.
401//
Abhay Kumara61c5222025-11-10 07:32:50 +0000402// Deprecated: use encoding.RegisterCompressor instead. Will be supported
403// throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400404func RPCCompressor(cp Compressor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530405 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400406 o.cp = cp
Abhilash S.L3b494632019-07-16 15:51:09 +0530407 })
William Kurkianea869482019-04-09 15:16:11 -0400408}
409
410// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
411// messages. It has higher priority than decompressors registered via
412// encoding.RegisterCompressor.
413//
Abhay Kumara61c5222025-11-10 07:32:50 +0000414// Deprecated: use encoding.RegisterCompressor instead. Will be supported
415// throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400416func RPCDecompressor(dc Decompressor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530417 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400418 o.dc = dc
Abhilash S.L3b494632019-07-16 15:51:09 +0530419 })
William Kurkianea869482019-04-09 15:16:11 -0400420}
421
422// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
423// If this is not set, gRPC uses the default limit.
424//
Abhay Kumara61c5222025-11-10 07:32:50 +0000425// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
William Kurkianea869482019-04-09 15:16:11 -0400426func MaxMsgSize(m int) ServerOption {
427 return MaxRecvMsgSize(m)
428}
429
430// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
431// If this is not set, gRPC uses the default 4MB.
432func MaxRecvMsgSize(m int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530433 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400434 o.maxReceiveMessageSize = m
Abhilash S.L3b494632019-07-16 15:51:09 +0530435 })
William Kurkianea869482019-04-09 15:16:11 -0400436}
437
438// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
439// If this is not set, gRPC uses the default `math.MaxInt32`.
440func MaxSendMsgSize(m int) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530441 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400442 o.maxSendMessageSize = m
Abhilash S.L3b494632019-07-16 15:51:09 +0530443 })
William Kurkianea869482019-04-09 15:16:11 -0400444}
445
446// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
447// of concurrent streams to each ServerTransport.
448func MaxConcurrentStreams(n uint32) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000449 if n == 0 {
450 n = math.MaxUint32
451 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530452 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400453 o.maxConcurrentStreams = n
Abhilash S.L3b494632019-07-16 15:51:09 +0530454 })
William Kurkianea869482019-04-09 15:16:11 -0400455}
456
457// Creds returns a ServerOption that sets credentials for server connections.
458func Creds(c credentials.TransportCredentials) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530459 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400460 o.creds = c
Abhilash S.L3b494632019-07-16 15:51:09 +0530461 })
William Kurkianea869482019-04-09 15:16:11 -0400462}
463
464// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
465// server. Only one unary interceptor can be installed. The construction of multiple
466// interceptors (e.g., chaining) can be implemented at the caller.
467func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530468 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400469 if o.unaryInt != nil {
470 panic("The unary server interceptor was already set and may not be reset.")
471 }
472 o.unaryInt = i
Abhilash S.L3b494632019-07-16 15:51:09 +0530473 })
William Kurkianea869482019-04-09 15:16:11 -0400474}
475
Abhay Kumara61c5222025-11-10 07:32:50 +0000476// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
477// for unary RPCs. The first interceptor will be the outer most,
478// while the last interceptor will be the inner most wrapper around the real call.
479// All unary interceptors added by this method will be chained.
480func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
481 return newFuncServerOption(func(o *serverOptions) {
482 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
483 })
484}
485
William Kurkianea869482019-04-09 15:16:11 -0400486// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
487// server. Only one stream interceptor can be installed.
488func StreamInterceptor(i StreamServerInterceptor) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530489 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400490 if o.streamInt != nil {
491 panic("The stream server interceptor was already set and may not be reset.")
492 }
493 o.streamInt = i
Abhilash S.L3b494632019-07-16 15:51:09 +0530494 })
William Kurkianea869482019-04-09 15:16:11 -0400495}
496
Abhay Kumara61c5222025-11-10 07:32:50 +0000497// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
498// for streaming RPCs. The first interceptor will be the outer most,
499// while the last interceptor will be the inner most wrapper around the real call.
500// All stream interceptors added by this method will be chained.
501func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
502 return newFuncServerOption(func(o *serverOptions) {
503 o.chainStreamInts = append(o.chainStreamInts, interceptors...)
504 })
505}
506
William Kurkianea869482019-04-09 15:16:11 -0400507// InTapHandle returns a ServerOption that sets the tap handle for all the server
508// transport to be created. Only one can be installed.
Abhay Kumara61c5222025-11-10 07:32:50 +0000509//
510// # Experimental
511//
512// Notice: This API is EXPERIMENTAL and may be changed or removed in a
513// later release.
William Kurkianea869482019-04-09 15:16:11 -0400514func InTapHandle(h tap.ServerInHandle) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530515 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400516 if o.inTapHandle != nil {
517 panic("The tap handle was already set and may not be reset.")
518 }
519 o.inTapHandle = h
Abhilash S.L3b494632019-07-16 15:51:09 +0530520 })
William Kurkianea869482019-04-09 15:16:11 -0400521}
522
523// StatsHandler returns a ServerOption that sets the stats handler for the server.
524func StatsHandler(h stats.Handler) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530525 return newFuncServerOption(func(o *serverOptions) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000526 if h == nil {
527 logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
528 // Do not allow a nil stats handler, which would otherwise cause
529 // panics.
530 return
531 }
532 o.statsHandlers = append(o.statsHandlers, h)
533 })
534}
535
536// binaryLogger returns a ServerOption that can set the binary logger for the
537// server.
538func binaryLogger(bl binarylog.Logger) ServerOption {
539 return newFuncServerOption(func(o *serverOptions) {
540 o.binaryLogger = bl
Abhilash S.L3b494632019-07-16 15:51:09 +0530541 })
William Kurkianea869482019-04-09 15:16:11 -0400542}
543
544// UnknownServiceHandler returns a ServerOption that allows for adding a custom
545// unknown service handler. The provided method is a bidi-streaming RPC service
546// handler that will be invoked instead of returning the "unimplemented" gRPC
547// error whenever a request is received for an unregistered service or method.
Abhay Kumara61c5222025-11-10 07:32:50 +0000548// The handling function and stream interceptor (if set) have full access to
549// the ServerStream, including its Context.
William Kurkianea869482019-04-09 15:16:11 -0400550func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530551 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400552 o.unknownStreamDesc = &StreamDesc{
553 StreamName: "unknown_service_handler",
554 Handler: streamHandler,
555 // We need to assume that the users of the streamHandler will want to use both.
556 ClientStreams: true,
557 ServerStreams: true,
558 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530559 })
William Kurkianea869482019-04-09 15:16:11 -0400560}
561
562// ConnectionTimeout returns a ServerOption that sets the timeout for
563// connection establishment (up to and including HTTP/2 handshaking) for all
564// new connections. If this is not set, the default is 120 seconds. A zero or
565// negative value will result in an immediate timeout.
566//
Abhay Kumara61c5222025-11-10 07:32:50 +0000567// # Experimental
568//
569// Notice: This API is EXPERIMENTAL and may be changed or removed in a
570// later release.
William Kurkianea869482019-04-09 15:16:11 -0400571func ConnectionTimeout(d time.Duration) ServerOption {
Abhilash S.L3b494632019-07-16 15:51:09 +0530572 return newFuncServerOption(func(o *serverOptions) {
William Kurkianea869482019-04-09 15:16:11 -0400573 o.connectionTimeout = d
Abhilash S.L3b494632019-07-16 15:51:09 +0530574 })
William Kurkianea869482019-04-09 15:16:11 -0400575}
576
Abhay Kumara61c5222025-11-10 07:32:50 +0000577// MaxHeaderListSizeServerOption is a ServerOption that sets the max
578// (uncompressed) size of header list that the server is prepared to accept.
579type MaxHeaderListSizeServerOption struct {
580 MaxHeaderListSize uint32
581}
582
583func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
584 so.maxHeaderListSize = &o.MaxHeaderListSize
585}
586
William Kurkianea869482019-04-09 15:16:11 -0400587// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
588// of header list that the server is prepared to accept.
589func MaxHeaderListSize(s uint32) ServerOption {
Abhay Kumara61c5222025-11-10 07:32:50 +0000590 return MaxHeaderListSizeServerOption{
591 MaxHeaderListSize: s,
592 }
William Kurkianea869482019-04-09 15:16:11 -0400593}
594
Devmalya Pauldd23a992019-11-14 07:06:31 +0000595// HeaderTableSize returns a ServerOption that sets the size of dynamic
596// header table for stream.
597//
Abhay Kumara61c5222025-11-10 07:32:50 +0000598// # Experimental
599//
600// Notice: This API is EXPERIMENTAL and may be changed or removed in a
601// later release.
Devmalya Pauldd23a992019-11-14 07:06:31 +0000602func HeaderTableSize(s uint32) ServerOption {
603 return newFuncServerOption(func(o *serverOptions) {
604 o.headerTableSize = &s
605 })
606}
607
Abhay Kumara61c5222025-11-10 07:32:50 +0000608// NumStreamWorkers returns a ServerOption that sets the number of worker
609// goroutines that should be used to process incoming streams. Setting this to
610// zero (default) will disable workers and spawn a new goroutine for each
611// stream.
612//
613// # Experimental
614//
615// Notice: This API is EXPERIMENTAL and may be changed or removed in a
616// later release.
617func NumStreamWorkers(numServerWorkers uint32) ServerOption {
618 // TODO: If/when this API gets stabilized (i.e. stream workers become the
619 // only way streams are processed), change the behavior of the zero value to
620 // a sane default. Preliminary experiments suggest that a value equal to the
621 // number of CPUs available is most performant; requires thorough testing.
622 return newFuncServerOption(func(o *serverOptions) {
623 o.numServerWorkers = numServerWorkers
624 })
625}
626
627// WaitForHandlers cause Stop to wait until all outstanding method handlers have
628// exited before returning. If false, Stop will return as soon as all
629// connections have closed, but method handlers may still be running. By
630// default, Stop does not wait for method handlers to return.
631//
632// # Experimental
633//
634// Notice: This API is EXPERIMENTAL and may be changed or removed in a
635// later release.
636func WaitForHandlers(w bool) ServerOption {
637 return newFuncServerOption(func(o *serverOptions) {
638 o.waitForHandlers = w
639 })
640}
641
642func bufferPool(bufferPool mem.BufferPool) ServerOption {
643 return newFuncServerOption(func(o *serverOptions) {
644 o.bufferPool = bufferPool
645 })
646}
647
648// serverWorkerResetThreshold defines how often the stack must be reset. Every
649// N requests, by spawning a new goroutine in its place, a worker can reset its
650// stack so that large stacks don't live in memory forever. 2^16 should allow
651// each goroutine stack to live for at least a few seconds in a typical
652// workload (assuming a QPS of a few thousand requests/sec).
653const serverWorkerResetThreshold = 1 << 16
654
655// serverWorker blocks on a *transport.ServerStream channel forever and waits
656// for data to be fed by serveStreams. This allows multiple requests to be
657// processed by the same goroutine, removing the need for expensive stack
658// re-allocations (see the runtime.morestack problem [1]).
659//
660// [1] https://github.com/golang/go/issues/18138
661func (s *Server) serverWorker() {
662 for completed := 0; completed < serverWorkerResetThreshold; completed++ {
663 f, ok := <-s.serverWorkerChannel
664 if !ok {
665 return
666 }
667 f()
668 }
669 go s.serverWorker()
670}
671
672// initServerWorkers creates worker goroutines and a channel to process incoming
673// connections to reduce the time spent overall on runtime.morestack.
674func (s *Server) initServerWorkers() {
675 s.serverWorkerChannel = make(chan func())
676 s.serverWorkerChannelClose = sync.OnceFunc(func() {
677 close(s.serverWorkerChannel)
678 })
679 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
680 go s.serverWorker()
681 }
682}
683
William Kurkianea869482019-04-09 15:16:11 -0400684// NewServer creates a gRPC server which has no service registered and has not
685// started to accept requests yet.
686func NewServer(opt ...ServerOption) *Server {
687 opts := defaultServerOptions
Abhay Kumara61c5222025-11-10 07:32:50 +0000688 for _, o := range globalServerOptions {
689 o.apply(&opts)
690 }
William Kurkianea869482019-04-09 15:16:11 -0400691 for _, o := range opt {
Abhilash S.L3b494632019-07-16 15:51:09 +0530692 o.apply(&opts)
William Kurkianea869482019-04-09 15:16:11 -0400693 }
694 s := &Server{
Abhay Kumara61c5222025-11-10 07:32:50 +0000695 lis: make(map[net.Listener]bool),
696 opts: opts,
697 conns: make(map[string]map[transport.ServerTransport]bool),
698 services: make(map[string]*serviceInfo),
699 quit: grpcsync.NewEvent(),
700 done: grpcsync.NewEvent(),
701 channelz: channelz.RegisterServer(""),
William Kurkianea869482019-04-09 15:16:11 -0400702 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000703 chainUnaryServerInterceptors(s)
704 chainStreamServerInterceptors(s)
William Kurkianea869482019-04-09 15:16:11 -0400705 s.cv = sync.NewCond(&s.mu)
706 if EnableTracing {
707 _, file, line, _ := runtime.Caller(1)
Abhay Kumara61c5222025-11-10 07:32:50 +0000708 s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
William Kurkianea869482019-04-09 15:16:11 -0400709 }
710
Abhay Kumara61c5222025-11-10 07:32:50 +0000711 if s.opts.numServerWorkers > 0 {
712 s.initServerWorkers()
William Kurkianea869482019-04-09 15:16:11 -0400713 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000714
715 channelz.Info(logger, s.channelz, "Server created")
William Kurkianea869482019-04-09 15:16:11 -0400716 return s
717}
718
719// printf records an event in s's event log, unless s has been stopped.
720// REQUIRES s.mu is held.
Abhay Kumara61c5222025-11-10 07:32:50 +0000721func (s *Server) printf(format string, a ...any) {
William Kurkianea869482019-04-09 15:16:11 -0400722 if s.events != nil {
723 s.events.Printf(format, a...)
724 }
725}
726
727// errorf records an error in s's event log, unless s has been stopped.
728// REQUIRES s.mu is held.
Abhay Kumara61c5222025-11-10 07:32:50 +0000729func (s *Server) errorf(format string, a ...any) {
William Kurkianea869482019-04-09 15:16:11 -0400730 if s.events != nil {
731 s.events.Errorf(format, a...)
732 }
733}
734
Abhay Kumara61c5222025-11-10 07:32:50 +0000735// ServiceRegistrar wraps a single method that supports service registration. It
736// enables users to pass concrete types other than grpc.Server to the service
737// registration methods exported by the IDL generated code.
738type ServiceRegistrar interface {
739 // RegisterService registers a service and its implementation to the
740 // concrete type implementing this interface. It may not be called
741 // once the server has started serving.
742 // desc describes the service and its methods and handlers. impl is the
743 // service implementation which is passed to the method handlers.
744 RegisterService(desc *ServiceDesc, impl any)
745}
746
William Kurkianea869482019-04-09 15:16:11 -0400747// RegisterService registers a service and its implementation to the gRPC
748// server. It is called from the IDL generated code. This must be called before
Abhay Kumara61c5222025-11-10 07:32:50 +0000749// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
750// ensure it implements sd.HandlerType.
751func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
752 if ss != nil {
753 ht := reflect.TypeOf(sd.HandlerType).Elem()
754 st := reflect.TypeOf(ss)
755 if !st.Implements(ht) {
756 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
757 }
William Kurkianea869482019-04-09 15:16:11 -0400758 }
759 s.register(sd, ss)
760}
761
Abhay Kumara61c5222025-11-10 07:32:50 +0000762func (s *Server) register(sd *ServiceDesc, ss any) {
William Kurkianea869482019-04-09 15:16:11 -0400763 s.mu.Lock()
764 defer s.mu.Unlock()
765 s.printf("RegisterService(%q)", sd.ServiceName)
766 if s.serve {
Abhay Kumara61c5222025-11-10 07:32:50 +0000767 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
William Kurkianea869482019-04-09 15:16:11 -0400768 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000769 if _, ok := s.services[sd.ServiceName]; ok {
770 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
William Kurkianea869482019-04-09 15:16:11 -0400771 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000772 info := &serviceInfo{
773 serviceImpl: ss,
774 methods: make(map[string]*MethodDesc),
775 streams: make(map[string]*StreamDesc),
776 mdata: sd.Metadata,
William Kurkianea869482019-04-09 15:16:11 -0400777 }
778 for i := range sd.Methods {
779 d := &sd.Methods[i]
Abhay Kumara61c5222025-11-10 07:32:50 +0000780 info.methods[d.MethodName] = d
William Kurkianea869482019-04-09 15:16:11 -0400781 }
782 for i := range sd.Streams {
783 d := &sd.Streams[i]
Abhay Kumara61c5222025-11-10 07:32:50 +0000784 info.streams[d.StreamName] = d
William Kurkianea869482019-04-09 15:16:11 -0400785 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000786 s.services[sd.ServiceName] = info
William Kurkianea869482019-04-09 15:16:11 -0400787}
788
789// MethodInfo contains the information of an RPC including its method name and type.
790type MethodInfo struct {
791 // Name is the method name only, without the service name or package name.
792 Name string
793 // IsClientStream indicates whether the RPC is a client streaming RPC.
794 IsClientStream bool
795 // IsServerStream indicates whether the RPC is a server streaming RPC.
796 IsServerStream bool
797}
798
799// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
800type ServiceInfo struct {
801 Methods []MethodInfo
802 // Metadata is the metadata specified in ServiceDesc when registering service.
Abhay Kumara61c5222025-11-10 07:32:50 +0000803 Metadata any
William Kurkianea869482019-04-09 15:16:11 -0400804}
805
806// GetServiceInfo returns a map from service names to ServiceInfo.
807// Service names include the package names, in the form of <package>.<service>.
808func (s *Server) GetServiceInfo() map[string]ServiceInfo {
809 ret := make(map[string]ServiceInfo)
Abhay Kumara61c5222025-11-10 07:32:50 +0000810 for n, srv := range s.services {
811 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
812 for m := range srv.methods {
William Kurkianea869482019-04-09 15:16:11 -0400813 methods = append(methods, MethodInfo{
814 Name: m,
815 IsClientStream: false,
816 IsServerStream: false,
817 })
818 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000819 for m, d := range srv.streams {
William Kurkianea869482019-04-09 15:16:11 -0400820 methods = append(methods, MethodInfo{
821 Name: m,
822 IsClientStream: d.ClientStreams,
823 IsServerStream: d.ServerStreams,
824 })
825 }
826
827 ret[n] = ServiceInfo{
828 Methods: methods,
829 Metadata: srv.mdata,
830 }
831 }
832 return ret
833}
834
835// ErrServerStopped indicates that the operation is now illegal because of
836// the server being stopped.
837var ErrServerStopped = errors.New("grpc: the server has been stopped")
838
William Kurkianea869482019-04-09 15:16:11 -0400839type listenSocket struct {
840 net.Listener
Abhay Kumara61c5222025-11-10 07:32:50 +0000841 channelz *channelz.Socket
William Kurkianea869482019-04-09 15:16:11 -0400842}
843
844func (l *listenSocket) Close() error {
845 err := l.Listener.Close()
Abhay Kumara61c5222025-11-10 07:32:50 +0000846 channelz.RemoveEntry(l.channelz.ID)
847 channelz.Info(logger, l.channelz, "ListenSocket deleted")
William Kurkianea869482019-04-09 15:16:11 -0400848 return err
849}
850
851// Serve accepts incoming connections on the listener lis, creating a new
852// ServerTransport and service goroutine for each. The service goroutines
853// read gRPC requests and then call the registered handlers to reply to them.
854// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
855// this method returns.
856// Serve will return a non-nil error unless Stop or GracefulStop is called.
Abhay Kumara61c5222025-11-10 07:32:50 +0000857//
858// Note: All supported releases of Go (as of December 2023) override the OS
859// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
860// with OS defaults for keepalive time and interval, callers need to do the
861// following two things:
862// - pass a net.Listener created by calling the Listen method on a
863// net.ListenConfig with the `KeepAlive` field set to a negative value. This
864// will result in the Go standard library not overriding OS defaults for TCP
865// keepalive interval and time. But this will also result in the Go standard
866// library not enabling TCP keepalives by default.
867// - override the Accept method on the passed in net.Listener and set the
868// SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
William Kurkianea869482019-04-09 15:16:11 -0400869func (s *Server) Serve(lis net.Listener) error {
870 s.mu.Lock()
871 s.printf("serving")
872 s.serve = true
873 if s.lis == nil {
874 // Serve called after Stop or GracefulStop.
875 s.mu.Unlock()
876 lis.Close()
877 return ErrServerStopped
878 }
879
880 s.serveWG.Add(1)
881 defer func() {
882 s.serveWG.Done()
David Bainbridge788e5202019-10-21 18:49:40 +0000883 if s.quit.HasFired() {
884 // Stop or GracefulStop called; block until done and return nil.
885 <-s.done.Done()
William Kurkianea869482019-04-09 15:16:11 -0400886 }
887 }()
888
Abhay Kumara61c5222025-11-10 07:32:50 +0000889 ls := &listenSocket{
890 Listener: lis,
891 channelz: channelz.RegisterSocket(&channelz.Socket{
892 SocketType: channelz.SocketTypeListen,
893 Parent: s.channelz,
894 RefName: lis.Addr().String(),
895 LocalAddr: lis.Addr(),
896 SocketOptions: channelz.GetSocketOption(lis)},
897 ),
William Kurkianea869482019-04-09 15:16:11 -0400898 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000899 s.lis[ls] = true
William Kurkianea869482019-04-09 15:16:11 -0400900
901 defer func() {
902 s.mu.Lock()
903 if s.lis != nil && s.lis[ls] {
904 ls.Close()
905 delete(s.lis, ls)
906 }
907 s.mu.Unlock()
908 }()
909
Abhay Kumara61c5222025-11-10 07:32:50 +0000910 s.mu.Unlock()
911 channelz.Info(logger, ls.channelz, "ListenSocket created")
William Kurkianea869482019-04-09 15:16:11 -0400912
Abhay Kumara61c5222025-11-10 07:32:50 +0000913 var tempDelay time.Duration // how long to sleep on accept failure
William Kurkianea869482019-04-09 15:16:11 -0400914 for {
915 rawConn, err := lis.Accept()
916 if err != nil {
917 if ne, ok := err.(interface {
918 Temporary() bool
919 }); ok && ne.Temporary() {
920 if tempDelay == 0 {
921 tempDelay = 5 * time.Millisecond
922 } else {
923 tempDelay *= 2
924 }
925 if max := 1 * time.Second; tempDelay > max {
926 tempDelay = max
927 }
928 s.mu.Lock()
929 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
930 s.mu.Unlock()
931 timer := time.NewTimer(tempDelay)
932 select {
933 case <-timer.C:
David Bainbridge788e5202019-10-21 18:49:40 +0000934 case <-s.quit.Done():
William Kurkianea869482019-04-09 15:16:11 -0400935 timer.Stop()
936 return nil
937 }
938 continue
939 }
940 s.mu.Lock()
941 s.printf("done serving; Accept = %v", err)
942 s.mu.Unlock()
943
David Bainbridge788e5202019-10-21 18:49:40 +0000944 if s.quit.HasFired() {
William Kurkianea869482019-04-09 15:16:11 -0400945 return nil
William Kurkianea869482019-04-09 15:16:11 -0400946 }
947 return err
948 }
949 tempDelay = 0
950 // Start a new goroutine to deal with rawConn so we don't stall this Accept
951 // loop goroutine.
952 //
953 // Make sure we account for the goroutine so GracefulStop doesn't nil out
954 // s.conns before this conn can be added.
955 s.serveWG.Add(1)
956 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000957 s.handleRawConn(lis.Addr().String(), rawConn)
William Kurkianea869482019-04-09 15:16:11 -0400958 s.serveWG.Done()
959 }()
960 }
961}
962
963// handleRawConn forks a goroutine to handle a just-accepted connection that
964// has not had any I/O performed on it yet.
Abhay Kumara61c5222025-11-10 07:32:50 +0000965func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
David Bainbridge788e5202019-10-21 18:49:40 +0000966 if s.quit.HasFired() {
967 rawConn.Close()
968 return
969 }
William Kurkianea869482019-04-09 15:16:11 -0400970 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
William Kurkianea869482019-04-09 15:16:11 -0400971
William Kurkianea869482019-04-09 15:16:11 -0400972 // Finish handshaking (HTTP2)
Abhay Kumara61c5222025-11-10 07:32:50 +0000973 st := s.newHTTP2Transport(rawConn)
974 rawConn.SetDeadline(time.Time{})
William Kurkianea869482019-04-09 15:16:11 -0400975 if st == nil {
976 return
977 }
978
Abhay Kumara61c5222025-11-10 07:32:50 +0000979 if cc, ok := rawConn.(interface {
980 PassServerTransport(transport.ServerTransport)
981 }); ok {
982 cc.PassServerTransport(st)
983 }
984
985 if !s.addConn(lisAddr, st) {
William Kurkianea869482019-04-09 15:16:11 -0400986 return
987 }
988 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000989 s.serveStreams(context.Background(), st, rawConn)
990 s.removeConn(lisAddr, st)
William Kurkianea869482019-04-09 15:16:11 -0400991 }()
992}
993
994// newHTTP2Transport sets up a http/2 transport (using the
995// gRPC http2 server transport in transport/http2_server.go).
Abhay Kumara61c5222025-11-10 07:32:50 +0000996func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
William Kurkianea869482019-04-09 15:16:11 -0400997 config := &transport.ServerConfig{
998 MaxStreams: s.opts.maxConcurrentStreams,
Abhay Kumara61c5222025-11-10 07:32:50 +0000999 ConnectionTimeout: s.opts.connectionTimeout,
1000 Credentials: s.opts.creds,
William Kurkianea869482019-04-09 15:16:11 -04001001 InTapHandle: s.opts.inTapHandle,
Abhay Kumara61c5222025-11-10 07:32:50 +00001002 StatsHandlers: s.opts.statsHandlers,
William Kurkianea869482019-04-09 15:16:11 -04001003 KeepaliveParams: s.opts.keepaliveParams,
1004 KeepalivePolicy: s.opts.keepalivePolicy,
1005 InitialWindowSize: s.opts.initialWindowSize,
1006 InitialConnWindowSize: s.opts.initialConnWindowSize,
1007 WriteBufferSize: s.opts.writeBufferSize,
1008 ReadBufferSize: s.opts.readBufferSize,
Abhay Kumara61c5222025-11-10 07:32:50 +00001009 SharedWriteBuffer: s.opts.sharedWriteBuffer,
1010 ChannelzParent: s.channelz,
William Kurkianea869482019-04-09 15:16:11 -04001011 MaxHeaderListSize: s.opts.maxHeaderListSize,
Devmalya Pauldd23a992019-11-14 07:06:31 +00001012 HeaderTableSize: s.opts.headerTableSize,
Abhay Kumara61c5222025-11-10 07:32:50 +00001013 BufferPool: s.opts.bufferPool,
1014 StaticWindowSize: s.opts.staticWindowSize,
William Kurkianea869482019-04-09 15:16:11 -04001015 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001016 st, err := transport.NewServerTransport(c, config)
William Kurkianea869482019-04-09 15:16:11 -04001017 if err != nil {
1018 s.mu.Lock()
1019 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
1020 s.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001021 // ErrConnDispatched means that the connection was dispatched away from
1022 // gRPC; those connections should be left open.
1023 if err != credentials.ErrConnDispatched {
1024 // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
1025 if err != io.EOF {
1026 channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
1027 }
1028 c.Close()
1029 }
William Kurkianea869482019-04-09 15:16:11 -04001030 return nil
1031 }
1032
1033 return st
1034}
1035
Abhay Kumara61c5222025-11-10 07:32:50 +00001036func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
1037 ctx = transport.SetConnection(ctx, rawConn)
1038 ctx = peer.NewContext(ctx, st.Peer())
1039 for _, sh := range s.opts.statsHandlers {
1040 ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
1041 RemoteAddr: st.Peer().Addr,
1042 LocalAddr: st.Peer().LocalAddr,
1043 })
1044 sh.HandleConn(ctx, &stats.ConnBegin{})
1045 }
1046
1047 defer func() {
1048 st.Close(errors.New("finished serving streams for the server transport"))
1049 for _, sh := range s.opts.statsHandlers {
1050 sh.HandleConn(ctx, &stats.ConnEnd{})
William Kurkianea869482019-04-09 15:16:11 -04001051 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001052 }()
1053
1054 streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
1055 st.HandleStreams(ctx, func(stream *transport.ServerStream) {
1056 s.handlersWG.Add(1)
1057 streamQuota.acquire()
1058 f := func() {
1059 defer streamQuota.release()
1060 defer s.handlersWG.Done()
1061 s.handleStream(st, stream)
1062 }
1063
1064 if s.opts.numServerWorkers > 0 {
1065 select {
1066 case s.serverWorkerChannel <- f:
1067 return
1068 default:
1069 // If all stream workers are busy, fallback to the default code path.
1070 }
1071 }
1072 go f()
William Kurkianea869482019-04-09 15:16:11 -04001073 })
William Kurkianea869482019-04-09 15:16:11 -04001074}
1075
1076var _ http.Handler = (*Server)(nil)
1077
1078// ServeHTTP implements the Go standard library's http.Handler
1079// interface by responding to the gRPC request r, by looking up
1080// the requested gRPC method in the gRPC server s.
1081//
1082// The provided HTTP request must have arrived on an HTTP/2
1083// connection. When using the Go standard library's server,
1084// practically this means that the Request must also have arrived
1085// over TLS.
1086//
1087// To share one port (such as 443 for https) between gRPC and an
1088// existing http.Handler, use a root http.Handler such as:
1089//
Abhay Kumara61c5222025-11-10 07:32:50 +00001090// if r.ProtoMajor == 2 && strings.HasPrefix(
1091// r.Header.Get("Content-Type"), "application/grpc") {
1092// grpcServer.ServeHTTP(w, r)
1093// } else {
1094// yourMux.ServeHTTP(w, r)
1095// }
William Kurkianea869482019-04-09 15:16:11 -04001096//
1097// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
1098// separate from grpc-go's HTTP/2 server. Performance and features may vary
1099// between the two paths. ServeHTTP does not support some gRPC features
Abhay Kumara61c5222025-11-10 07:32:50 +00001100// available through grpc-go's HTTP/2 server.
1101//
1102// # Experimental
1103//
1104// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1105// later release.
William Kurkianea869482019-04-09 15:16:11 -04001106func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Abhay Kumara61c5222025-11-10 07:32:50 +00001107 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers, s.opts.bufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001108 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001109 // Errors returned from transport.NewServerHandlerTransport have
1110 // already been written to w.
William Kurkianea869482019-04-09 15:16:11 -04001111 return
1112 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001113 if !s.addConn(listenerAddressForServeHTTP, st) {
William Kurkianea869482019-04-09 15:16:11 -04001114 return
1115 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001116 defer s.removeConn(listenerAddressForServeHTTP, st)
1117 s.serveStreams(r.Context(), st, nil)
William Kurkianea869482019-04-09 15:16:11 -04001118}
1119
Abhay Kumara61c5222025-11-10 07:32:50 +00001120func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
William Kurkianea869482019-04-09 15:16:11 -04001121 s.mu.Lock()
1122 defer s.mu.Unlock()
1123 if s.conns == nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001124 st.Close(errors.New("Server.addConn called when server has already been stopped"))
William Kurkianea869482019-04-09 15:16:11 -04001125 return false
1126 }
1127 if s.drain {
1128 // Transport added after we drained our existing conns: drain it
1129 // immediately.
Abhay Kumara61c5222025-11-10 07:32:50 +00001130 st.Drain("")
William Kurkianea869482019-04-09 15:16:11 -04001131 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001132
1133 if s.conns[addr] == nil {
1134 // Create a map entry if this is the first connection on this listener.
1135 s.conns[addr] = make(map[transport.ServerTransport]bool)
1136 }
1137 s.conns[addr][st] = true
William Kurkianea869482019-04-09 15:16:11 -04001138 return true
1139}
1140
Abhay Kumara61c5222025-11-10 07:32:50 +00001141func (s *Server) removeConn(addr string, st transport.ServerTransport) {
William Kurkianea869482019-04-09 15:16:11 -04001142 s.mu.Lock()
1143 defer s.mu.Unlock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001144
1145 conns := s.conns[addr]
1146 if conns != nil {
1147 delete(conns, st)
1148 if len(conns) == 0 {
1149 // If the last connection for this address is being removed, also
1150 // remove the map entry corresponding to the address. This is used
1151 // in GracefulStop() when waiting for all connections to be closed.
1152 delete(s.conns, addr)
1153 }
William Kurkianea869482019-04-09 15:16:11 -04001154 s.cv.Broadcast()
1155 }
1156}
1157
William Kurkianea869482019-04-09 15:16:11 -04001158func (s *Server) incrCallsStarted() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001159 s.channelz.ServerMetrics.CallsStarted.Add(1)
1160 s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
William Kurkianea869482019-04-09 15:16:11 -04001161}
1162
1163func (s *Server) incrCallsSucceeded() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001164 s.channelz.ServerMetrics.CallsSucceeded.Add(1)
William Kurkianea869482019-04-09 15:16:11 -04001165}
1166
1167func (s *Server) incrCallsFailed() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001168 s.channelz.ServerMetrics.CallsFailed.Add(1)
William Kurkianea869482019-04-09 15:16:11 -04001169}
1170
Abhay Kumara61c5222025-11-10 07:32:50 +00001171func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {
William Kurkianea869482019-04-09 15:16:11 -04001172 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
1173 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001174 channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
William Kurkianea869482019-04-09 15:16:11 -04001175 return err
1176 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001177
1178 compData, pf, err := compress(data, cp, comp, s.opts.bufferPool)
William Kurkianea869482019-04-09 15:16:11 -04001179 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001180 data.Free()
1181 channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
William Kurkianea869482019-04-09 15:16:11 -04001182 return err
1183 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001184
1185 hdr, payload := msgHeader(data, compData, pf)
1186
1187 defer func() {
1188 compData.Free()
1189 data.Free()
1190 // payload does not need to be freed here, it is either data or compData, both of
1191 // which are already freed.
1192 }()
1193
1194 dataLen := data.Len()
1195 payloadLen := payload.Len()
William Kurkianea869482019-04-09 15:16:11 -04001196 // TODO(dfawley): should we be checking len(data) instead?
Abhay Kumara61c5222025-11-10 07:32:50 +00001197 if payloadLen > s.opts.maxSendMessageSize {
1198 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)
William Kurkianea869482019-04-09 15:16:11 -04001199 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001200 err = stream.Write(hdr, payload, opts)
1201 if err == nil {
1202 if len(s.opts.statsHandlers) != 0 {
1203 for _, sh := range s.opts.statsHandlers {
1204 sh.HandleRPC(ctx, outPayload(false, msg, dataLen, payloadLen, time.Now()))
1205 }
1206 }
William Kurkianea869482019-04-09 15:16:11 -04001207 }
1208 return err
1209}
1210
Abhay Kumara61c5222025-11-10 07:32:50 +00001211// chainUnaryServerInterceptors chains all unary server interceptors into one.
1212func chainUnaryServerInterceptors(s *Server) {
1213 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1214 // be executed before any other chained interceptors.
1215 interceptors := s.opts.chainUnaryInts
1216 if s.opts.unaryInt != nil {
1217 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
William Kurkianea869482019-04-09 15:16:11 -04001218 }
1219
Abhay Kumara61c5222025-11-10 07:32:50 +00001220 var chainedInt UnaryServerInterceptor
1221 if len(interceptors) == 0 {
1222 chainedInt = nil
1223 } else if len(interceptors) == 1 {
1224 chainedInt = interceptors[0]
1225 } else {
1226 chainedInt = chainUnaryInterceptors(interceptors)
1227 }
1228
1229 s.opts.unaryInt = chainedInt
1230}
1231
1232func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
1233 return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
1234 return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
1235 }
1236}
1237
1238func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1239 if curr == len(interceptors)-1 {
1240 return finalHandler
1241 }
1242 return func(ctx context.Context, req any) (any, error) {
1243 return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
1244 }
1245}
1246
1247func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
1248 shs := s.opts.statsHandlers
1249 if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
1250 if channelz.IsOn() {
1251 s.incrCallsStarted()
1252 }
1253 var statsBegin *stats.Begin
1254 for _, sh := range shs {
1255 beginTime := time.Now()
1256 statsBegin = &stats.Begin{
1257 BeginTime: beginTime,
1258 IsClientStream: false,
1259 IsServerStream: false,
1260 }
1261 sh.HandleRPC(ctx, statsBegin)
1262 }
1263 if trInfo != nil {
1264 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1265 }
1266 // The deferred error handling for tracing, stats handler and channelz are
1267 // combined into one function to reduce stack usage -- a defer takes ~56-64
1268 // bytes on the stack, so overflowing the stack will require a stack
1269 // re-allocation, which is expensive.
1270 //
1271 // To maintain behavior similar to separate deferred statements, statements
1272 // should be executed in the reverse order. That is, tracing first, stats
1273 // handler second, and channelz last. Note that panics *within* defers will
1274 // lead to different behavior, but that's an acceptable compromise; that
1275 // would be undefined behavior territory anyway.
1276 defer func() {
1277 if trInfo != nil {
1278 if err != nil && err != io.EOF {
1279 trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1280 trInfo.tr.SetError()
1281 }
1282 trInfo.tr.Finish()
1283 }
1284
1285 for _, sh := range shs {
1286 end := &stats.End{
1287 BeginTime: statsBegin.BeginTime,
1288 EndTime: time.Now(),
1289 }
1290 if err != nil && err != io.EOF {
1291 end.Error = toRPCErr(err)
1292 }
1293 sh.HandleRPC(ctx, end)
1294 }
1295
1296 if channelz.IsOn() {
1297 if err != nil && err != io.EOF {
1298 s.incrCallsFailed()
1299 } else {
1300 s.incrCallsSucceeded()
1301 }
1302 }
1303 }()
1304 }
1305 var binlogs []binarylog.MethodLogger
1306 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1307 binlogs = append(binlogs, ml)
1308 }
1309 if s.opts.binaryLogger != nil {
1310 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1311 binlogs = append(binlogs, ml)
1312 }
1313 }
1314 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001315 md, _ := metadata.FromIncomingContext(ctx)
1316 logEntry := &binarylog.ClientHeader{
1317 Header: md,
1318 MethodName: stream.Method(),
1319 PeerAddr: nil,
1320 }
1321 if deadline, ok := ctx.Deadline(); ok {
1322 logEntry.Timeout = time.Until(deadline)
1323 if logEntry.Timeout < 0 {
1324 logEntry.Timeout = 0
1325 }
1326 }
1327 if a := md[":authority"]; len(a) > 0 {
1328 logEntry.Authority = a[0]
1329 }
1330 if peer, ok := peer.FromContext(ctx); ok {
1331 logEntry.PeerAddr = peer.Addr
1332 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001333 for _, binlog := range binlogs {
1334 binlog.Log(ctx, logEntry)
1335 }
William Kurkianea869482019-04-09 15:16:11 -04001336 }
1337
1338 // comp and cp are used for compression. decomp and dc are used for
1339 // decompression. If comp and decomp are both set, they are the same;
1340 // however they are kept separate to ensure that at most one of the
1341 // compressor/decompressor variable pairs are set for use later.
1342 var comp, decomp encoding.Compressor
1343 var cp Compressor
1344 var dc Decompressor
Abhay Kumara61c5222025-11-10 07:32:50 +00001345 var sendCompressorName string
William Kurkianea869482019-04-09 15:16:11 -04001346
1347 // If dc is set and matches the stream's compression, use it. Otherwise, try
1348 // to find a matching registered compressor for decomp.
1349 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1350 dc = s.opts.dc
1351 } else if rc != "" && rc != encoding.Identity {
1352 decomp = encoding.GetCompressor(rc)
1353 if decomp == nil {
1354 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
Abhay Kumara61c5222025-11-10 07:32:50 +00001355 stream.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001356 return st.Err()
1357 }
1358 }
1359
1360 // If cp is set, use it. Otherwise, attempt to compress the response using
1361 // the incoming message compression method.
1362 //
1363 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1364 if s.opts.cp != nil {
1365 cp = s.opts.cp
Abhay Kumara61c5222025-11-10 07:32:50 +00001366 sendCompressorName = cp.Type()
William Kurkianea869482019-04-09 15:16:11 -04001367 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1368 // Legacy compressor not specified; attempt to respond with same encoding.
1369 comp = encoding.GetCompressor(rc)
1370 if comp != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001371 sendCompressorName = comp.Name()
1372 }
1373 }
1374
1375 if sendCompressorName != "" {
1376 if err := stream.SetSendCompress(sendCompressorName); err != nil {
1377 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001378 }
1379 }
1380
1381 var payInfo *payloadInfo
Abhay Kumara61c5222025-11-10 07:32:50 +00001382 if len(shs) != 0 || len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001383 payInfo = &payloadInfo{}
Abhay Kumara61c5222025-11-10 07:32:50 +00001384 defer payInfo.free()
William Kurkianea869482019-04-09 15:16:11 -04001385 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001386
1387 d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)
William Kurkianea869482019-04-09 15:16:11 -04001388 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001389 if e := stream.WriteStatus(status.Convert(err)); e != nil {
1390 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001391 }
1392 return err
1393 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001394 freed := false
1395 dataFree := func() {
1396 if !freed {
1397 d.Free()
1398 freed = true
1399 }
William Kurkianea869482019-04-09 15:16:11 -04001400 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001401 defer dataFree()
1402 df := func(v any) error {
1403 defer dataFree()
William Kurkianea869482019-04-09 15:16:11 -04001404 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1405 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1406 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001407
1408 for _, sh := range shs {
1409 sh.HandleRPC(ctx, &stats.InPayload{
1410 RecvTime: time.Now(),
1411 Payload: v,
1412 Length: d.Len(),
1413 WireLength: payInfo.compressedLength + headerLen,
1414 CompressedLength: payInfo.compressedLength,
William Kurkianea869482019-04-09 15:16:11 -04001415 })
1416 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001417 if len(binlogs) != 0 {
1418 cm := &binarylog.ClientMessage{
1419 Message: d.Materialize(),
1420 }
1421 for _, binlog := range binlogs {
1422 binlog.Log(ctx, cm)
1423 }
William Kurkianea869482019-04-09 15:16:11 -04001424 }
1425 if trInfo != nil {
1426 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1427 }
1428 return nil
1429 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001430 ctx = NewContextWithServerTransportStream(ctx, stream)
1431 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
William Kurkianea869482019-04-09 15:16:11 -04001432 if appErr != nil {
1433 appStatus, ok := status.FromError(appErr)
1434 if !ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001435 // Convert non-status application error to a status error with code
1436 // Unknown, but handle context errors specifically.
1437 appStatus = status.FromContextError(appErr)
1438 appErr = appStatus.Err()
William Kurkianea869482019-04-09 15:16:11 -04001439 }
1440 if trInfo != nil {
1441 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1442 trInfo.tr.SetError()
1443 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001444 if e := stream.WriteStatus(appStatus); e != nil {
1445 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001446 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001447 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001448 if h, _ := stream.Header(); h.Len() > 0 {
1449 // Only log serverHeader if there was header. Otherwise it can
1450 // be trailer only.
Abhay Kumara61c5222025-11-10 07:32:50 +00001451 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001452 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001453 }
1454 for _, binlog := range binlogs {
1455 binlog.Log(ctx, sh)
1456 }
William Kurkianea869482019-04-09 15:16:11 -04001457 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001458 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001459 Trailer: stream.Trailer(),
1460 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001461 }
1462 for _, binlog := range binlogs {
1463 binlog.Log(ctx, st)
1464 }
William Kurkianea869482019-04-09 15:16:11 -04001465 }
1466 return appErr
1467 }
1468 if trInfo != nil {
1469 trInfo.tr.LazyLog(stringer("OK"), false)
1470 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001471 opts := &transport.WriteOptions{Last: true}
William Kurkianea869482019-04-09 15:16:11 -04001472
Abhay Kumara61c5222025-11-10 07:32:50 +00001473 // Server handler could have set new compressor by calling SetSendCompressor.
1474 // In case it is set, we need to use it for compressing outbound message.
1475 if stream.SendCompress() != sendCompressorName {
1476 comp = encoding.GetCompressor(stream.SendCompress())
1477 }
1478 if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001479 if err == io.EOF {
1480 // The entire stream is done (for unary RPC only).
1481 return err
1482 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001483 if sts, ok := status.FromError(err); ok {
1484 if e := stream.WriteStatus(sts); e != nil {
1485 channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
William Kurkianea869482019-04-09 15:16:11 -04001486 }
1487 } else {
1488 switch st := err.(type) {
1489 case transport.ConnectionError:
1490 // Nothing to do here.
1491 default:
1492 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1493 }
1494 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001495 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001496 h, _ := stream.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001497 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001498 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001499 }
1500 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001501 Trailer: stream.Trailer(),
1502 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001503 }
1504 for _, binlog := range binlogs {
1505 binlog.Log(ctx, sh)
1506 binlog.Log(ctx, st)
1507 }
William Kurkianea869482019-04-09 15:16:11 -04001508 }
1509 return err
1510 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001511 if len(binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001512 h, _ := stream.Header()
Abhay Kumara61c5222025-11-10 07:32:50 +00001513 sh := &binarylog.ServerHeader{
William Kurkianea869482019-04-09 15:16:11 -04001514 Header: h,
Abhay Kumara61c5222025-11-10 07:32:50 +00001515 }
1516 sm := &binarylog.ServerMessage{
William Kurkianea869482019-04-09 15:16:11 -04001517 Message: reply,
Abhay Kumara61c5222025-11-10 07:32:50 +00001518 }
1519 for _, binlog := range binlogs {
1520 binlog.Log(ctx, sh)
1521 binlog.Log(ctx, sm)
1522 }
William Kurkianea869482019-04-09 15:16:11 -04001523 }
1524 if trInfo != nil {
1525 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1526 }
1527 // TODO: Should we be logging if writing status failed here, like above?
1528 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1529 // error or allow the stats handler to see it?
Abhay Kumara61c5222025-11-10 07:32:50 +00001530 if len(binlogs) != 0 {
1531 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001532 Trailer: stream.Trailer(),
1533 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001534 }
1535 for _, binlog := range binlogs {
1536 binlog.Log(ctx, st)
1537 }
William Kurkianea869482019-04-09 15:16:11 -04001538 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001539 return stream.WriteStatus(statusOK)
William Kurkianea869482019-04-09 15:16:11 -04001540}
1541
Abhay Kumara61c5222025-11-10 07:32:50 +00001542// chainStreamServerInterceptors chains all stream server interceptors into one.
1543func chainStreamServerInterceptors(s *Server) {
1544 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1545 // be executed before any other chained interceptors.
1546 interceptors := s.opts.chainStreamInts
1547 if s.opts.streamInt != nil {
1548 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1549 }
1550
1551 var chainedInt StreamServerInterceptor
1552 if len(interceptors) == 0 {
1553 chainedInt = nil
1554 } else if len(interceptors) == 1 {
1555 chainedInt = interceptors[0]
1556 } else {
1557 chainedInt = chainStreamInterceptors(interceptors)
1558 }
1559
1560 s.opts.streamInt = chainedInt
1561}
1562
1563func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1564 return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1565 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1566 }
1567}
1568
1569func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1570 if curr == len(interceptors)-1 {
1571 return finalHandler
1572 }
1573 return func(srv any, stream ServerStream) error {
1574 return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1575 }
1576}
1577
1578func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
William Kurkianea869482019-04-09 15:16:11 -04001579 if channelz.IsOn() {
1580 s.incrCallsStarted()
William Kurkianea869482019-04-09 15:16:11 -04001581 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001582 shs := s.opts.statsHandlers
1583 var statsBegin *stats.Begin
1584 if len(shs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001585 beginTime := time.Now()
Abhay Kumara61c5222025-11-10 07:32:50 +00001586 statsBegin = &stats.Begin{
1587 BeginTime: beginTime,
1588 IsClientStream: sd.ClientStreams,
1589 IsServerStream: sd.ServerStreams,
William Kurkianea869482019-04-09 15:16:11 -04001590 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001591 for _, sh := range shs {
1592 sh.HandleRPC(ctx, statsBegin)
1593 }
William Kurkianea869482019-04-09 15:16:11 -04001594 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001595 ctx = NewContextWithServerTransportStream(ctx, stream)
William Kurkianea869482019-04-09 15:16:11 -04001596 ss := &serverStream{
1597 ctx: ctx,
William Kurkianea869482019-04-09 15:16:11 -04001598 s: stream,
Abhay Kumara61c5222025-11-10 07:32:50 +00001599 p: &parser{r: stream, bufferPool: s.opts.bufferPool},
William Kurkianea869482019-04-09 15:16:11 -04001600 codec: s.getCodec(stream.ContentSubtype()),
Abhay Kumara61c5222025-11-10 07:32:50 +00001601 desc: sd,
William Kurkianea869482019-04-09 15:16:11 -04001602 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1603 maxSendMessageSize: s.opts.maxSendMessageSize,
1604 trInfo: trInfo,
Abhay Kumara61c5222025-11-10 07:32:50 +00001605 statsHandler: shs,
William Kurkianea869482019-04-09 15:16:11 -04001606 }
1607
Abhay Kumara61c5222025-11-10 07:32:50 +00001608 if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
1609 // See comment in processUnaryRPC on defers.
1610 defer func() {
1611 if trInfo != nil {
1612 ss.mu.Lock()
1613 if err != nil && err != io.EOF {
1614 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1615 ss.trInfo.tr.SetError()
1616 }
1617 ss.trInfo.tr.Finish()
1618 ss.trInfo.tr = nil
1619 ss.mu.Unlock()
1620 }
1621
1622 if len(shs) != 0 {
1623 end := &stats.End{
1624 BeginTime: statsBegin.BeginTime,
1625 EndTime: time.Now(),
1626 }
1627 if err != nil && err != io.EOF {
1628 end.Error = toRPCErr(err)
1629 }
1630 for _, sh := range shs {
1631 sh.HandleRPC(ctx, end)
1632 }
1633 }
1634
1635 if channelz.IsOn() {
1636 if err != nil && err != io.EOF {
1637 s.incrCallsFailed()
1638 } else {
1639 s.incrCallsSucceeded()
1640 }
1641 }
1642 }()
1643 }
1644
1645 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1646 ss.binlogs = append(ss.binlogs, ml)
1647 }
1648 if s.opts.binaryLogger != nil {
1649 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1650 ss.binlogs = append(ss.binlogs, ml)
1651 }
1652 }
1653 if len(ss.binlogs) != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001654 md, _ := metadata.FromIncomingContext(ctx)
1655 logEntry := &binarylog.ClientHeader{
1656 Header: md,
1657 MethodName: stream.Method(),
1658 PeerAddr: nil,
1659 }
1660 if deadline, ok := ctx.Deadline(); ok {
1661 logEntry.Timeout = time.Until(deadline)
1662 if logEntry.Timeout < 0 {
1663 logEntry.Timeout = 0
1664 }
1665 }
1666 if a := md[":authority"]; len(a) > 0 {
1667 logEntry.Authority = a[0]
1668 }
1669 if peer, ok := peer.FromContext(ss.Context()); ok {
1670 logEntry.PeerAddr = peer.Addr
1671 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001672 for _, binlog := range ss.binlogs {
1673 binlog.Log(ctx, logEntry)
1674 }
William Kurkianea869482019-04-09 15:16:11 -04001675 }
1676
1677 // If dc is set and matches the stream's compression, use it. Otherwise, try
1678 // to find a matching registered compressor for decomp.
1679 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
Abhay Kumara61c5222025-11-10 07:32:50 +00001680 ss.decompressorV0 = s.opts.dc
William Kurkianea869482019-04-09 15:16:11 -04001681 } else if rc != "" && rc != encoding.Identity {
Abhay Kumara61c5222025-11-10 07:32:50 +00001682 ss.decompressorV1 = encoding.GetCompressor(rc)
1683 if ss.decompressorV1 == nil {
William Kurkianea869482019-04-09 15:16:11 -04001684 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
Abhay Kumara61c5222025-11-10 07:32:50 +00001685 ss.s.WriteStatus(st)
William Kurkianea869482019-04-09 15:16:11 -04001686 return st.Err()
1687 }
1688 }
1689
1690 // If cp is set, use it. Otherwise, attempt to compress the response using
1691 // the incoming message compression method.
1692 //
1693 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1694 if s.opts.cp != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001695 ss.compressorV0 = s.opts.cp
1696 ss.sendCompressorName = s.opts.cp.Type()
William Kurkianea869482019-04-09 15:16:11 -04001697 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1698 // Legacy compressor not specified; attempt to respond with same encoding.
Abhay Kumara61c5222025-11-10 07:32:50 +00001699 ss.compressorV1 = encoding.GetCompressor(rc)
1700 if ss.compressorV1 != nil {
1701 ss.sendCompressorName = rc
William Kurkianea869482019-04-09 15:16:11 -04001702 }
1703 }
1704
Abhay Kumara61c5222025-11-10 07:32:50 +00001705 if ss.sendCompressorName != "" {
1706 if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
1707 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
1708 }
1709 }
1710
1711 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)
1712
William Kurkianea869482019-04-09 15:16:11 -04001713 if trInfo != nil {
1714 trInfo.tr.LazyLog(&trInfo.firstLine, false)
William Kurkianea869482019-04-09 15:16:11 -04001715 }
1716 var appErr error
Abhay Kumara61c5222025-11-10 07:32:50 +00001717 var server any
1718 if info != nil {
1719 server = info.serviceImpl
William Kurkianea869482019-04-09 15:16:11 -04001720 }
1721 if s.opts.streamInt == nil {
1722 appErr = sd.Handler(server, ss)
1723 } else {
1724 info := &StreamServerInfo{
1725 FullMethod: stream.Method(),
1726 IsClientStream: sd.ClientStreams,
1727 IsServerStream: sd.ServerStreams,
1728 }
1729 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1730 }
1731 if appErr != nil {
1732 appStatus, ok := status.FromError(appErr)
1733 if !ok {
Abhay Kumara61c5222025-11-10 07:32:50 +00001734 // Convert non-status application error to a status error with code
1735 // Unknown, but handle context errors specifically.
1736 appStatus = status.FromContextError(appErr)
William Kurkianea869482019-04-09 15:16:11 -04001737 appErr = appStatus.Err()
1738 }
1739 if trInfo != nil {
1740 ss.mu.Lock()
1741 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1742 ss.trInfo.tr.SetError()
1743 ss.mu.Unlock()
1744 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001745 if len(ss.binlogs) != 0 {
1746 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001747 Trailer: ss.s.Trailer(),
1748 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001749 }
1750 for _, binlog := range ss.binlogs {
1751 binlog.Log(ctx, st)
1752 }
William Kurkianea869482019-04-09 15:16:11 -04001753 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001754 ss.s.WriteStatus(appStatus)
William Kurkianea869482019-04-09 15:16:11 -04001755 // TODO: Should we log an error from WriteStatus here and below?
1756 return appErr
1757 }
1758 if trInfo != nil {
1759 ss.mu.Lock()
1760 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1761 ss.mu.Unlock()
1762 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001763 if len(ss.binlogs) != 0 {
1764 st := &binarylog.ServerTrailer{
William Kurkianea869482019-04-09 15:16:11 -04001765 Trailer: ss.s.Trailer(),
1766 Err: appErr,
Abhay Kumara61c5222025-11-10 07:32:50 +00001767 }
1768 for _, binlog := range ss.binlogs {
1769 binlog.Log(ctx, st)
1770 }
William Kurkianea869482019-04-09 15:16:11 -04001771 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001772 return ss.s.WriteStatus(statusOK)
William Kurkianea869482019-04-09 15:16:11 -04001773}
1774
Abhay Kumara61c5222025-11-10 07:32:50 +00001775func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
1776 ctx := stream.Context()
1777 ctx = contextWithServer(ctx, s)
1778 var ti *traceInfo
1779 if EnableTracing {
1780 tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
1781 ctx = newTraceContext(ctx, tr)
1782 ti = &traceInfo{
1783 tr: tr,
1784 firstLine: firstLine{
1785 client: false,
1786 remoteAddr: t.Peer().Addr,
1787 },
1788 }
1789 if dl, ok := ctx.Deadline(); ok {
1790 ti.firstLine.deadline = time.Until(dl)
1791 }
1792 }
1793
William Kurkianea869482019-04-09 15:16:11 -04001794 sm := stream.Method()
1795 if sm != "" && sm[0] == '/' {
1796 sm = sm[1:]
1797 }
1798 pos := strings.LastIndex(sm, "/")
1799 if pos == -1 {
Abhay Kumara61c5222025-11-10 07:32:50 +00001800 if ti != nil {
1801 ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
1802 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001803 }
1804 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
Abhay Kumara61c5222025-11-10 07:32:50 +00001805 if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1806 if ti != nil {
1807 ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1808 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001809 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001810 channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001811 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001812 if ti != nil {
1813 ti.tr.Finish()
William Kurkianea869482019-04-09 15:16:11 -04001814 }
1815 return
1816 }
1817 service := sm[:pos]
1818 method := sm[pos+1:]
1819
Abhay Kumara61c5222025-11-10 07:32:50 +00001820 // FromIncomingContext is expensive: skip if there are no statsHandlers
1821 if len(s.opts.statsHandlers) > 0 {
1822 md, _ := metadata.FromIncomingContext(ctx)
1823 for _, sh := range s.opts.statsHandlers {
1824 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
1825 sh.HandleRPC(ctx, &stats.InHeader{
1826 FullMethod: stream.Method(),
1827 RemoteAddr: t.Peer().Addr,
1828 LocalAddr: t.Peer().LocalAddr,
1829 Compression: stream.RecvCompress(),
1830 WireLength: stream.HeaderWireLength(),
1831 Header: md,
1832 })
1833 }
1834 }
1835 // To have calls in stream callouts work. Will delete once all stats handler
1836 // calls come from the gRPC layer.
1837 stream.SetContext(ctx)
1838
1839 srv, knownService := s.services[service]
Abhilash S.L3b494632019-07-16 15:51:09 +05301840 if knownService {
Abhay Kumara61c5222025-11-10 07:32:50 +00001841 if md, ok := srv.methods[method]; ok {
1842 s.processUnaryRPC(ctx, stream, srv, md, ti)
William Kurkianea869482019-04-09 15:16:11 -04001843 return
1844 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001845 if sd, ok := srv.streams[method]; ok {
1846 s.processStreamingRPC(ctx, stream, srv, sd, ti)
William Kurkianea869482019-04-09 15:16:11 -04001847 return
1848 }
1849 }
1850 // Unknown service, or known server unknown method.
1851 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00001852 s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)
William Kurkianea869482019-04-09 15:16:11 -04001853 return
1854 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301855 var errDesc string
1856 if !knownService {
1857 errDesc = fmt.Sprintf("unknown service %v", service)
1858 } else {
1859 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1860 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001861 if ti != nil {
1862 ti.tr.LazyPrintf("%s", errDesc)
1863 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001864 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001865 if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1866 if ti != nil {
1867 ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1868 ti.tr.SetError()
William Kurkianea869482019-04-09 15:16:11 -04001869 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001870 channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
William Kurkianea869482019-04-09 15:16:11 -04001871 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001872 if ti != nil {
1873 ti.tr.Finish()
William Kurkianea869482019-04-09 15:16:11 -04001874 }
1875}
1876
1877// The key to save ServerTransportStream in the context.
1878type streamKey struct{}
1879
1880// NewContextWithServerTransportStream creates a new context from ctx and
1881// attaches stream to it.
1882//
Abhay Kumara61c5222025-11-10 07:32:50 +00001883// # Experimental
1884//
1885// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1886// later release.
William Kurkianea869482019-04-09 15:16:11 -04001887func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1888 return context.WithValue(ctx, streamKey{}, stream)
1889}
1890
1891// ServerTransportStream is a minimal interface that a transport stream must
1892// implement. This can be used to mock an actual transport stream for tests of
1893// handler code that use, for example, grpc.SetHeader (which requires some
1894// stream to be in context).
1895//
1896// See also NewContextWithServerTransportStream.
1897//
Abhay Kumara61c5222025-11-10 07:32:50 +00001898// # Experimental
1899//
1900// Notice: This type is EXPERIMENTAL and may be changed or removed in a
1901// later release.
William Kurkianea869482019-04-09 15:16:11 -04001902type ServerTransportStream interface {
1903 Method() string
1904 SetHeader(md metadata.MD) error
1905 SendHeader(md metadata.MD) error
1906 SetTrailer(md metadata.MD) error
1907}
1908
1909// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1910// ctx. Returns nil if the given context has no stream associated with it
1911// (which implies it is not an RPC invocation context).
1912//
Abhay Kumara61c5222025-11-10 07:32:50 +00001913// # Experimental
1914//
1915// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1916// later release.
William Kurkianea869482019-04-09 15:16:11 -04001917func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1918 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1919 return s
1920}
1921
1922// Stop stops the gRPC server. It immediately closes all open
1923// connections and listeners.
1924// It cancels all active RPCs on the server side and the corresponding
1925// pending RPCs on the client side will get notified by connection
1926// errors.
1927func (s *Server) Stop() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001928 s.stop(false)
William Kurkianea869482019-04-09 15:16:11 -04001929}
1930
1931// GracefulStop stops the gRPC server gracefully. It stops the server from
1932// accepting new connections and RPCs and blocks until all the pending RPCs are
1933// finished.
1934func (s *Server) GracefulStop() {
Abhay Kumara61c5222025-11-10 07:32:50 +00001935 s.stop(true)
1936}
1937
1938func (s *Server) stop(graceful bool) {
David Bainbridge788e5202019-10-21 18:49:40 +00001939 s.quit.Fire()
1940 defer s.done.Fire()
William Kurkianea869482019-04-09 15:16:11 -04001941
Abhay Kumara61c5222025-11-10 07:32:50 +00001942 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
William Kurkianea869482019-04-09 15:16:11 -04001943 s.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001944 s.closeListenersLocked()
William Kurkianea869482019-04-09 15:16:11 -04001945 // Wait for serving threads to be ready to exit. Only then can we be sure no
1946 // new conns will be created.
1947 s.mu.Unlock()
1948 s.serveWG.Wait()
Abhay Kumara61c5222025-11-10 07:32:50 +00001949
William Kurkianea869482019-04-09 15:16:11 -04001950 s.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +00001951 defer s.mu.Unlock()
1952
1953 if graceful {
1954 s.drainAllServerTransportsLocked()
1955 } else {
1956 s.closeServerTransportsLocked()
1957 }
William Kurkianea869482019-04-09 15:16:11 -04001958
1959 for len(s.conns) != 0 {
1960 s.cv.Wait()
1961 }
1962 s.conns = nil
Abhay Kumara61c5222025-11-10 07:32:50 +00001963
1964 if s.opts.numServerWorkers > 0 {
1965 // Closing the channel (only once, via sync.OnceFunc) after all the
1966 // connections have been closed above ensures that there are no
1967 // goroutines executing the callback passed to st.HandleStreams (where
1968 // the channel is written to).
1969 s.serverWorkerChannelClose()
1970 }
1971
1972 if graceful || s.opts.waitForHandlers {
1973 s.handlersWG.Wait()
1974 }
1975
William Kurkianea869482019-04-09 15:16:11 -04001976 if s.events != nil {
1977 s.events.Finish()
1978 s.events = nil
1979 }
Abhay Kumara61c5222025-11-10 07:32:50 +00001980}
1981
1982// s.mu must be held by the caller.
1983func (s *Server) closeServerTransportsLocked() {
1984 for _, conns := range s.conns {
1985 for st := range conns {
1986 st.Close(errors.New("Server.Stop called"))
1987 }
1988 }
1989}
1990
1991// s.mu must be held by the caller.
1992func (s *Server) drainAllServerTransportsLocked() {
1993 if !s.drain {
1994 for _, conns := range s.conns {
1995 for st := range conns {
1996 st.Drain("graceful_stop")
1997 }
1998 }
1999 s.drain = true
2000 }
2001}
2002
2003// s.mu must be held by the caller.
2004func (s *Server) closeListenersLocked() {
2005 for lis := range s.lis {
2006 lis.Close()
2007 }
2008 s.lis = nil
William Kurkianea869482019-04-09 15:16:11 -04002009}
2010
2011// contentSubtype must be lowercase
2012// cannot return nil
2013func (s *Server) getCodec(contentSubtype string) baseCodec {
2014 if s.opts.codec != nil {
2015 return s.opts.codec
2016 }
2017 if contentSubtype == "" {
Abhay Kumara61c5222025-11-10 07:32:50 +00002018 return getCodec(proto.Name)
William Kurkianea869482019-04-09 15:16:11 -04002019 }
Abhay Kumara61c5222025-11-10 07:32:50 +00002020 codec := getCodec(contentSubtype)
William Kurkianea869482019-04-09 15:16:11 -04002021 if codec == nil {
Abhay Kumara61c5222025-11-10 07:32:50 +00002022 logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
2023 return getCodec(proto.Name)
William Kurkianea869482019-04-09 15:16:11 -04002024 }
2025 return codec
2026}
2027
Abhay Kumara61c5222025-11-10 07:32:50 +00002028type serverKey struct{}
2029
2030// serverFromContext gets the Server from the context.
2031func serverFromContext(ctx context.Context) *Server {
2032 s, _ := ctx.Value(serverKey{}).(*Server)
2033 return s
2034}
2035
2036// contextWithServer sets the Server in the context.
2037func contextWithServer(ctx context.Context, server *Server) context.Context {
2038 return context.WithValue(ctx, serverKey{}, server)
2039}
2040
2041// isRegisteredMethod returns whether the passed in method is registered as a
2042// method on the server. /service/method and service/method will match if the
2043// service and method are registered on the server.
2044func (s *Server) isRegisteredMethod(serviceMethod string) bool {
2045 if serviceMethod != "" && serviceMethod[0] == '/' {
2046 serviceMethod = serviceMethod[1:]
2047 }
2048 pos := strings.LastIndex(serviceMethod, "/")
2049 if pos == -1 { // Invalid method name syntax.
2050 return false
2051 }
2052 service := serviceMethod[:pos]
2053 method := serviceMethod[pos+1:]
2054 srv, knownService := s.services[service]
2055 if knownService {
2056 if _, ok := srv.methods[method]; ok {
2057 return true
2058 }
2059 if _, ok := srv.streams[method]; ok {
2060 return true
2061 }
2062 }
2063 return false
2064}
2065
2066// SetHeader sets the header metadata to be sent from the server to the client.
2067// The context provided must be the context passed to the server's handler.
2068//
2069// Streaming RPCs should prefer the SetHeader method of the ServerStream.
2070//
2071// When called multiple times, all the provided metadata will be merged. All
2072// the metadata will be sent out when one of the following happens:
2073//
2074// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
2075// - The first response message is sent. For unary handlers, this occurs when
2076// the handler returns; for streaming handlers, this can happen when stream's
2077// SendMsg method is called.
2078// - An RPC status is sent out (error or success). This occurs when the handler
2079// returns.
2080//
2081// SetHeader will fail if called after any of the events above.
2082//
2083// The error returned is compatible with the status package. However, the
2084// status code will often not match the RPC status as seen by the client
2085// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002086func SetHeader(ctx context.Context, md metadata.MD) error {
2087 if md.Len() == 0 {
2088 return nil
2089 }
2090 stream := ServerTransportStreamFromContext(ctx)
2091 if stream == nil {
2092 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2093 }
2094 return stream.SetHeader(md)
2095}
2096
Abhay Kumara61c5222025-11-10 07:32:50 +00002097// SendHeader sends header metadata. It may be called at most once, and may not
2098// be called after any event that causes headers to be sent (see SetHeader for
2099// a complete list). The provided md and headers set by SetHeader() will be
2100// sent.
2101//
2102// The error returned is compatible with the status package. However, the
2103// status code will often not match the RPC status as seen by the client
2104// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002105func SendHeader(ctx context.Context, md metadata.MD) error {
2106 stream := ServerTransportStreamFromContext(ctx)
2107 if stream == nil {
2108 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2109 }
2110 if err := stream.SendHeader(md); err != nil {
2111 return toRPCErr(err)
2112 }
2113 return nil
2114}
2115
Abhay Kumara61c5222025-11-10 07:32:50 +00002116// SetSendCompressor sets a compressor for outbound messages from the server.
2117// It must not be called after any event that causes headers to be sent
2118// (see ServerStream.SetHeader for the complete list). Provided compressor is
2119// used when below conditions are met:
2120//
2121// - compressor is registered via encoding.RegisterCompressor
2122// - compressor name must exist in the client advertised compressor names
2123// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
2124// get client supported compressor names.
2125//
2126// The context provided must be the context passed to the server's handler.
2127// It must be noted that compressor name encoding.Identity disables the
2128// outbound compression.
2129// By default, server messages will be sent using the same compressor with
2130// which request messages were sent.
2131//
2132// It is not safe to call SetSendCompressor concurrently with SendHeader and
2133// SendMsg.
2134//
2135// # Experimental
2136//
2137// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2138// later release.
2139func SetSendCompressor(ctx context.Context, name string) error {
2140 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2141 if !ok || stream == nil {
2142 return fmt.Errorf("failed to fetch the stream from the given context")
2143 }
2144
2145 if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
2146 return fmt.Errorf("unable to set send compressor: %w", err)
2147 }
2148
2149 return stream.SetSendCompress(name)
2150}
2151
2152// ClientSupportedCompressors returns compressor names advertised by the client
2153// via grpc-accept-encoding header.
2154//
2155// The context provided must be the context passed to the server's handler.
2156//
2157// # Experimental
2158//
2159// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2160// later release.
2161func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
2162 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2163 if !ok || stream == nil {
2164 return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
2165 }
2166
2167 return stream.ClientAdvertisedCompressors(), nil
2168}
2169
William Kurkianea869482019-04-09 15:16:11 -04002170// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
2171// When called more than once, all the provided metadata will be merged.
Abhay Kumara61c5222025-11-10 07:32:50 +00002172//
2173// The error returned is compatible with the status package. However, the
2174// status code will often not match the RPC status as seen by the client
2175// application, and therefore, should not be relied upon for this purpose.
William Kurkianea869482019-04-09 15:16:11 -04002176func SetTrailer(ctx context.Context, md metadata.MD) error {
2177 if md.Len() == 0 {
2178 return nil
2179 }
2180 stream := ServerTransportStreamFromContext(ctx)
2181 if stream == nil {
2182 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2183 }
2184 return stream.SetTrailer(md)
2185}
2186
2187// Method returns the method string for the server context. The returned
2188// string is in the format of "/service/method".
2189func Method(ctx context.Context) (string, bool) {
2190 s := ServerTransportStreamFromContext(ctx)
2191 if s == nil {
2192 return "", false
2193 }
2194 return s.Method(), true
2195}
2196
Abhay Kumara61c5222025-11-10 07:32:50 +00002197// validateSendCompressor returns an error when given compressor name cannot be
2198// handled by the server or the client based on the advertised compressors.
2199func validateSendCompressor(name string, clientCompressors []string) error {
2200 if name == encoding.Identity {
2201 return nil
2202 }
2203
2204 if !grpcutil.IsCompressorNameRegistered(name) {
2205 return fmt.Errorf("compressor not registered %q", name)
2206 }
2207
2208 for _, c := range clientCompressors {
2209 if c == name {
2210 return nil // found match
2211 }
2212 }
2213 return fmt.Errorf("client does not support compressor %q", name)
William Kurkianea869482019-04-09 15:16:11 -04002214}
2215
Abhay Kumara61c5222025-11-10 07:32:50 +00002216// atomicSemaphore implements a blocking, counting semaphore. acquire should be
2217// called synchronously; release may be called asynchronously.
2218type atomicSemaphore struct {
2219 n atomic.Int64
2220 wait chan struct{}
2221}
2222
2223func (q *atomicSemaphore) acquire() {
2224 if q.n.Add(-1) < 0 {
2225 // We ran out of quota. Block until a release happens.
2226 <-q.wait
2227 }
2228}
2229
2230func (q *atomicSemaphore) release() {
2231 // N.B. the "<= 0" check below should allow for this to work with multiple
2232 // concurrent calls to acquire, but also note that with synchronous calls to
2233 // acquire, as our system does, n will never be less than -1. There are
2234 // fairness issues (queuing) to consider if this was to be generalized.
2235 if q.n.Add(1) <= 0 {
2236 // An acquire was waiting on us. Unblock it.
2237 q.wait <- struct{}{}
2238 }
2239}
2240
2241func newHandlerQuota(n uint32) *atomicSemaphore {
2242 a := &atomicSemaphore{wait: make(chan struct{}, 1)}
2243 a.n.Store(int64(n))
2244 return a
William Kurkianea869482019-04-09 15:16:11 -04002245}