blob: 95c0d6d92f9765fc67ffd00ecdad27f0d6478854 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package embed
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 defaultLog "log"
23 "math"
24 "net"
25 "net/http"
26 "net/url"
27 "runtime"
28 "sort"
29 "strconv"
30 "strings"
31 "sync"
32 "time"
33
34 "github.com/soheilhy/cmux"
35 "go.uber.org/zap"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/credentials/insecure"
38 "google.golang.org/grpc/keepalive"
39
40 "go.etcd.io/etcd/api/v3/version"
41 "go.etcd.io/etcd/client/pkg/v3/transport"
42 "go.etcd.io/etcd/client/pkg/v3/types"
43 "go.etcd.io/etcd/client/v3/credentials"
44 "go.etcd.io/etcd/pkg/v3/debugutil"
45 runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
46 "go.etcd.io/etcd/server/v3/config"
47 "go.etcd.io/etcd/server/v3/etcdserver"
48 "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
49 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
50 "go.etcd.io/etcd/server/v3/features"
51 "go.etcd.io/etcd/server/v3/storage"
52 "go.etcd.io/etcd/server/v3/verify"
53)
54
55const (
56 // internal fd usage includes disk usage and transport usage.
57 // To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
58 // at most 2 to read/lock/write WALs. One case that it needs to 2 is to
59 // read all logs after some snapshot index, which locates at the end of
60 // the second last and the head of the last. For purging, it needs to read
61 // directory, so it needs 1. For fd monitor, it needs 1.
62 // For transport, rafthttp builds two long-polling connections and at most
63 // four temporary connections with each member. There are at most 9 members
64 // in a cluster, so it should reserve 96.
65 // For the safety, we set the total reserved number to 150.
66 reservedInternalFDNum = 150
67)
68
69// Etcd contains a running etcd server and its listeners.
70type Etcd struct {
71 Peers []*peerListener
72 Clients []net.Listener
73 // a map of contexts for the servers that serves client requests.
74 sctxs map[string]*serveCtx
75 metricsListeners []net.Listener
76
77 tracingExporterShutdown func()
78
79 Server *etcdserver.EtcdServer
80
81 cfg Config
82
83 // closeOnce is to ensure `stopc` is closed only once, no matter
84 // how many times the Close() method is called.
85 closeOnce sync.Once
86 // stopc is used to notify the sub goroutines not to send
87 // any errors to `errc`.
88 stopc chan struct{}
89 // errc is used to receive error from sub goroutines (including
90 // client handler, peer handler and metrics handler). It's closed
91 // after all these sub goroutines exit (checked via `wg`). Writers
92 // should avoid writing after `stopc` is closed by selecting on
93 // reading from `stopc`.
94 errc chan error
95
96 // wg is used to track the lifecycle of all sub goroutines which
97 // need to send error back to the `errc`.
98 wg sync.WaitGroup
99}
100
101type peerListener struct {
102 net.Listener
103 serve func() error
104 close func(context.Context) error
105}
106
107// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
108// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
109// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
110func StartEtcd(inCfg *Config) (e *Etcd, err error) {
111 if err = inCfg.Validate(); err != nil {
112 return nil, err
113 }
114 serving := false
115 e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
116 cfg := &e.cfg
117 defer func() {
118 if e == nil || err == nil {
119 return
120 }
121 if !serving {
122 // errored before starting gRPC server for serveCtx.serversC
123 for _, sctx := range e.sctxs {
124 sctx.close()
125 }
126 }
127 e.Close()
128 e = nil
129 }()
130
131 if !cfg.SocketOpts.Empty() {
132 cfg.logger.Info(
133 "configuring socket options",
134 zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
135 zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
136 )
137 }
138 e.cfg.logger.Info(
139 "configuring peer listeners",
140 zap.Strings("listen-peer-urls", e.cfg.getListenPeerURLs()),
141 )
142 if e.Peers, err = configurePeerListeners(cfg); err != nil {
143 return e, err
144 }
145
146 e.cfg.logger.Info(
147 "configuring client listeners",
148 zap.Strings("listen-client-urls", e.cfg.getListenClientURLs()),
149 )
150 if e.sctxs, err = configureClientListeners(cfg); err != nil {
151 return e, err
152 }
153
154 for _, sctx := range e.sctxs {
155 e.Clients = append(e.Clients, sctx.l)
156 }
157
158 var (
159 urlsmap types.URLsMap
160 token string
161 )
162 memberInitialized := true
163 if !isMemberInitialized(cfg) {
164 memberInitialized = false
165 urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
166 if err != nil {
167 return e, fmt.Errorf("error setting up initial cluster: %w", err)
168 }
169 }
170
171 // AutoCompactionRetention defaults to "0" if not set.
172 if len(cfg.AutoCompactionRetention) == 0 {
173 cfg.AutoCompactionRetention = "0"
174 }
175 autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
176 if err != nil {
177 return e, err
178 }
179
180 backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
181
182 srvcfg := config.ServerConfig{
183 Name: cfg.Name,
184 ClientURLs: cfg.AdvertiseClientUrls,
185 PeerURLs: cfg.AdvertisePeerUrls,
186 DataDir: cfg.Dir,
187 DedicatedWALDir: cfg.WalDir,
188 SnapshotCount: cfg.SnapshotCount,
189 SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
190 MaxSnapFiles: cfg.MaxSnapFiles,
191 MaxWALFiles: cfg.MaxWalFiles,
192 InitialPeerURLsMap: urlsmap,
193 InitialClusterToken: token,
194 DiscoveryURL: cfg.Durl,
195 DiscoveryProxy: cfg.Dproxy,
196 DiscoveryCfg: cfg.DiscoveryCfg,
197 NewCluster: cfg.IsNewCluster(),
198 PeerTLSInfo: cfg.PeerTLSInfo,
199 TickMs: cfg.TickMs,
200 ElectionTicks: cfg.ElectionTicks(),
201 InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
202 AutoCompactionRetention: autoCompactionRetention,
203 AutoCompactionMode: cfg.AutoCompactionMode,
204 QuotaBackendBytes: cfg.QuotaBackendBytes,
205 BackendBatchLimit: cfg.BackendBatchLimit,
206 BackendFreelistType: backendFreelistType,
207 BackendBatchInterval: cfg.BackendBatchInterval,
208 MaxTxnOps: cfg.MaxTxnOps,
209 MaxRequestBytes: cfg.MaxRequestBytes,
210 MaxConcurrentStreams: cfg.MaxConcurrentStreams,
211 SocketOpts: cfg.SocketOpts,
212 StrictReconfigCheck: cfg.StrictReconfigCheck,
213 ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
214 AuthToken: cfg.AuthToken,
215 BcryptCost: cfg.BcryptCost,
216 TokenTTL: cfg.AuthTokenTTL,
217 CORS: cfg.CORS,
218 HostWhitelist: cfg.HostWhitelist,
219 CorruptCheckTime: cfg.CorruptCheckTime,
220 CompactHashCheckTime: cfg.CompactHashCheckTime,
221 PreVote: cfg.PreVote,
222 Logger: cfg.logger,
223 ForceNewCluster: cfg.ForceNewCluster,
224 EnableGRPCGateway: cfg.EnableGRPCGateway,
225 EnableDistributedTracing: cfg.EnableDistributedTracing,
226 UnsafeNoFsync: cfg.UnsafeNoFsync,
227 CompactionBatchLimit: cfg.CompactionBatchLimit,
228 CompactionSleepInterval: cfg.CompactionSleepInterval,
229 WatchProgressNotifyInterval: cfg.WatchProgressNotifyInterval,
230 DowngradeCheckTime: cfg.DowngradeCheckTime,
231 WarningApplyDuration: cfg.WarningApplyDuration,
232 WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
233 MemoryMlock: cfg.MemoryMlock,
234 BootstrapDefragThresholdMegabytes: cfg.BootstrapDefragThresholdMegabytes,
235 MaxLearners: cfg.MaxLearners,
236 V2Deprecation: cfg.V2DeprecationEffective(),
237 ExperimentalLocalAddress: cfg.InferLocalAddr(),
238 ServerFeatureGate: cfg.ServerFeatureGate,
239 Metrics: cfg.Metrics,
240 }
241
242 if srvcfg.EnableDistributedTracing {
243 tctx := context.Background()
244 tracingExporter, terr := newTracingExporter(tctx, cfg)
245 if terr != nil {
246 return e, terr
247 }
248 e.tracingExporterShutdown = func() {
249 tracingExporter.Close(tctx)
250 }
251 srvcfg.TracerOptions = tracingExporter.opts
252
253 e.cfg.logger.Info(
254 "distributed tracing setup enabled",
255 )
256 }
257
258 srvcfg.PeerTLSInfo.LocalAddr = srvcfg.ExperimentalLocalAddress
259
260 print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
261
262 if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
263 return e, err
264 }
265
266 // buffer channel so goroutines on closed connections won't wait forever
267 e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
268
269 // newly started member ("memberInitialized==false")
270 // does not need corruption check
271 if memberInitialized && srvcfg.ServerFeatureGate.Enabled(features.InitialCorruptCheck) {
272 if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
273 // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
274 // (nothing to close since rafthttp transports have not been started)
275
276 e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
277 e.Server.Cleanup()
278 e.Server = nil
279 return e, err
280 }
281 }
282 e.Server.Start()
283
284 e.servePeers()
285
286 e.serveClients()
287
288 if err = e.serveMetrics(); err != nil {
289 return e, err
290 }
291
292 e.cfg.logger.Info(
293 "now serving peer/client/metrics",
294 zap.String("local-member-id", e.Server.MemberID().String()),
295 zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerURLs()),
296 zap.Strings("listen-peer-urls", e.cfg.getListenPeerURLs()),
297 zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientURLs()),
298 zap.Strings("listen-client-urls", e.cfg.getListenClientURLs()),
299 zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
300 )
301 serving = true
302 return e, nil
303}
304
305func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) {
306 cors := make([]string, 0, len(ec.CORS))
307 for v := range ec.CORS {
308 cors = append(cors, v)
309 }
310 sort.Strings(cors)
311
312 hss := make([]string, 0, len(ec.HostWhitelist))
313 for v := range ec.HostWhitelist {
314 hss = append(hss, v)
315 }
316 sort.Strings(hss)
317
318 quota := ec.QuotaBackendBytes
319 if quota == 0 {
320 quota = storage.DefaultQuotaBytes
321 }
322
323 lg.Info(
324 "starting an etcd server",
325 zap.String("etcd-version", version.Version),
326 zap.String("git-sha", version.GitSHA),
327 zap.String("go-version", runtime.Version()),
328 zap.String("go-os", runtime.GOOS),
329 zap.String("go-arch", runtime.GOARCH),
330 zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)),
331 zap.Int("max-cpu-available", runtime.NumCPU()),
332 zap.Bool("member-initialized", memberInitialized),
333 zap.String("name", sc.Name),
334 zap.String("data-dir", sc.DataDir),
335 zap.String("wal-dir", ec.WalDir),
336 zap.String("wal-dir-dedicated", sc.DedicatedWALDir),
337 zap.String("member-dir", sc.MemberDir()),
338 zap.Bool("force-new-cluster", sc.ForceNewCluster),
339 zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
340 zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
341 zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
342 zap.Uint64("snapshot-count", sc.SnapshotCount),
343 zap.Uint("max-wals", sc.MaxWALFiles),
344 zap.Uint("max-snapshots", sc.MaxSnapFiles),
345 zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
346 zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerURLs()),
347 zap.Strings("listen-peer-urls", ec.getListenPeerURLs()),
348 zap.Strings("advertise-client-urls", ec.getAdvertiseClientURLs()),
349 zap.Strings("listen-client-urls", ec.getListenClientURLs()),
350 zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
351 zap.String("experimental-local-address", sc.ExperimentalLocalAddress),
352 zap.Strings("cors", cors),
353 zap.Strings("host-whitelist", hss),
354 zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
355 zap.String("initial-cluster-state", ec.ClusterState),
356 zap.String("initial-cluster-token", sc.InitialClusterToken),
357 zap.Int64("quota-backend-bytes", quota),
358 zap.Uint("max-request-bytes", sc.MaxRequestBytes),
359 zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
360
361 zap.Bool("pre-vote", sc.PreVote),
362 zap.String(ServerFeatureGateFlagName, sc.ServerFeatureGate.String()),
363 zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
364 zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
365 zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
366 zap.String("auto-compaction-mode", sc.AutoCompactionMode),
367 zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
368 zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
369 zap.String("discovery-url", sc.DiscoveryURL),
370 zap.String("discovery-proxy", sc.DiscoveryProxy),
371
372 zap.String("discovery-token", sc.DiscoveryCfg.Token),
373 zap.String("discovery-endpoints", strings.Join(sc.DiscoveryCfg.Endpoints, ",")),
374 zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()),
375 zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeout.String()),
376 zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()),
377 zap.String("discovery-keepalive-timeout", sc.DiscoveryCfg.KeepAliveTimeout.String()),
378 zap.Bool("discovery-insecure-transport", sc.DiscoveryCfg.Secure.InsecureTransport),
379 zap.Bool("discovery-insecure-skip-tls-verify", sc.DiscoveryCfg.Secure.InsecureSkipVerify),
380 zap.String("discovery-cert", sc.DiscoveryCfg.Secure.Cert),
381 zap.String("discovery-key", sc.DiscoveryCfg.Secure.Key),
382 zap.String("discovery-cacert", sc.DiscoveryCfg.Secure.Cacert),
383 zap.String("discovery-user", sc.DiscoveryCfg.Auth.Username),
384
385 zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
386 zap.Int("max-learners", sc.MaxLearners),
387
388 zap.String("v2-deprecation", string(ec.V2Deprecation)),
389 )
390}
391
392// Config returns the current configuration.
393func (e *Etcd) Config() Config {
394 return e.cfg
395}
396
397// Close gracefully shuts down all servers/listeners.
398// Client requests will be terminated with request timeout.
399// After timeout, enforce remaning requests be closed immediately.
400//
401// The rough workflow to shut down etcd:
402// 1. close the `stopc` channel, so that all error handlers (child
403// goroutines) won't send back any errors anymore;
404// 2. stop the http and grpc servers gracefully, within request timeout;
405// 3. close all client and metrics listeners, so that etcd server
406// stops receiving any new connection;
407// 4. call the cancel function to close the gateway context, so that
408// all gateway connections are closed.
409// 5. stop etcd server gracefully, and ensure the main raft loop
410// goroutine is stopped;
411// 6. stop all peer listeners, so that it stops receiving peer connections
412// and messages (wait up to 1-second);
413// 7. wait for all child goroutines (i.e. client handlers, peer handlers
414// and metrics handlers) to exit;
415// 8. close the `errc` channel to release the resource. Note that it's only
416// safe to close the `errc` after step 7 above is done, otherwise the
417// child goroutines may send errors back to already closed `errc` channel.
418func (e *Etcd) Close() {
419 fields := []zap.Field{
420 zap.String("name", e.cfg.Name),
421 zap.String("data-dir", e.cfg.Dir),
422 zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerURLs()),
423 zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientURLs()),
424 }
425 lg := e.GetLogger()
426 lg.Info("closing etcd server", fields...)
427 defer func() {
428 lg.Info("closed etcd server", fields...)
429 verify.MustVerifyIfEnabled(verify.Config{
430 Logger: lg,
431 DataDir: e.cfg.Dir,
432 ExactIndex: false,
433 })
434 lg.Sync()
435 }()
436
437 e.closeOnce.Do(func() {
438 close(e.stopc)
439 })
440
441 // close client requests with request timeout
442 timeout := 2 * time.Second
443 if e.Server != nil {
444 timeout = e.Server.Cfg.ReqTimeout()
445 }
446 for _, sctx := range e.sctxs {
447 for ss := range sctx.serversC {
448 ctx, cancel := context.WithTimeout(context.Background(), timeout)
449 stopServers(ctx, ss)
450 cancel()
451 }
452 }
453
454 for _, sctx := range e.sctxs {
455 sctx.cancel()
456 }
457
458 for i := range e.Clients {
459 if e.Clients[i] != nil {
460 e.Clients[i].Close()
461 }
462 }
463
464 for i := range e.metricsListeners {
465 e.metricsListeners[i].Close()
466 }
467
468 // shutdown tracing exporter
469 if e.tracingExporterShutdown != nil {
470 e.tracingExporterShutdown()
471 }
472
473 // close rafthttp transports
474 if e.Server != nil {
475 e.Server.Stop()
476 }
477
478 // close all idle connections in peer handler (wait up to 1-second)
479 for i := range e.Peers {
480 if e.Peers[i] != nil && e.Peers[i].close != nil {
481 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
482 e.Peers[i].close(ctx)
483 cancel()
484 }
485 }
486 if e.errc != nil {
487 e.wg.Wait()
488 close(e.errc)
489 }
490}
491
492func stopServers(ctx context.Context, ss *servers) {
493 // first, close the http.Server
494 if ss.http != nil {
495 ss.http.Shutdown(ctx)
496 }
497 if ss.grpc == nil {
498 return
499 }
500 // do not grpc.Server.GracefulStop when grpc runs under http server
501 // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
502 // and https://github.com/etcd-io/etcd/issues/8916
503 if ss.secure && ss.http != nil {
504 ss.grpc.Stop()
505 return
506 }
507
508 ch := make(chan struct{})
509 go func() {
510 defer close(ch)
511 // close listeners to stop accepting new connections,
512 // will block on any existing transports
513 ss.grpc.GracefulStop()
514 }()
515
516 // wait until all pending RPCs are finished
517 select {
518 case <-ch:
519 case <-ctx.Done():
520 // took too long, manually close open transports
521 // e.g. watch streams
522 ss.grpc.Stop()
523
524 // concurrent GracefulStop should be interrupted
525 <-ch
526 }
527}
528
529// Err - return channel used to report errors during etcd run/shutdown.
530// Since etcd 3.5 the channel is being closed when the etcd is over.
531func (e *Etcd) Err() <-chan error {
532 return e.errc
533}
534
535func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
536 if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
537 return nil, err
538 }
539 if err = cfg.PeerSelfCert(); err != nil {
540 cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
541 }
542 updateMinMaxVersions(&cfg.PeerTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
543 if !cfg.PeerTLSInfo.Empty() {
544 cfg.logger.Info(
545 "starting with peer TLS",
546 zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
547 zap.Strings("cipher-suites", cfg.CipherSuites),
548 )
549 }
550
551 peers = make([]*peerListener, len(cfg.ListenPeerUrls))
552 defer func() {
553 if err == nil {
554 return
555 }
556 for i := range peers {
557 if peers[i] != nil && peers[i].close != nil {
558 cfg.logger.Warn(
559 "closing peer listener",
560 zap.String("address", cfg.ListenPeerUrls[i].String()),
561 zap.Error(err),
562 )
563 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
564 peers[i].close(ctx)
565 cancel()
566 }
567 }
568 }()
569
570 for i, u := range cfg.ListenPeerUrls {
571 if u.Scheme == "http" {
572 if !cfg.PeerTLSInfo.Empty() {
573 cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
574 }
575 if cfg.PeerTLSInfo.ClientCertAuth {
576 cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
577 }
578 }
579 peers[i] = &peerListener{close: func(context.Context) error { return nil }}
580 peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
581 transport.WithTLSInfo(&cfg.PeerTLSInfo),
582 transport.WithSocketOpts(&cfg.SocketOpts),
583 transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
584 )
585 if err != nil {
586 cfg.logger.Error("creating peer listener failed", zap.Error(err))
587 return nil, err
588 }
589 // once serve, overwrite with 'http.Server.Shutdown'
590 peers[i].close = func(context.Context) error {
591 return peers[i].Listener.Close()
592 }
593 }
594 return peers, nil
595}
596
597// configure peer handlers after rafthttp.Transport started
598func (e *Etcd) servePeers() {
599 ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
600
601 for _, p := range e.Peers {
602 u := p.Listener.Addr().String()
603 m := cmux.New(p.Listener)
604 srv := &http.Server{
605 Handler: ph,
606 ReadTimeout: 5 * time.Minute,
607 ErrorLog: defaultLog.New(io.Discard, "", 0), // do not log user error
608 }
609 go srv.Serve(m.Match(cmux.Any()))
610 p.serve = func() error {
611 e.cfg.logger.Info(
612 "cmux::serve",
613 zap.String("address", u),
614 )
615 return m.Serve()
616 }
617 p.close = func(ctx context.Context) error {
618 // gracefully shutdown http.Server
619 // close open listeners, idle connections
620 // until context cancel or time-out
621 e.cfg.logger.Info(
622 "stopping serving peer traffic",
623 zap.String("address", u),
624 )
625 srv.Shutdown(ctx)
626 e.cfg.logger.Info(
627 "stopped serving peer traffic",
628 zap.String("address", u),
629 )
630 m.Close()
631 return nil
632 }
633 }
634
635 // start peer servers in a goroutine
636 for _, pl := range e.Peers {
637 l := pl
638 e.startHandler(func() error {
639 u := l.Addr().String()
640 e.cfg.logger.Info(
641 "serving peer traffic",
642 zap.String("address", u),
643 )
644 return l.serve()
645 })
646 }
647}
648
649func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
650 if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
651 return nil, err
652 }
653 if err = cfg.ClientSelfCert(); err != nil {
654 cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
655 }
656 updateMinMaxVersions(&cfg.ClientTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
657 if cfg.EnablePprof {
658 cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
659 }
660
661 sctxs = make(map[string]*serveCtx)
662 for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
663 if u.Scheme == "http" || u.Scheme == "unix" {
664 if !cfg.ClientTLSInfo.Empty() {
665 cfg.logger.Warn("scheme is http or unix while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
666 }
667 if cfg.ClientTLSInfo.ClientCertAuth {
668 cfg.logger.Warn("scheme is http or unix while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
669 }
670 }
671 if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
672 return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
673 }
674 }
675
676 for _, u := range cfg.ListenClientUrls {
677 addr, secure, network := resolveURL(u)
678 sctx := sctxs[addr]
679 if sctx == nil {
680 sctx = newServeCtx(cfg.logger)
681 sctxs[addr] = sctx
682 }
683 sctx.secure = sctx.secure || secure
684 sctx.insecure = sctx.insecure || !secure
685 sctx.scheme = u.Scheme
686 sctx.addr = addr
687 sctx.network = network
688 }
689 for _, u := range cfg.ListenClientHttpUrls {
690 addr, secure, network := resolveURL(u)
691
692 sctx := sctxs[addr]
693 if sctx == nil {
694 sctx = newServeCtx(cfg.logger)
695 sctxs[addr] = sctx
696 } else if !sctx.httpOnly {
697 return nil, fmt.Errorf("cannot bind both --listen-client-urls and --listen-client-http-urls on the same url %s", u.String())
698 }
699 sctx.secure = sctx.secure || secure
700 sctx.insecure = sctx.insecure || !secure
701 sctx.scheme = u.Scheme
702 sctx.addr = addr
703 sctx.network = network
704 sctx.httpOnly = true
705 }
706
707 for _, sctx := range sctxs {
708 if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
709 transport.WithSocketOpts(&cfg.SocketOpts),
710 transport.WithSkipTLSInfoCheck(true),
711 ); err != nil {
712 return nil, err
713 }
714 // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
715 // hosts that disable ipv6. So, use the address given by the user.
716
717 if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
718 if fdLimit <= reservedInternalFDNum {
719 cfg.logger.Fatal(
720 "file descriptor limit of etcd process is too low; please set higher",
721 zap.Uint64("limit", fdLimit),
722 zap.Int("recommended-limit", reservedInternalFDNum),
723 )
724 }
725 sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
726 }
727
728 defer func(sctx *serveCtx) {
729 if err == nil || sctx.l == nil {
730 return
731 }
732 sctx.l.Close()
733 cfg.logger.Warn(
734 "closing peer listener",
735 zap.String("address", sctx.addr),
736 zap.Error(err),
737 )
738 }(sctx)
739 for k := range cfg.UserHandlers {
740 sctx.userHandlers[k] = cfg.UserHandlers[k]
741 }
742 sctx.serviceRegister = cfg.ServiceRegister
743 if cfg.EnablePprof || cfg.LogLevel == "debug" {
744 sctx.registerPprof()
745 }
746 if cfg.LogLevel == "debug" {
747 sctx.registerTrace()
748 }
749 }
750 return sctxs, nil
751}
752
753func resolveURL(u url.URL) (addr string, secure bool, network string) {
754 addr = u.Host
755 network = "tcp"
756 if u.Scheme == "unix" || u.Scheme == "unixs" {
757 addr = u.Host + u.Path
758 network = "unix"
759 }
760 secure = u.Scheme == "https" || u.Scheme == "unixs"
761 return addr, secure, network
762}
763
764func (e *Etcd) serveClients() {
765 if !e.cfg.ClientTLSInfo.Empty() {
766 e.cfg.logger.Info(
767 "starting with client TLS",
768 zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)),
769 zap.Strings("cipher-suites", e.cfg.CipherSuites),
770 )
771 }
772
773 // Start a client server goroutine for each listen address
774 mux := http.NewServeMux()
775 etcdhttp.HandleDebug(mux)
776 etcdhttp.HandleVersion(mux, e.Server)
777 etcdhttp.HandleMetrics(mux)
778 etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
779
780 var gopts []grpc.ServerOption
781 if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
782 gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
783 MinTime: e.cfg.GRPCKeepAliveMinTime,
784 PermitWithoutStream: false,
785 }))
786 }
787 if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
788 e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
789 gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
790 Time: e.cfg.GRPCKeepAliveInterval,
791 Timeout: e.cfg.GRPCKeepAliveTimeout,
792 }))
793 }
794 gopts = append(gopts, e.cfg.GRPCAdditionalServerOptions...)
795
796 splitHTTP := false
797 for _, sctx := range e.sctxs {
798 if sctx.httpOnly {
799 splitHTTP = true
800 }
801 }
802
803 // start client servers in each goroutine
804 for _, sctx := range e.sctxs {
805 s := sctx
806 e.startHandler(func() error {
807 return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)
808 })
809 }
810}
811
812func (e *Etcd) grpcGatewayDial(splitHTTP bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
813 if !e.cfg.EnableGRPCGateway {
814 return nil
815 }
816 sctx := e.pickGRPCGatewayServeContext(splitHTTP)
817 addr := sctx.addr
818 if network := sctx.network; network == "unix" {
819 // explicitly define unix network for gRPC socket support
820 addr = fmt.Sprintf("%s:%s", network, addr)
821 }
822 opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
823 if sctx.secure {
824 tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
825 if tlsErr != nil {
826 return func(ctx context.Context) (*grpc.ClientConn, error) {
827 return nil, tlsErr
828 }
829 }
830 dtls := tlscfg.Clone()
831 // trust local server
832 dtls.InsecureSkipVerify = true
833 opts = append(opts, grpc.WithTransportCredentials(credentials.NewTransportCredential(dtls)))
834 } else {
835 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
836 }
837
838 return func(ctx context.Context) (*grpc.ClientConn, error) {
839 conn, err := grpc.DialContext(ctx, addr, opts...)
840 if err != nil {
841 sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
842 return nil, err
843 }
844 return conn, err
845 }
846}
847
848func (e *Etcd) pickGRPCGatewayServeContext(splitHTTP bool) *serveCtx {
849 for _, sctx := range e.sctxs {
850 if !splitHTTP || !sctx.httpOnly {
851 return sctx
852 }
853 }
854 panic("Expect at least one context able to serve grpc")
855}
856
857var ErrMissingClientTLSInfoForMetricsURL = errors.New("client TLS key/cert (--cert-file, --key-file) must be provided for metrics secure url")
858
859func (e *Etcd) createMetricsListener(murl url.URL) (net.Listener, error) {
860 tlsInfo := &e.cfg.ClientTLSInfo
861 switch murl.Scheme {
862 case "http":
863 tlsInfo = nil
864 case "https", "unixs":
865 if e.cfg.ClientTLSInfo.Empty() {
866 return nil, ErrMissingClientTLSInfoForMetricsURL
867 }
868 }
869 return transport.NewListenerWithOpts(murl.Host, murl.Scheme,
870 transport.WithTLSInfo(tlsInfo),
871 transport.WithSocketOpts(&e.cfg.SocketOpts),
872 )
873}
874
875func (e *Etcd) serveMetrics() (err error) {
876 if len(e.cfg.ListenMetricsUrls) > 0 {
877 metricsMux := http.NewServeMux()
878 etcdhttp.HandleMetrics(metricsMux)
879 etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)
880
881 for _, murl := range e.cfg.ListenMetricsUrls {
882 u := murl
883 ml, err := e.createMetricsListener(murl)
884 if err != nil {
885 return err
886 }
887 e.metricsListeners = append(e.metricsListeners, ml)
888
889 e.startHandler(func() error {
890 e.cfg.logger.Info(
891 "serving metrics",
892 zap.String("address", u.String()),
893 )
894 return http.Serve(ml, metricsMux)
895 })
896 }
897 }
898 return nil
899}
900
901func (e *Etcd) startHandler(handler func() error) {
902 // start each handler in a separate goroutine
903 e.wg.Add(1)
904 go func() {
905 defer e.wg.Done()
906 e.errHandler(handler())
907 }()
908}
909
910func (e *Etcd) errHandler(err error) {
911 if err != nil {
912 e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
913 }
914 select {
915 case <-e.stopc:
916 return
917 default:
918 }
919 select {
920 case <-e.stopc:
921 case e.errc <- err:
922 }
923}
924
925// GetLogger returns the logger.
926func (e *Etcd) GetLogger() *zap.Logger {
927 e.cfg.loggerMu.RLock()
928 l := e.cfg.logger
929 e.cfg.loggerMu.RUnlock()
930 return l
931}
932
933func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
934 h, err := strconv.Atoi(retention)
935 if err == nil && h >= 0 {
936 switch mode {
937 case CompactorModeRevision:
938 ret = time.Duration(int64(h))
939 case CompactorModePeriodic:
940 ret = time.Duration(int64(h)) * time.Hour
941 case "":
942 return 0, errors.New("--auto-compaction-mode is undefined")
943 }
944 } else {
945 // periodic compaction
946 ret, err = time.ParseDuration(retention)
947 if err != nil {
948 return 0, fmt.Errorf("error parsing CompactionRetention: %w", err)
949 }
950 }
951 return ret, nil
952}