blob: 0eb16b7d3c2c0842afb54262be336c401ef01dc6 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "encoding/json"
20 errorspkg "errors"
21 "expvar"
22 "fmt"
23 "math"
24 "net/http"
25 "path"
26 "reflect"
27 "regexp"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "github.com/coreos/go-semver/semver"
34 humanize "github.com/dustin/go-humanize"
35 "github.com/prometheus/client_golang/prometheus"
36 "go.uber.org/zap"
37
38 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
39 "go.etcd.io/etcd/api/v3/membershippb"
40 "go.etcd.io/etcd/api/v3/version"
41 "go.etcd.io/etcd/client/pkg/v3/fileutil"
42 "go.etcd.io/etcd/client/pkg/v3/types"
43 "go.etcd.io/etcd/client/pkg/v3/verify"
44 "go.etcd.io/etcd/pkg/v3/featuregate"
45 "go.etcd.io/etcd/pkg/v3/idutil"
46 "go.etcd.io/etcd/pkg/v3/notify"
47 "go.etcd.io/etcd/pkg/v3/pbutil"
48 "go.etcd.io/etcd/pkg/v3/runtime"
49 "go.etcd.io/etcd/pkg/v3/schedule"
50 "go.etcd.io/etcd/pkg/v3/traceutil"
51 "go.etcd.io/etcd/pkg/v3/wait"
52 "go.etcd.io/etcd/server/v3/auth"
53 "go.etcd.io/etcd/server/v3/config"
54 "go.etcd.io/etcd/server/v3/etcdserver/api"
55 httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
56 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
57 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
58 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
59 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
60 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
61 "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
62 "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
63 "go.etcd.io/etcd/server/v3/etcdserver/apply"
64 "go.etcd.io/etcd/server/v3/etcdserver/cindex"
65 "go.etcd.io/etcd/server/v3/etcdserver/errors"
66 "go.etcd.io/etcd/server/v3/etcdserver/txn"
67 serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
68 "go.etcd.io/etcd/server/v3/features"
69 "go.etcd.io/etcd/server/v3/lease"
70 "go.etcd.io/etcd/server/v3/lease/leasehttp"
71 serverstorage "go.etcd.io/etcd/server/v3/storage"
72 "go.etcd.io/etcd/server/v3/storage/backend"
73 "go.etcd.io/etcd/server/v3/storage/mvcc"
74 "go.etcd.io/etcd/server/v3/storage/schema"
75 "go.etcd.io/raft/v3"
76 "go.etcd.io/raft/v3/raftpb"
77)
78
79const (
80 DefaultSnapshotCount = 10000
81
82 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
83 // to catch-up after compacting the raft storage entries.
84 // We expect the follower has a millisecond level latency with the leader.
85 // The max throughput is around 10K. Keep a 5K entries is enough for helping
86 // follower to catch up.
87 DefaultSnapshotCatchUpEntries uint64 = 5000
88
89 StoreClusterPrefix = "/0"
90 StoreKeysPrefix = "/1"
91
92 // HealthInterval is the minimum time the cluster should be healthy
93 // before accepting add and delete member requests.
94 HealthInterval = 5 * time.Second
95
96 purgeFileInterval = 30 * time.Second
97
98 // max number of in-flight snapshot messages etcdserver allows to have
99 // This number is more than enough for most clusters with 5 machines.
100 maxInFlightMsgSnap = 16
101
102 releaseDelayAfterSnapshot = 30 * time.Second
103
104 // maxPendingRevokes is the maximum number of outstanding expired lease revocations.
105 maxPendingRevokes = 16
106
107 recommendedMaxRequestBytes = 10 * 1024 * 1024
108
109 // readyPercentThreshold is a threshold used to determine
110 // whether a learner is ready for a transition into a full voting member or not.
111 readyPercentThreshold = 0.9
112
113 DowngradeEnabledPath = "/downgrade/enabled"
114 memorySnapshotCount = 100
115)
116
117var (
118 // monitorVersionInterval should be smaller than the timeout
119 // on the connection. Or we will not be able to reuse the connection
120 // (since it will timeout).
121 monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
122
123 recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
124 storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
125)
126
127func init() {
128 expvar.Publish(
129 "file_descriptor_limit",
130 expvar.Func(
131 func() any {
132 n, _ := runtime.FDLimit()
133 return n
134 },
135 ),
136 )
137}
138
139type Response struct {
140 Term uint64
141 Index uint64
142 Event *v2store.Event
143 Watcher v2store.Watcher
144 Err error
145}
146
147type ServerV2 interface {
148 Server
149 Leader() types.ID
150
151 ClientCertAuthEnabled() bool
152}
153
154type ServerV3 interface {
155 Server
156 apply.RaftStatusGetter
157}
158
159func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
160
161type Server interface {
162 // AddMember attempts to add a member into the cluster. It will return
163 // ErrIDRemoved if member ID is removed from the cluster, or return
164 // ErrIDExists if member ID exists in the cluster.
165 AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
166 // RemoveMember attempts to remove a member from the cluster. It will
167 // return ErrIDRemoved if member ID is removed from the cluster, or return
168 // ErrIDNotFound if member ID is not in the cluster.
169 RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
170 // UpdateMember attempts to update an existing member in the cluster. It will
171 // return ErrIDNotFound if the member ID does not exist.
172 UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
173 // PromoteMember attempts to promote a non-voting node to a voting node. It will
174 // return ErrIDNotFound if the member ID does not exist.
175 // return ErrLearnerNotReady if the member are not ready.
176 // return ErrMemberNotLearner if the member is not a learner.
177 PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
178
179 // ClusterVersion is the cluster-wide minimum major.minor version.
180 // Cluster version is set to the min version that an etcd member is
181 // compatible with when first bootstrap.
182 //
183 // ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
184 //
185 // During a rolling upgrades, the ClusterVersion will be updated
186 // automatically after a sync. (5 second by default)
187 //
188 // The API/raft component can utilize ClusterVersion to determine if
189 // it can accept a client request or a raft RPC.
190 // NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
191 // the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
192 // this feature is introduced post 2.0.
193 ClusterVersion() *semver.Version
194 // StorageVersion is the storage schema version. It's supported starting
195 // from 3.6.
196 StorageVersion() *semver.Version
197 Cluster() api.Cluster
198 Alarms() []*pb.AlarmMember
199
200 // LeaderChangedNotify returns a channel for application level code to be notified
201 // when etcd leader changes, this function is intend to be used only in application
202 // which embed etcd.
203 // Caution:
204 // 1. the returned channel is being closed when the leadership changes.
205 // 2. so the new channel needs to be obtained for each raft term.
206 // 3. user can loose some consecutive channel changes using this API.
207 LeaderChangedNotify() <-chan struct{}
208}
209
210// EtcdServer is the production implementation of the Server interface
211type EtcdServer struct {
212 // inflightSnapshots holds count the number of snapshots currently inflight.
213 inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
214 appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
215 committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
216 term uint64 // must use atomic operations to access; keep 64-bit aligned.
217 lead uint64 // must use atomic operations to access; keep 64-bit aligned.
218
219 consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
220 r raftNode // uses 64-bit atomics; keep 64-bit aligned.
221
222 readych chan struct{}
223 Cfg config.ServerConfig
224
225 lgMu *sync.RWMutex
226 lg *zap.Logger
227
228 w wait.Wait
229
230 readMu sync.RWMutex
231 // read routine notifies etcd server that it waits for reading by sending an empty struct to
232 // readwaitC
233 readwaitc chan struct{}
234 // readNotifier is used to notify the read routine that it can process the request
235 // when there is no error
236 readNotifier *notifier
237
238 // stop signals the run goroutine should shutdown.
239 stop chan struct{}
240 // stopping is closed by run goroutine on shutdown.
241 stopping chan struct{}
242 // done is closed when all goroutines from start() complete.
243 done chan struct{}
244 // leaderChanged is used to notify the linearizable read loop to drop the old read requests.
245 leaderChanged *notify.Notifier
246
247 errorc chan error
248 memberID types.ID
249 attributes membership.Attributes
250
251 cluster *membership.RaftCluster
252
253 v2store v2store.Store
254 snapshotter *snap.Snapshotter
255
256 uberApply apply.UberApplier
257
258 applyWait wait.WaitTime
259
260 kv mvcc.WatchableKV
261 lessor lease.Lessor
262 bemu sync.RWMutex
263 be backend.Backend
264 beHooks *serverstorage.BackendHooks
265 authStore auth.AuthStore
266 alarmStore *v3alarm.AlarmStore
267
268 stats *stats.ServerStats
269 lstats *stats.LeaderStats
270
271 SyncTicker *time.Ticker
272 // compactor is used to auto-compact the KV.
273 compactor v3compactor.Compactor
274
275 // peerRt used to send requests (version, lease) to peers.
276 peerRt http.RoundTripper
277 reqIDGen *idutil.Generator
278
279 // wgMu blocks concurrent waitgroup mutation while server stopping
280 wgMu sync.RWMutex
281 // wg is used to wait for the goroutines that depends on the server state
282 // to exit when stopping the server.
283 wg sync.WaitGroup
284
285 // ctx is used for etcd-initiated requests that may need to be canceled
286 // on etcd server shutdown.
287 ctx context.Context
288 cancel context.CancelFunc
289
290 leadTimeMu sync.RWMutex
291 leadElectedTime time.Time
292
293 firstCommitInTerm *notify.Notifier
294 clusterVersionChanged *notify.Notifier
295
296 *AccessController
297 // forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
298 // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
299 // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
300 forceDiskSnapshot bool
301 corruptionChecker CorruptionChecker
302}
303
304// NewServer creates a new EtcdServer from the supplied configuration. The
305// configuration is considered static for the lifetime of the EtcdServer.
306func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
307 b, err := bootstrap(cfg)
308 if err != nil {
309 cfg.Logger.Error("bootstrap failed", zap.Error(err))
310 return nil, err
311 }
312 cfg.Logger.Info("bootstrap successfully")
313
314 defer func() {
315 if err != nil {
316 b.Close()
317 }
318 }()
319
320 sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
321 lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
322
323 heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
324 srv = &EtcdServer{
325 readych: make(chan struct{}),
326 Cfg: cfg,
327 lgMu: new(sync.RWMutex),
328 lg: cfg.Logger,
329 errorc: make(chan error, 1),
330 v2store: b.storage.st,
331 snapshotter: b.ss,
332 r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
333 memberID: b.cluster.nodeID,
334 attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
335 cluster: b.cluster.cl,
336 stats: sstats,
337 lstats: lstats,
338 SyncTicker: time.NewTicker(500 * time.Millisecond),
339 peerRt: b.prt,
340 reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
341 AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
342 consistIndex: b.storage.backend.ci,
343 firstCommitInTerm: notify.NewNotifier(),
344 clusterVersionChanged: notify.NewNotifier(),
345 }
346
347 addFeatureGateMetrics(cfg.ServerFeatureGate, serverFeatureEnabled)
348 serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
349 srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
350
351 srv.be = b.storage.backend.be
352 srv.beHooks = b.storage.backend.beHooks
353 minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
354
355 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
356 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
357 srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
358 MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
359 CheckpointInterval: cfg.LeaseCheckpointInterval,
360 CheckpointPersist: cfg.ServerFeatureGate.Enabled(features.LeaseCheckpointPersist),
361 ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
362 })
363
364 tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
365 func(index uint64) <-chan struct{} {
366 return srv.applyWait.Wait(index)
367 },
368 time.Duration(cfg.TokenTTL)*time.Second,
369 )
370 if err != nil {
371 cfg.Logger.Warn("failed to create token provider", zap.Error(err))
372 return nil, err
373 }
374
375 mvccStoreConfig := mvcc.StoreConfig{
376 CompactionBatchLimit: cfg.CompactionBatchLimit,
377 CompactionSleepInterval: cfg.CompactionSleepInterval,
378 }
379 srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
380 srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
381
382 srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
383
384 newSrv := srv // since srv == nil in defer if srv is returned as nil
385 defer func() {
386 // closing backend without first closing kv can cause
387 // resumed compactions to fail with closed tx errors
388 if err != nil {
389 newSrv.kv.Close()
390 }
391 }()
392 if num := cfg.AutoCompactionRetention; num != 0 {
393 srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
394 if err != nil {
395 return nil, err
396 }
397 srv.compactor.Run()
398 }
399
400 if err = srv.restoreAlarms(); err != nil {
401 return nil, err
402 }
403 srv.uberApply = srv.NewUberApplier()
404
405 if srv.FeatureEnabled(features.LeaseCheckpoint) {
406 // setting checkpointer enables lease checkpoint feature.
407 srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
408 if !srv.ensureLeadership() {
409 srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
410 zap.Uint64("local-member-id", uint64(srv.MemberID())))
411 return lease.ErrNotPrimary
412 }
413
414 srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
415 return nil
416 })
417 }
418
419 // Set the hook after EtcdServer finishes the initialization to avoid
420 // the hook being called during the initialization process.
421 srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
422
423 // TODO: move transport initialization near the definition of remote
424 tr := &rafthttp.Transport{
425 Logger: cfg.Logger,
426 TLSInfo: cfg.PeerTLSInfo,
427 DialTimeout: cfg.PeerDialTimeout(),
428 ID: b.cluster.nodeID,
429 URLs: cfg.PeerURLs,
430 ClusterID: b.cluster.cl.ID(),
431 Raft: srv,
432 Snapshotter: b.ss,
433 ServerStats: sstats,
434 LeaderStats: lstats,
435 ErrorC: srv.errorc,
436 }
437 if err = tr.Start(); err != nil {
438 return nil, err
439 }
440 // add all remotes into transport
441 for _, m := range b.cluster.remotes {
442 if m.ID != b.cluster.nodeID {
443 tr.AddRemote(m.ID, m.PeerURLs)
444 }
445 }
446 for _, m := range b.cluster.cl.Members() {
447 if m.ID != b.cluster.nodeID {
448 tr.AddPeer(m.ID, m.PeerURLs)
449 }
450 }
451 srv.r.transport = tr
452
453 return srv, nil
454}
455
456func (s *EtcdServer) Logger() *zap.Logger {
457 s.lgMu.RLock()
458 l := s.lg
459 s.lgMu.RUnlock()
460 return l
461}
462
463func (s *EtcdServer) Config() config.ServerConfig {
464 return s.Cfg
465}
466
467// FeatureEnabled returns true if the feature is enabled by the etcd server, false otherwise.
468func (s *EtcdServer) FeatureEnabled(f featuregate.Feature) bool {
469 return s.Cfg.ServerFeatureGate.Enabled(f)
470}
471
472func tickToDur(ticks int, tickMs uint) string {
473 return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
474}
475
476func (s *EtcdServer) adjustTicks() {
477 lg := s.Logger()
478 clusterN := len(s.cluster.Members())
479
480 // single-node fresh start, or single-node recovers from snapshot
481 if clusterN == 1 {
482 ticks := s.Cfg.ElectionTicks - 1
483 lg.Info(
484 "started as single-node; fast-forwarding election ticks",
485 zap.String("local-member-id", s.MemberID().String()),
486 zap.Int("forward-ticks", ticks),
487 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
488 zap.Int("election-ticks", s.Cfg.ElectionTicks),
489 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
490 )
491 s.r.advanceTicks(ticks)
492 return
493 }
494
495 if !s.Cfg.InitialElectionTickAdvance {
496 lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
497 return
498 }
499 lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
500
501 // retry up to "rafthttp.ConnReadTimeout", which is 5-sec
502 // until peer connection reports; otherwise:
503 // 1. all connections failed, or
504 // 2. no active peers, or
505 // 3. restarted single-node with no snapshot
506 // then, do nothing, because advancing ticks would have no effect
507 waitTime := rafthttp.ConnReadTimeout
508 itv := 50 * time.Millisecond
509 for i := int64(0); i < int64(waitTime/itv); i++ {
510 select {
511 case <-time.After(itv):
512 case <-s.stopping:
513 return
514 }
515
516 peerN := s.r.transport.ActivePeers()
517 if peerN > 1 {
518 // multi-node received peer connection reports
519 // adjust ticks, in case slow leader message receive
520 ticks := s.Cfg.ElectionTicks - 2
521
522 lg.Info(
523 "initialized peer connections; fast-forwarding election ticks",
524 zap.String("local-member-id", s.MemberID().String()),
525 zap.Int("forward-ticks", ticks),
526 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
527 zap.Int("election-ticks", s.Cfg.ElectionTicks),
528 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
529 zap.Int("active-remote-members", peerN),
530 )
531
532 s.r.advanceTicks(ticks)
533 return
534 }
535 }
536}
537
538// Start performs any initialization of the Server necessary for it to
539// begin serving requests. It must be called before Do or Process.
540// Start must be non-blocking; any long-running server functionality
541// should be implemented in goroutines.
542func (s *EtcdServer) Start() {
543 s.start()
544 s.GoAttach(func() { s.adjustTicks() })
545 s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
546 s.GoAttach(s.purgeFile)
547 s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
548 s.GoAttach(s.monitorClusterVersions)
549 s.GoAttach(s.monitorStorageVersion)
550 s.GoAttach(s.linearizableReadLoop)
551 s.GoAttach(s.monitorKVHash)
552 s.GoAttach(s.monitorCompactHash)
553 s.GoAttach(s.monitorDowngrade)
554}
555
556// start prepares and starts server in a new goroutine. It is no longer safe to
557// modify a server's fields after it has been sent to Start.
558// This function is just used for testing.
559func (s *EtcdServer) start() {
560 lg := s.Logger()
561
562 if s.Cfg.SnapshotCount == 0 {
563 lg.Info(
564 "updating snapshot-count to default",
565 zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
566 zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
567 )
568 s.Cfg.SnapshotCount = DefaultSnapshotCount
569 }
570 if s.Cfg.SnapshotCatchUpEntries == 0 {
571 lg.Info(
572 "updating snapshot catch-up entries to default",
573 zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
574 zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
575 )
576 s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
577 }
578
579 s.w = wait.New()
580 s.applyWait = wait.NewTimeList()
581 s.done = make(chan struct{})
582 s.stop = make(chan struct{})
583 s.stopping = make(chan struct{}, 1)
584 s.ctx, s.cancel = context.WithCancel(context.Background())
585 s.readwaitc = make(chan struct{}, 1)
586 s.readNotifier = newNotifier()
587 s.leaderChanged = notify.NewNotifier()
588 if s.ClusterVersion() != nil {
589 lg.Info(
590 "starting etcd server",
591 zap.String("local-member-id", s.MemberID().String()),
592 zap.String("local-server-version", version.Version),
593 zap.String("cluster-id", s.Cluster().ID().String()),
594 zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
595 )
596 membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
597 } else {
598 lg.Info(
599 "starting etcd server",
600 zap.String("local-member-id", s.MemberID().String()),
601 zap.String("local-server-version", version.Version),
602 zap.String("cluster-version", "to_be_decided"),
603 )
604 }
605
606 // TODO: if this is an empty log, writes all peer infos
607 // into the first entry
608 go s.run()
609}
610
611func (s *EtcdServer) purgeFile() {
612 lg := s.Logger()
613 var dberrc, serrc, werrc <-chan error
614 var dbdonec, sdonec, wdonec <-chan struct{}
615 if s.Cfg.MaxSnapFiles > 0 {
616 dbdonec, dberrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
617 sdonec, serrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
618 }
619 if s.Cfg.MaxWALFiles > 0 {
620 wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
621 }
622
623 select {
624 case e := <-dberrc:
625 lg.Fatal("failed to purge snap db file", zap.Error(e))
626 case e := <-serrc:
627 lg.Fatal("failed to purge snap file", zap.Error(e))
628 case e := <-werrc:
629 lg.Fatal("failed to purge wal file", zap.Error(e))
630 case <-s.stopping:
631 if dbdonec != nil {
632 <-dbdonec
633 }
634 if sdonec != nil {
635 <-sdonec
636 }
637 if wdonec != nil {
638 <-wdonec
639 }
640 return
641 }
642}
643
644func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
645
646func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
647
648type ServerPeer interface {
649 ServerV2
650 RaftHandler() http.Handler
651 LeaseHandler() http.Handler
652}
653
654func (s *EtcdServer) LeaseHandler() http.Handler {
655 if s.lessor == nil {
656 return nil
657 }
658 return leasehttp.NewHandler(s.lessor, s.ApplyWait)
659}
660
661func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
662
663type ServerPeerV2 interface {
664 ServerPeer
665 HashKVHandler() http.Handler
666 DowngradeEnabledHandler() http.Handler
667}
668
669func (s *EtcdServer) DowngradeInfo() *serverversion.DowngradeInfo { return s.cluster.DowngradeInfo() }
670
671type downgradeEnabledHandler struct {
672 lg *zap.Logger
673 cluster api.Cluster
674 server *EtcdServer
675}
676
677func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
678 return &downgradeEnabledHandler{
679 lg: s.Logger(),
680 cluster: s.cluster,
681 server: s,
682 }
683}
684
685func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
686 if r.Method != http.MethodGet {
687 w.Header().Set("Allow", http.MethodGet)
688 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
689 return
690 }
691
692 w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
693
694 if r.URL.Path != DowngradeEnabledPath {
695 http.Error(w, "bad path", http.StatusBadRequest)
696 return
697 }
698
699 ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
700 defer cancel()
701
702 // serve with linearized downgrade info
703 if err := h.server.linearizableReadNotify(ctx); err != nil {
704 http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
705 http.StatusInternalServerError)
706 return
707 }
708 enabled := h.server.DowngradeInfo().Enabled
709 w.Header().Set("Content-Type", "text/plain")
710 w.Write([]byte(strconv.FormatBool(enabled)))
711}
712
713// Process takes a raft message and applies it to the server's raft state
714// machine, respecting any timeout of the given context.
715func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
716 lg := s.Logger()
717 if s.cluster.IsIDRemoved(types.ID(m.From)) {
718 lg.Warn(
719 "rejected Raft message from removed member",
720 zap.String("local-member-id", s.MemberID().String()),
721 zap.String("removed-member-id", types.ID(m.From).String()),
722 )
723 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
724 }
725 if s.MemberID() != types.ID(m.To) {
726 lg.Warn(
727 "rejected Raft message to mismatch member",
728 zap.String("local-member-id", s.MemberID().String()),
729 zap.String("mismatch-member-id", types.ID(m.To).String()),
730 )
731 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message to mismatch member")
732 }
733 if m.Type == raftpb.MsgApp {
734 s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
735 }
736 return s.r.Step(ctx, m)
737}
738
739func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
740
741func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
742
743// ReportSnapshot reports snapshot sent status to the raft state machine,
744// and clears the used snapshot from the snapshot store.
745func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
746 s.r.ReportSnapshot(id, status)
747}
748
749type etcdProgress struct {
750 confState raftpb.ConfState
751 diskSnapshotIndex uint64
752 memorySnapshotIndex uint64
753 appliedt uint64
754 appliedi uint64
755}
756
757// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
758// and helps decouple state machine logic from Raft algorithms.
759// TODO: add a state machine interface to toApply the commit entries and do snapshot/recover
760type raftReadyHandler struct {
761 getLead func() (lead uint64)
762 updateLead func(lead uint64)
763 updateLeadership func(newLeader bool)
764 updateCommittedIndex func(uint64)
765}
766
767func (s *EtcdServer) run() {
768 lg := s.Logger()
769
770 sn, err := s.r.raftStorage.Snapshot()
771 if err != nil {
772 lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
773 }
774
775 // asynchronously accept toApply packets, dispatch progress in-order
776 sched := schedule.NewFIFOScheduler(lg)
777
778 rh := &raftReadyHandler{
779 getLead: func() (lead uint64) { return s.getLead() },
780 updateLead: func(lead uint64) { s.setLead(lead) },
781 updateLeadership: func(newLeader bool) {
782 if !s.isLeader() {
783 if s.lessor != nil {
784 s.lessor.Demote()
785 }
786 if s.compactor != nil {
787 s.compactor.Pause()
788 }
789 } else {
790 if newLeader {
791 t := time.Now()
792 s.leadTimeMu.Lock()
793 s.leadElectedTime = t
794 s.leadTimeMu.Unlock()
795 }
796 if s.compactor != nil {
797 s.compactor.Resume()
798 }
799 }
800 if newLeader {
801 s.leaderChanged.Notify()
802 }
803 // TODO: remove the nil checking
804 // current test utility does not provide the stats
805 if s.stats != nil {
806 s.stats.BecomeLeader()
807 }
808 },
809 updateCommittedIndex: func(ci uint64) {
810 cci := s.getCommittedIndex()
811 if ci > cci {
812 s.setCommittedIndex(ci)
813 }
814 },
815 }
816 s.r.start(rh)
817
818 ep := etcdProgress{
819 confState: sn.Metadata.ConfState,
820 diskSnapshotIndex: sn.Metadata.Index,
821 memorySnapshotIndex: sn.Metadata.Index,
822 appliedt: sn.Metadata.Term,
823 appliedi: sn.Metadata.Index,
824 }
825
826 defer func() {
827 s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
828 close(s.stopping)
829 s.wgMu.Unlock()
830 s.cancel()
831 sched.Stop()
832
833 // wait for goroutines before closing raft so wal stays open
834 s.wg.Wait()
835
836 s.SyncTicker.Stop()
837
838 // must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
839 // by adding a peer after raft stops the transport
840 s.r.stop()
841
842 s.Cleanup()
843
844 close(s.done)
845 }()
846
847 var expiredLeaseC <-chan []*lease.Lease
848 if s.lessor != nil {
849 expiredLeaseC = s.lessor.ExpiredLeasesC()
850 }
851
852 for {
853 select {
854 case ap := <-s.r.apply():
855 f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
856 sched.Schedule(f)
857 case leases := <-expiredLeaseC:
858 s.revokeExpiredLeases(leases)
859 case err := <-s.errorc:
860 lg.Warn("server error", zap.Error(err))
861 lg.Warn("data-dir used by this member must be removed")
862 return
863 case <-s.stop:
864 return
865 }
866 }
867}
868
869func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
870 s.GoAttach(func() {
871 // We shouldn't revoke any leases if current member isn't a leader,
872 // because the operation should only be performed by the leader. When
873 // the leader gets blocked on the raft loop, such as writing WAL entries,
874 // it can't process any events or messages from raft. It may think it
875 // is still the leader even the leader has already changed.
876 // Refer to https://github.com/etcd-io/etcd/issues/15247
877 lg := s.Logger()
878 if !s.ensureLeadership() {
879 lg.Warn("Ignore the lease revoking request because current member isn't a leader",
880 zap.Uint64("local-member-id", uint64(s.MemberID())))
881 return
882 }
883
884 // Increases throughput of expired leases deletion process through parallelization
885 c := make(chan struct{}, maxPendingRevokes)
886 for _, curLease := range leases {
887 select {
888 case c <- struct{}{}:
889 case <-s.stopping:
890 return
891 }
892
893 f := func(lid int64) {
894 s.GoAttach(func() {
895 ctx := s.authStore.WithRoot(s.ctx)
896 _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
897 if lerr == nil {
898 leaseExpired.Inc()
899 } else {
900 lg.Warn(
901 "failed to revoke lease",
902 zap.String("lease-id", fmt.Sprintf("%016x", lid)),
903 zap.Error(lerr),
904 )
905 }
906
907 <-c
908 })
909 }
910
911 f(int64(curLease.ID))
912 }
913 })
914}
915
916// isActive checks if the etcd instance is still actively processing the
917// heartbeat message (ticks). It returns false if no heartbeat has been
918// received within 3 * tickMs.
919func (s *EtcdServer) isActive() bool {
920 latestTickTs := s.r.getLatestTickTs()
921 threshold := 3 * time.Duration(s.Cfg.TickMs) * time.Millisecond
922 return latestTickTs.Add(threshold).After(time.Now())
923}
924
925// ensureLeadership checks whether current member is still the leader.
926func (s *EtcdServer) ensureLeadership() bool {
927 lg := s.Logger()
928
929 if s.isActive() {
930 lg.Debug("The member is active, skip checking leadership",
931 zap.Time("latestTickTs", s.r.getLatestTickTs()),
932 zap.Time("now", time.Now()))
933 return true
934 }
935
936 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
937 defer cancel()
938 if err := s.linearizableReadNotify(ctx); err != nil {
939 lg.Warn("Failed to check current member's leadership",
940 zap.Error(err))
941 return false
942 }
943
944 newLeaderID := s.raftStatus().Lead
945 if newLeaderID != uint64(s.MemberID()) {
946 lg.Warn("Current member isn't a leader",
947 zap.Uint64("local-member-id", uint64(s.MemberID())),
948 zap.Uint64("new-lead", newLeaderID))
949 return false
950 }
951
952 return true
953}
954
955// Cleanup removes allocated objects by EtcdServer.NewServer in
956// situation that EtcdServer::Start was not called (that takes care of cleanup).
957func (s *EtcdServer) Cleanup() {
958 // kv, lessor and backend can be nil if running without v3 enabled
959 // or running unit tests.
960 if s.lessor != nil {
961 s.lessor.Stop()
962 }
963 if s.kv != nil {
964 s.kv.Close()
965 }
966 if s.authStore != nil {
967 s.authStore.Close()
968 }
969 if s.be != nil {
970 s.be.Close()
971 }
972 if s.compactor != nil {
973 s.compactor.Stop()
974 }
975}
976
977func (s *EtcdServer) Defragment() error {
978 s.bemu.Lock()
979 defer s.bemu.Unlock()
980 return s.be.Defrag()
981}
982
983func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
984 s.applySnapshot(ep, apply)
985 s.applyEntries(ep, apply)
986 backend.VerifyBackendConsistency(s.Backend(), s.Logger(), true, schema.AllBuckets...)
987
988 proposalsApplied.Set(float64(ep.appliedi))
989 s.applyWait.Trigger(ep.appliedi)
990
991 // wait for the raft routine to finish the disk writes before triggering a
992 // snapshot. or applied index might be greater than the last index in raft
993 // storage, since the raft routine might be slower than toApply routine.
994 <-apply.notifyc
995
996 s.snapshotIfNeededAndCompactRaftLog(ep)
997 select {
998 // snapshot requested via send()
999 case m := <-s.r.msgSnapC:
1000 merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
1001 s.sendMergedSnap(merged)
1002 default:
1003 }
1004}
1005
1006func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
1007 if raft.IsEmptySnap(toApply.snapshot) {
1008 return
1009 }
1010 applySnapshotInProgress.Inc()
1011
1012 lg := s.Logger()
1013 lg.Info(
1014 "applying snapshot",
1015 zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
1016 zap.Uint64("current-applied-index", ep.appliedi),
1017 zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
1018 zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
1019 )
1020 defer func() {
1021 lg.Info(
1022 "applied snapshot",
1023 zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
1024 zap.Uint64("current-applied-index", ep.appliedi),
1025 zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
1026 zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
1027 )
1028 applySnapshotInProgress.Dec()
1029 }()
1030
1031 if toApply.snapshot.Metadata.Index <= ep.appliedi {
1032 lg.Panic(
1033 "unexpected leader snapshot from outdated index",
1034 zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
1035 zap.Uint64("current-applied-index", ep.appliedi),
1036 zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
1037 zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
1038 )
1039 }
1040
1041 // wait for raftNode to persist snapshot onto the disk
1042 <-toApply.notifyc
1043
1044 bemuUnlocked := false
1045 s.bemu.Lock()
1046 defer func() {
1047 if !bemuUnlocked {
1048 s.bemu.Unlock()
1049 }
1050 }()
1051
1052 // gofail: var applyBeforeOpenSnapshot struct{}
1053 newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks)
1054 if err != nil {
1055 lg.Panic("failed to open snapshot backend", zap.Error(err))
1056 }
1057 lg.Info("applySnapshot: opened snapshot backend")
1058 // gofail: var applyAfterOpenSnapshot struct{}
1059
1060 // We need to set the backend to consistIndex before recovering the lessor,
1061 // because lessor.Recover will commit the boltDB transaction, accordingly it
1062 // will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
1063 // Eventually the new consistent_index value coming from snapshot is overwritten
1064 // by the old value.
1065 s.consistIndex.SetBackend(newbe)
1066 verifySnapshotIndex(toApply.snapshot, s.consistIndex.ConsistentIndex())
1067
1068 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
1069 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
1070 if s.lessor != nil {
1071 lg.Info("restoring lease store")
1072
1073 s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
1074
1075 lg.Info("restored lease store")
1076 }
1077
1078 lg.Info("restoring mvcc store")
1079
1080 if err := s.kv.Restore(newbe); err != nil {
1081 lg.Panic("failed to restore mvcc store", zap.Error(err))
1082 }
1083
1084 newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
1085
1086 lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
1087
1088 oldbe := s.be
1089 s.be = newbe
1090 s.bemu.Unlock()
1091 bemuUnlocked = true
1092
1093 // Closing old backend might block until all the txns
1094 // on the backend are finished.
1095 // We do not want to wait on closing the old backend.
1096 go func() {
1097 lg.Info("closing old backend file")
1098 defer func() {
1099 lg.Info("closed old backend file")
1100 }()
1101 if err := oldbe.Close(); err != nil {
1102 lg.Panic("failed to close old backend", zap.Error(err))
1103 }
1104 }()
1105
1106 lg.Info("restoring alarm store")
1107
1108 if err := s.restoreAlarms(); err != nil {
1109 lg.Panic("failed to restore alarm store", zap.Error(err))
1110 }
1111
1112 lg.Info("restored alarm store")
1113
1114 if s.authStore != nil {
1115 lg.Info("restoring auth store")
1116
1117 s.authStore.Recover(schema.NewAuthBackend(lg, newbe))
1118
1119 lg.Info("restored auth store")
1120 }
1121
1122 lg.Info("restoring v2 store")
1123 if err := s.v2store.Recovery(toApply.snapshot.Data); err != nil {
1124 lg.Panic("failed to restore v2 store", zap.Error(err))
1125 }
1126
1127 if err := serverstorage.AssertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
1128 lg.Panic("illegal v2store content", zap.Error(err))
1129 }
1130
1131 lg.Info("restored v2 store")
1132
1133 s.cluster.SetBackend(schema.NewMembershipBackend(lg, newbe))
1134
1135 lg.Info("restoring cluster configuration")
1136
1137 s.cluster.Recover(api.UpdateCapability)
1138
1139 lg.Info("restored cluster configuration")
1140 lg.Info("removing old peers from network")
1141
1142 // recover raft transport
1143 s.r.transport.RemoveAllPeers()
1144
1145 lg.Info("removed old peers from network")
1146 lg.Info("adding peers from new cluster configuration")
1147
1148 for _, m := range s.cluster.Members() {
1149 if m.ID == s.MemberID() {
1150 continue
1151 }
1152 s.r.transport.AddPeer(m.ID, m.PeerURLs)
1153 }
1154
1155 lg.Info("added peers from new cluster configuration")
1156
1157 ep.appliedt = toApply.snapshot.Metadata.Term
1158 ep.appliedi = toApply.snapshot.Metadata.Index
1159 ep.diskSnapshotIndex = ep.appliedi
1160 ep.memorySnapshotIndex = ep.appliedi
1161 ep.confState = toApply.snapshot.Metadata.ConfState
1162
1163 // As backends and implementations like alarmsStore changed, we need
1164 // to re-bootstrap Appliers.
1165 s.uberApply = s.NewUberApplier()
1166}
1167
1168func (s *EtcdServer) NewUberApplier() apply.UberApplier {
1169 return apply.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex,
1170 s.Cfg.WarningApplyDuration, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.Cfg.QuotaBackendBytes)
1171}
1172
1173func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
1174 verify.Verify(func() {
1175 if cindex != snapshot.Metadata.Index {
1176 panic(fmt.Sprintf("consistent_index(%d) isn't equal to snapshot index (%d)", cindex, snapshot.Metadata.Index))
1177 }
1178 })
1179}
1180
1181func verifyConsistentIndexIsLatest(lg *zap.Logger, snapshot raftpb.Snapshot, cindex uint64) {
1182 verify.Verify(func() {
1183 if cindex < snapshot.Metadata.Index {
1184 lg.Panic(fmt.Sprintf("consistent_index(%d) is older than snapshot index (%d)", cindex, snapshot.Metadata.Index))
1185 }
1186 })
1187}
1188
1189func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
1190 if len(apply.entries) == 0 {
1191 return
1192 }
1193 firsti := apply.entries[0].Index
1194 if firsti > ep.appliedi+1 {
1195 lg := s.Logger()
1196 lg.Panic(
1197 "unexpected committed entry index",
1198 zap.Uint64("current-applied-index", ep.appliedi),
1199 zap.Uint64("first-committed-entry-index", firsti),
1200 )
1201 }
1202 var ents []raftpb.Entry
1203 if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
1204 ents = apply.entries[ep.appliedi+1-firsti:]
1205 }
1206 if len(ents) == 0 {
1207 return
1208 }
1209 var shouldstop bool
1210 if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop {
1211 go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
1212 }
1213}
1214
1215func (s *EtcdServer) ForceSnapshot() {
1216 s.forceDiskSnapshot = true
1217}
1218
1219func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
1220 // TODO: Remove disk snapshot in v3.7
1221 shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
1222 shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
1223 if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
1224 return
1225 }
1226 s.snapshot(ep, shouldSnapshotToDisk)
1227 s.compactRaftLog(ep.appliedi)
1228}
1229
1230func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
1231 return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
1232}
1233
1234func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
1235 return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
1236}
1237
1238func (s *EtcdServer) hasMultipleVotingMembers() bool {
1239 return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
1240}
1241
1242func (s *EtcdServer) isLeader() bool {
1243 return uint64(s.MemberID()) == s.Lead()
1244}
1245
1246// MoveLeader transfers the leader to the given transferee.
1247func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1248 member := s.cluster.Member(types.ID(transferee))
1249 if member == nil || member.IsLearner {
1250 return errors.ErrBadLeaderTransferee
1251 }
1252
1253 now := time.Now()
1254 interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1255
1256 lg := s.Logger()
1257 lg.Info(
1258 "leadership transfer starting",
1259 zap.String("local-member-id", s.MemberID().String()),
1260 zap.String("current-leader-member-id", types.ID(lead).String()),
1261 zap.String("transferee-member-id", types.ID(transferee).String()),
1262 )
1263
1264 s.r.TransferLeadership(ctx, lead, transferee)
1265 for s.Lead() != transferee {
1266 select {
1267 case <-ctx.Done(): // time out
1268 return errors.ErrTimeoutLeaderTransfer
1269 case <-time.After(interval):
1270 }
1271 }
1272
1273 // TODO: drain all requests, or drop all messages to the old leader
1274 lg.Info(
1275 "leadership transfer finished",
1276 zap.String("local-member-id", s.MemberID().String()),
1277 zap.String("old-leader-member-id", types.ID(lead).String()),
1278 zap.String("new-leader-member-id", types.ID(transferee).String()),
1279 zap.Duration("took", time.Since(now)),
1280 )
1281 return nil
1282}
1283
1284// TryTransferLeadershipOnShutdown transfers the leader to the chosen transferee. It is only used in server graceful shutdown.
1285func (s *EtcdServer) TryTransferLeadershipOnShutdown() error {
1286 lg := s.Logger()
1287 if !s.isLeader() {
1288 lg.Info(
1289 "skipped leadership transfer; local server is not leader",
1290 zap.String("local-member-id", s.MemberID().String()),
1291 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1292 )
1293 return nil
1294 }
1295
1296 if !s.hasMultipleVotingMembers() {
1297 lg.Info(
1298 "skipped leadership transfer for single voting member cluster",
1299 zap.String("local-member-id", s.MemberID().String()),
1300 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1301 )
1302 return nil
1303 }
1304
1305 transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
1306 if !ok {
1307 return errors.ErrUnhealthy
1308 }
1309
1310 tm := s.Cfg.ReqTimeout()
1311 ctx, cancel := context.WithTimeout(s.ctx, tm)
1312 err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1313 cancel()
1314 return err
1315}
1316
1317// HardStop stops the server without coordination with other members in the cluster.
1318func (s *EtcdServer) HardStop() {
1319 select {
1320 case s.stop <- struct{}{}:
1321 case <-s.done:
1322 return
1323 }
1324 <-s.done
1325}
1326
1327// Stop stops the server gracefully, and shuts down the running goroutine.
1328// Stop should be called after a Start(s), otherwise it will block forever.
1329// When stopping leader, Stop transfers its leadership to one of its peers
1330// before stopping the server.
1331// Stop terminates the Server and performs any necessary finalization.
1332// Do and Process cannot be called after Stop has been invoked.
1333func (s *EtcdServer) Stop() {
1334 lg := s.Logger()
1335 if err := s.TryTransferLeadershipOnShutdown(); err != nil {
1336 lg.Warn("leadership transfer failed", zap.String("local-member-id", s.MemberID().String()), zap.Error(err))
1337 }
1338 s.HardStop()
1339}
1340
1341// ReadyNotify returns a channel that will be closed when the server
1342// is ready to serve client requests
1343func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1344
1345func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1346 select {
1347 case <-time.After(d):
1348 case <-s.done:
1349 }
1350 select {
1351 case s.errorc <- err:
1352 default:
1353 }
1354}
1355
1356// StopNotify returns a channel that receives an empty struct
1357// when the server is stopped.
1358func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1359
1360// StoppingNotify returns a channel that receives an empty struct
1361// when the server is being stopped.
1362func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
1363
1364func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1365 if s.authStore == nil {
1366 // In the context of ordinary etcd process, s.authStore will never be nil.
1367 // This branch is for handling cases in server_test.go
1368 return nil
1369 }
1370
1371 // Note that this permission check is done in the API layer,
1372 // so TOCTOU problem can be caused potentially in a schedule like this:
1373 // update membership with user A -> revoke root role of A -> toApply membership change
1374 // in the state machine layer
1375 // However, both of membership change and role management requires the root privilege.
1376 // So careful operation by admins can prevent the problem.
1377 authInfo, err := s.AuthInfoFromCtx(ctx)
1378 if err != nil {
1379 return err
1380 }
1381
1382 return s.AuthStore().IsAdminPermitted(authInfo)
1383}
1384
1385func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1386 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1387 return nil, err
1388 }
1389
1390 // TODO: move Member to protobuf type
1391 b, err := json.Marshal(memb)
1392 if err != nil {
1393 return nil, err
1394 }
1395
1396 // by default StrictReconfigCheck is enabled; reject new members if unhealthy.
1397 if err := s.mayAddMember(memb); err != nil {
1398 return nil, err
1399 }
1400
1401 cc := raftpb.ConfChange{
1402 Type: raftpb.ConfChangeAddNode,
1403 NodeID: uint64(memb.ID),
1404 Context: b,
1405 }
1406
1407 if memb.IsLearner {
1408 cc.Type = raftpb.ConfChangeAddLearnerNode
1409 }
1410
1411 return s.configure(ctx, cc)
1412}
1413
1414func (s *EtcdServer) mayAddMember(memb membership.Member) error {
1415 lg := s.Logger()
1416 if !s.Cfg.StrictReconfigCheck {
1417 return nil
1418 }
1419
1420 // protect quorum when adding voting member
1421 if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
1422 lg.Warn(
1423 "rejecting member add request; not enough healthy members",
1424 zap.String("local-member-id", s.MemberID().String()),
1425 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1426 zap.Error(errors.ErrNotEnoughStartedMembers),
1427 )
1428 return errors.ErrNotEnoughStartedMembers
1429 }
1430
1431 if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), s.cluster.VotingMembers()) {
1432 lg.Warn(
1433 "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
1434 zap.String("local-member-id", s.MemberID().String()),
1435 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1436 zap.Error(errors.ErrUnhealthy),
1437 )
1438 return errors.ErrUnhealthy
1439 }
1440
1441 return nil
1442}
1443
1444func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1445 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1446 return nil, err
1447 }
1448
1449 // by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
1450 if err := s.mayRemoveMember(types.ID(id)); err != nil {
1451 return nil, err
1452 }
1453
1454 cc := raftpb.ConfChange{
1455 Type: raftpb.ConfChangeRemoveNode,
1456 NodeID: id,
1457 }
1458 return s.configure(ctx, cc)
1459}
1460
1461// PromoteMember promotes a learner node to a voting node.
1462func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1463 // only raft leader has information on whether the to-be-promoted learner node is ready. If promoteMember call
1464 // fails with ErrNotLeader, forward the request to leader node via HTTP. If promoteMember call fails with error
1465 // other than ErrNotLeader, return the error.
1466 resp, err := s.promoteMember(ctx, id)
1467 if err == nil {
1468 learnerPromoteSucceed.Inc()
1469 return resp, nil
1470 }
1471 if !errorspkg.Is(err, errors.ErrNotLeader) {
1472 learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
1473 return resp, err
1474 }
1475
1476 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
1477 defer cancel()
1478 // forward to leader
1479 for cctx.Err() == nil {
1480 leader, err := s.waitLeader(cctx)
1481 if err != nil {
1482 return nil, err
1483 }
1484 for _, url := range leader.PeerURLs {
1485 resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
1486 if err == nil {
1487 return resp, nil
1488 }
1489 // If member promotion failed, return early. Otherwise keep retry.
1490 if errorspkg.Is(err, errors.ErrLearnerNotReady) || errorspkg.Is(err, membership.ErrIDNotFound) || errorspkg.Is(err, membership.ErrMemberNotLearner) {
1491 return nil, err
1492 }
1493 }
1494 }
1495
1496 if errorspkg.Is(cctx.Err(), context.DeadlineExceeded) {
1497 return nil, errors.ErrTimeout
1498 }
1499 return nil, errors.ErrCanceled
1500}
1501
1502// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
1503// request to raft.
1504// The function returns ErrNotLeader if the local node is not raft leader (therefore does not have
1505// enough information to determine if the learner node is ready), returns ErrLearnerNotReady if the
1506// local node is leader (therefore has enough information) but decided the learner node is not ready
1507// to be promoted.
1508func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1509 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1510 return nil, err
1511 }
1512
1513 // check if we can promote this learner.
1514 if err := s.mayPromoteMember(types.ID(id)); err != nil {
1515 return nil, err
1516 }
1517
1518 // build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
1519 promoteChangeContext := membership.ConfigChangeContext{
1520 Member: membership.Member{
1521 ID: types.ID(id),
1522 },
1523 IsPromote: true,
1524 }
1525
1526 b, err := json.Marshal(promoteChangeContext)
1527 if err != nil {
1528 return nil, err
1529 }
1530
1531 cc := raftpb.ConfChange{
1532 Type: raftpb.ConfChangeAddNode,
1533 NodeID: id,
1534 Context: b,
1535 }
1536
1537 return s.configure(ctx, cc)
1538}
1539
1540func (s *EtcdServer) mayPromoteMember(id types.ID) error {
1541 lg := s.Logger()
1542 if err := s.isLearnerReady(lg, uint64(id)); err != nil {
1543 return err
1544 }
1545
1546 if !s.Cfg.StrictReconfigCheck {
1547 return nil
1548 }
1549 if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
1550 lg.Warn(
1551 "rejecting member promote request; not enough healthy members",
1552 zap.String("local-member-id", s.MemberID().String()),
1553 zap.String("requested-member-remove-id", id.String()),
1554 zap.Error(errors.ErrNotEnoughStartedMembers),
1555 )
1556 return errors.ErrNotEnoughStartedMembers
1557 }
1558
1559 return nil
1560}
1561
1562// check whether the learner catches up with leader or not.
1563// Note: it will return nil if member is not found in cluster or if member is not learner.
1564// These two conditions will be checked before toApply phase later.
1565func (s *EtcdServer) isLearnerReady(lg *zap.Logger, id uint64) error {
1566 if err := s.waitAppliedIndex(); err != nil {
1567 return err
1568 }
1569
1570 rs := s.raftStatus()
1571
1572 // leader's raftStatus.Progress is not nil
1573 if rs.Progress == nil {
1574 return errors.ErrNotLeader
1575 }
1576
1577 var learnerMatch uint64
1578 isFound := false
1579 leaderID := rs.ID
1580 for memberID, progress := range rs.Progress {
1581 if id == memberID {
1582 // check its status
1583 learnerMatch = progress.Match
1584 isFound = true
1585 break
1586 }
1587 }
1588
1589 // We should return an error in API directly, to avoid the request
1590 // being unnecessarily delivered to raft.
1591 if !isFound {
1592 return membership.ErrIDNotFound
1593 }
1594
1595 leaderMatch := rs.Progress[leaderID].Match
1596
1597 learnerReadyPercent := float64(learnerMatch) / float64(leaderMatch)
1598
1599 // the learner's Match not caught up with leader yet
1600 if learnerReadyPercent < readyPercentThreshold {
1601 lg.Error(
1602 "rejecting promote learner: learner is not ready",
1603 zap.Float64("learner-ready-percent", learnerReadyPercent),
1604 zap.Float64("ready-percent-threshold", readyPercentThreshold),
1605 )
1606 return errors.ErrLearnerNotReady
1607 }
1608
1609 return nil
1610}
1611
1612func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1613 if !s.Cfg.StrictReconfigCheck {
1614 return nil
1615 }
1616
1617 lg := s.Logger()
1618 member := s.cluster.Member(id)
1619 // no need to check quorum when removing non-voting member
1620 if member != nil && member.IsLearner {
1621 return nil
1622 }
1623
1624 if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
1625 lg.Warn(
1626 "rejecting member remove request; not enough healthy members",
1627 zap.String("local-member-id", s.MemberID().String()),
1628 zap.String("requested-member-remove-id", id.String()),
1629 zap.Error(errors.ErrNotEnoughStartedMembers),
1630 )
1631 return errors.ErrNotEnoughStartedMembers
1632 }
1633
1634 // downed member is safe to remove since it's not part of the active quorum
1635 if t := s.r.transport.ActiveSince(id); id != s.MemberID() && t.IsZero() {
1636 return nil
1637 }
1638
1639 // protect quorum if some members are down
1640 m := s.cluster.VotingMembers()
1641 active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), m)
1642 if (active - 1) < 1+((len(m)-1)/2) {
1643 lg.Warn(
1644 "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
1645 zap.String("local-member-id", s.MemberID().String()),
1646 zap.String("requested-member-remove", id.String()),
1647 zap.Int("active-peers", active),
1648 zap.Error(errors.ErrUnhealthy),
1649 )
1650 return errors.ErrUnhealthy
1651 }
1652
1653 return nil
1654}
1655
1656func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1657 b, merr := json.Marshal(memb)
1658 if merr != nil {
1659 return nil, merr
1660 }
1661
1662 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1663 return nil, err
1664 }
1665 cc := raftpb.ConfChange{
1666 Type: raftpb.ConfChangeUpdateNode,
1667 NodeID: uint64(memb.ID),
1668 Context: b,
1669 }
1670 return s.configure(ctx, cc)
1671}
1672
1673func (s *EtcdServer) setCommittedIndex(v uint64) {
1674 atomic.StoreUint64(&s.committedIndex, v)
1675}
1676
1677func (s *EtcdServer) getCommittedIndex() uint64 {
1678 return atomic.LoadUint64(&s.committedIndex)
1679}
1680
1681func (s *EtcdServer) setAppliedIndex(v uint64) {
1682 atomic.StoreUint64(&s.appliedIndex, v)
1683}
1684
1685func (s *EtcdServer) getAppliedIndex() uint64 {
1686 return atomic.LoadUint64(&s.appliedIndex)
1687}
1688
1689func (s *EtcdServer) setTerm(v uint64) {
1690 atomic.StoreUint64(&s.term, v)
1691}
1692
1693func (s *EtcdServer) getTerm() uint64 {
1694 return atomic.LoadUint64(&s.term)
1695}
1696
1697func (s *EtcdServer) setLead(v uint64) {
1698 atomic.StoreUint64(&s.lead, v)
1699}
1700
1701func (s *EtcdServer) getLead() uint64 {
1702 return atomic.LoadUint64(&s.lead)
1703}
1704
1705func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
1706 return s.leaderChanged.Receive()
1707}
1708
1709// FirstCommitInTermNotify returns channel that will be unlocked on first
1710// entry committed in new term, which is necessary for new leader to answer
1711// read-only requests (leader is not able to respond any read-only requests
1712// as long as linearizable semantic is required)
1713func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
1714 return s.firstCommitInTerm.Receive()
1715}
1716
1717// MemberId returns the ID of the local member.
1718// Deprecated: Please use (*EtcdServer) MemberID instead.
1719//
1720//revive:disable:var-naming
1721func (s *EtcdServer) MemberId() types.ID { return s.MemberID() }
1722
1723//revive:enable:var-naming
1724
1725func (s *EtcdServer) MemberID() types.ID { return s.memberID }
1726
1727func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
1728
1729func (s *EtcdServer) Lead() uint64 { return s.getLead() }
1730
1731func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
1732
1733func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
1734
1735func (s *EtcdServer) Term() uint64 { return s.getTerm() }
1736
1737type confChangeResponse struct {
1738 membs []*membership.Member
1739 raftAdvanceC <-chan struct{}
1740 err error
1741}
1742
1743// configure sends a configuration change through consensus and
1744// then waits for it to be applied to the server. It
1745// will block until the change is performed or there is an error.
1746func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1747 lg := s.Logger()
1748 cc.ID = s.reqIDGen.Next()
1749 ch := s.w.Register(cc.ID)
1750
1751 start := time.Now()
1752 if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1753 s.w.Trigger(cc.ID, nil)
1754 return nil, err
1755 }
1756
1757 select {
1758 case x := <-ch:
1759 if x == nil {
1760 lg.Panic("failed to configure")
1761 }
1762 resp := x.(*confChangeResponse)
1763 // etcdserver need to ensure the raft has already been notified
1764 // or advanced before it responds to the client. Otherwise, the
1765 // following config change request may be rejected.
1766 // See https://github.com/etcd-io/etcd/issues/15528.
1767 <-resp.raftAdvanceC
1768 lg.Info(
1769 "applied a configuration change through raft",
1770 zap.String("local-member-id", s.MemberID().String()),
1771 zap.String("raft-conf-change", cc.Type.String()),
1772 zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
1773 )
1774 return resp.membs, resp.err
1775
1776 case <-ctx.Done():
1777 s.w.Trigger(cc.ID, nil) // GC wait
1778 return nil, s.parseProposeCtxErr(ctx.Err(), start)
1779
1780 case <-s.stopping:
1781 return nil, errors.ErrStopped
1782 }
1783}
1784
1785// publishV3 registers server information into the cluster using v3 request. The
1786// information is the JSON representation of this server's member struct, updated
1787// with the static clientURLs of the server.
1788// The function keeps attempting to register until it succeeds,
1789// or its server is stopped.
1790func (s *EtcdServer) publishV3(timeout time.Duration) {
1791 req := &membershippb.ClusterMemberAttrSetRequest{
1792 Member_ID: uint64(s.MemberID()),
1793 MemberAttributes: &membershippb.Attributes{
1794 Name: s.attributes.Name,
1795 ClientUrls: s.attributes.ClientURLs,
1796 },
1797 }
1798 // gofail: var beforePublishing struct{}
1799 lg := s.Logger()
1800 for {
1801 select {
1802 case <-s.stopping:
1803 lg.Warn(
1804 "stopped publish because server is stopping",
1805 zap.String("local-member-id", s.MemberID().String()),
1806 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1807 zap.Duration("publish-timeout", timeout),
1808 )
1809 return
1810
1811 default:
1812 }
1813
1814 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1815 _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
1816 cancel()
1817 switch err {
1818 case nil:
1819 close(s.readych)
1820 lg.Info(
1821 "published local member to cluster through raft",
1822 zap.String("local-member-id", s.MemberID().String()),
1823 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1824 zap.String("cluster-id", s.cluster.ID().String()),
1825 zap.Duration("publish-timeout", timeout),
1826 )
1827 return
1828
1829 default:
1830 lg.Warn(
1831 "failed to publish local member to cluster through raft",
1832 zap.String("local-member-id", s.MemberID().String()),
1833 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1834 zap.Duration("publish-timeout", timeout),
1835 zap.Error(err),
1836 )
1837 }
1838 }
1839}
1840
1841func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
1842 atomic.AddInt64(&s.inflightSnapshots, 1)
1843
1844 lg := s.Logger()
1845 fields := []zap.Field{
1846 zap.String("from", s.MemberID().String()),
1847 zap.String("to", types.ID(merged.To).String()),
1848 zap.Int64("bytes", merged.TotalSize),
1849 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
1850 }
1851
1852 now := time.Now()
1853 s.r.transport.SendSnapshot(merged)
1854 lg.Info("sending merged snapshot", fields...)
1855
1856 s.GoAttach(func() {
1857 select {
1858 case ok := <-merged.CloseNotify():
1859 // delay releasing inflight snapshot for another 30 seconds to
1860 // block log compaction.
1861 // If the follower still fails to catch up, it is probably just too slow
1862 // to catch up. We cannot avoid the snapshot cycle anyway.
1863 if ok {
1864 select {
1865 case <-time.After(releaseDelayAfterSnapshot):
1866 case <-s.stopping:
1867 }
1868 }
1869
1870 atomic.AddInt64(&s.inflightSnapshots, -1)
1871
1872 lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
1873
1874 case <-s.stopping:
1875 lg.Warn("canceled sending merged snapshot; server stopping", fields...)
1876 return
1877 }
1878 })
1879}
1880
1881// toApply takes entries received from Raft (after it has been committed) and
1882// applies them to the current state of the EtcdServer.
1883// The given entries should not be empty.
1884func (s *EtcdServer) apply(
1885 es []raftpb.Entry,
1886 confState *raftpb.ConfState,
1887 raftAdvancedC <-chan struct{},
1888) (appliedt uint64, appliedi uint64, shouldStop bool) {
1889 s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
1890 for i := range es {
1891 e := es[i]
1892 index := s.consistIndex.ConsistentIndex()
1893 s.lg.Debug("Applying entry",
1894 zap.Uint64("consistent-index", index),
1895 zap.Uint64("entry-index", e.Index),
1896 zap.Uint64("entry-term", e.Term),
1897 zap.Stringer("entry-type", e.Type))
1898
1899 // We need to toApply all WAL entries on top of v2store
1900 // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
1901 shouldApplyV3 := membership.ApplyV2storeOnly
1902 if e.Index > index {
1903 shouldApplyV3 = membership.ApplyBoth
1904 // set the consistent index of current executing entry
1905 s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
1906 }
1907 switch e.Type {
1908 case raftpb.EntryNormal:
1909 // gofail: var beforeApplyOneEntryNormal struct{}
1910 s.applyEntryNormal(&e, shouldApplyV3)
1911 s.setAppliedIndex(e.Index)
1912 s.setTerm(e.Term)
1913
1914 case raftpb.EntryConfChange:
1915 // gofail: var beforeApplyOneConfChange struct{}
1916 var cc raftpb.ConfChange
1917 pbutil.MustUnmarshal(&cc, e.Data)
1918 removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
1919 s.setAppliedIndex(e.Index)
1920 s.setTerm(e.Term)
1921 shouldStop = shouldStop || removedSelf
1922 s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err})
1923
1924 default:
1925 lg := s.Logger()
1926 lg.Panic(
1927 "unknown entry type; must be either EntryNormal or EntryConfChange",
1928 zap.String("type", e.Type.String()),
1929 )
1930 }
1931 appliedi, appliedt = e.Index, e.Term
1932 }
1933 return appliedt, appliedi, shouldStop
1934}
1935
1936// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
1937func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.ShouldApplyV3) {
1938 var ar *apply.Result
1939 if shouldApplyV3 {
1940 defer func() {
1941 // The txPostLockInsideApplyHook will not get called in some cases,
1942 // in which we should move the consistent index forward directly.
1943 newIndex := s.consistIndex.ConsistentIndex()
1944 if newIndex < e.Index {
1945 s.consistIndex.SetConsistentIndex(e.Index, e.Term)
1946 }
1947 }()
1948 }
1949
1950 // raft state machine may generate noop entry when leader confirmation.
1951 // skip it in advance to avoid some potential bug in the future
1952 if len(e.Data) == 0 {
1953 s.firstCommitInTerm.Notify()
1954
1955 // promote lessor when the local member is leader and finished
1956 // applying all entries from the last term.
1957 if s.isLeader() {
1958 s.lessor.Promote(s.Cfg.ElectionTimeout())
1959 }
1960 return
1961 }
1962
1963 var raftReq pb.InternalRaftRequest
1964 if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
1965 var r pb.Request
1966 rp := &r
1967 pbutil.MustUnmarshal(rp, e.Data)
1968 s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
1969 raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp))
1970 }
1971 s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
1972
1973 if raftReq.V2 != nil {
1974 req := (*RequestV2)(raftReq.V2)
1975 raftReq = v2ToV3Request(s.lg, req)
1976 }
1977
1978 id := raftReq.ID
1979 if id == 0 {
1980 if raftReq.Header == nil {
1981 s.lg.Panic("applyEntryNormal, could not find a header")
1982 }
1983 id = raftReq.Header.ID
1984 }
1985
1986 needResult := s.w.IsRegistered(id)
1987 if needResult || !noSideEffect(&raftReq) {
1988 if !needResult && raftReq.Txn != nil {
1989 removeNeedlessRangeReqs(raftReq.Txn)
1990 }
1991 ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
1992 }
1993
1994 // do not re-toApply applied entries.
1995 if !shouldApplyV3 {
1996 return
1997 }
1998
1999 if ar == nil {
2000 return
2001 }
2002
2003 if !errorspkg.Is(ar.Err, errors.ErrNoSpace) || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
2004 s.w.Trigger(id, ar)
2005 return
2006 }
2007
2008 lg := s.Logger()
2009 lg.Warn(
2010 "message exceeded backend quota; raising alarm",
2011 zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
2012 zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
2013 zap.Error(ar.Err),
2014 )
2015
2016 s.GoAttach(func() {
2017 a := &pb.AlarmRequest{
2018 MemberID: uint64(s.MemberID()),
2019 Action: pb.AlarmRequest_ACTIVATE,
2020 Alarm: pb.AlarmType_NOSPACE,
2021 }
2022 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
2023 s.w.Trigger(id, ar)
2024 })
2025}
2026
2027func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result {
2028 if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil && r.DowngradeVersionTest == nil {
2029 if !shouldApplyV3 {
2030 return nil
2031 }
2032 return s.uberApply.Apply(r)
2033 }
2034 membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s)
2035 op := "unknown"
2036 defer func(start time.Time) {
2037 txn.ApplySecObserve("v3", op, true, time.Since(start))
2038 txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil)
2039 }(time.Now())
2040 switch {
2041 case r.ClusterVersionSet != nil:
2042 op = "ClusterVersionSet" // Implemented in 3.5.x
2043 membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
2044 return &apply.Result{}
2045 case r.ClusterMemberAttrSet != nil:
2046 op = "ClusterMemberAttrSet" // Implemented in 3.5.x
2047 membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
2048 case r.DowngradeInfoSet != nil:
2049 op = "DowngradeInfoSet" // Implemented in 3.5.x
2050 membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
2051 case r.DowngradeVersionTest != nil:
2052 op = "DowngradeVersionTest" // Implemented in 3.6 for test only
2053 // do nothing, we are just to ensure etcdserver don't panic in case
2054 // users(test cases) intentionally inject DowngradeVersionTestRequest
2055 // into the WAL files.
2056 default:
2057 s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
2058 return nil
2059 }
2060 return &apply.Result{}
2061}
2062
2063func noSideEffect(r *pb.InternalRaftRequest) bool {
2064 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
2065}
2066
2067func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
2068 f := func(ops []*pb.RequestOp) []*pb.RequestOp {
2069 j := 0
2070 for i := 0; i < len(ops); i++ {
2071 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
2072 continue
2073 }
2074 ops[j] = ops[i]
2075 j++
2076 }
2077
2078 return ops[:j]
2079 }
2080
2081 txn.Success = f(txn.Success)
2082 txn.Failure = f(txn.Failure)
2083}
2084
2085// applyConfChange applies a ConfChange to the server. It is only
2086// invoked with a ConfChange that has already passed through Raft
2087func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
2088 lg := s.Logger()
2089 if err := s.cluster.ValidateConfigurationChange(cc, shouldApplyV3); err != nil {
2090 lg.Error("Validation on configuration change failed", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
2091 cc.NodeID = raft.None
2092 s.r.ApplyConfChange(cc)
2093
2094 // The txPostLock callback will not get called in this case,
2095 // so we should set the consistent index directly.
2096 if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
2097 applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
2098 s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
2099 }
2100 return false, err
2101 }
2102
2103 *confState = *s.r.ApplyConfChange(cc)
2104 s.beHooks.SetConfState(confState)
2105 switch cc.Type {
2106 case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
2107 confChangeContext := new(membership.ConfigChangeContext)
2108 if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
2109 lg.Panic("failed to unmarshal member", zap.Error(err))
2110 }
2111 if cc.NodeID != uint64(confChangeContext.Member.ID) {
2112 lg.Panic(
2113 "got different member ID",
2114 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2115 zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
2116 )
2117 }
2118 if confChangeContext.IsPromote {
2119 s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
2120 } else {
2121 s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
2122
2123 if confChangeContext.Member.ID != s.MemberID() {
2124 s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
2125 }
2126 }
2127
2128 case raftpb.ConfChangeRemoveNode:
2129 id := types.ID(cc.NodeID)
2130 s.cluster.RemoveMember(id, shouldApplyV3)
2131 if id == s.MemberID() {
2132 return true, nil
2133 }
2134 s.r.transport.RemovePeer(id)
2135
2136 case raftpb.ConfChangeUpdateNode:
2137 m := new(membership.Member)
2138 if err := json.Unmarshal(cc.Context, m); err != nil {
2139 lg.Panic("failed to unmarshal member", zap.Error(err))
2140 }
2141 if cc.NodeID != uint64(m.ID) {
2142 lg.Panic(
2143 "got different member ID",
2144 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2145 zap.String("member-id-from-message", m.ID.String()),
2146 )
2147 }
2148 s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
2149 if m.ID != s.MemberID() {
2150 s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
2151 }
2152 }
2153
2154 verify.Verify(func() {
2155 s.verifyV3StoreInSyncWithV2Store(shouldApplyV3)
2156 })
2157
2158 return false, nil
2159}
2160
2161func (s *EtcdServer) verifyV3StoreInSyncWithV2Store(shouldApplyV3 membership.ShouldApplyV3) {
2162 // If shouldApplyV3 == false, then it means v2store hasn't caught up with v3store.
2163 if !shouldApplyV3 {
2164 return
2165 }
2166
2167 // clean up the Attributes, and we only care about the RaftAttributes
2168 cleanAttributesFunc := func(members map[types.ID]*membership.Member) map[types.ID]*membership.Member {
2169 processedMembers := make(map[types.ID]*membership.Member)
2170 for id, m := range members {
2171 clonedMember := m.Clone()
2172 clonedMember.Attributes = membership.Attributes{}
2173 processedMembers[id] = clonedMember
2174 }
2175
2176 return processedMembers
2177 }
2178
2179 v2Members, _ := s.cluster.MembersFromStore()
2180 v3Members, _ := s.cluster.MembersFromBackend()
2181
2182 processedV2Members := cleanAttributesFunc(v2Members)
2183 processedV3Members := cleanAttributesFunc(v3Members)
2184
2185 if match := reflect.DeepEqual(processedV2Members, processedV3Members); !match {
2186 v2Data, v2Err := json.Marshal(processedV2Members)
2187 v3Data, v3Err := json.Marshal(processedV3Members)
2188
2189 if v2Err != nil || v3Err != nil {
2190 panic("members in v2store doesn't match v3store")
2191 }
2192 panic(fmt.Sprintf("members in v2store doesn't match v3store, v2store: %s, v3store: %s", string(v2Data), string(v3Data)))
2193 }
2194}
2195
2196// TODO: non-blocking snapshot
2197func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
2198 lg := s.Logger()
2199 d := GetMembershipInfoInV2Format(lg, s.cluster)
2200 if toDisk {
2201 s.Logger().Info(
2202 "triggering snapshot",
2203 zap.String("local-member-id", s.MemberID().String()),
2204 zap.Uint64("local-member-applied-index", ep.appliedi),
2205 zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
2206 zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
2207 zap.Bool("snapshot-forced", s.forceDiskSnapshot),
2208 )
2209 s.forceDiskSnapshot = false
2210 // commit kv to write metadata (for example: consistent index) to disk.
2211 //
2212 // This guarantees that Backend's consistent_index is >= index of last snapshot.
2213 //
2214 // KV().commit() updates the consistent index in backend.
2215 // All operations that update consistent index must be called sequentially
2216 // from applyAll function.
2217 // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
2218 // the go routine created below.
2219 s.KV().Commit()
2220 }
2221
2222 // For backward compatibility, generate v2 snapshot from v3 state.
2223 snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
2224 if err != nil {
2225 // the snapshot was done asynchronously with the progress of raft.
2226 // raft might have already got a newer snapshot.
2227 if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
2228 return
2229 }
2230 lg.Panic("failed to create snapshot", zap.Error(err))
2231 }
2232 ep.memorySnapshotIndex = ep.appliedi
2233
2234 verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
2235
2236 if toDisk {
2237 // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
2238 if err = s.r.storage.SaveSnap(snap); err != nil {
2239 lg.Panic("failed to save snapshot", zap.Error(err))
2240 }
2241 ep.diskSnapshotIndex = ep.appliedi
2242 if err = s.r.storage.Release(snap); err != nil {
2243 lg.Panic("failed to release wal", zap.Error(err))
2244 }
2245
2246 lg.Info(
2247 "saved snapshot to disk",
2248 zap.Uint64("snapshot-index", snap.Metadata.Index),
2249 )
2250 }
2251}
2252
2253func (s *EtcdServer) compactRaftLog(snapi uint64) {
2254 lg := s.Logger()
2255
2256 // When sending a snapshot, etcd will pause compaction.
2257 // After receives a snapshot, the slow follower needs to get all the entries right after
2258 // the snapshot sent to catch up. If we do not pause compaction, the log entries right after
2259 // the snapshot sent might already be compacted. It happens when the snapshot takes long time
2260 // to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
2261 if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
2262 lg.Info("skip compaction since there is an inflight snapshot")
2263 return
2264 }
2265
2266 // keep some in memory log entries for slow followers.
2267 compacti := uint64(1)
2268 if snapi > s.Cfg.SnapshotCatchUpEntries {
2269 compacti = snapi - s.Cfg.SnapshotCatchUpEntries
2270 }
2271 err := s.r.raftStorage.Compact(compacti)
2272 if err != nil {
2273 // the compaction was done asynchronously with the progress of raft.
2274 // raft log might already been compact.
2275 if errorspkg.Is(err, raft.ErrCompacted) {
2276 return
2277 }
2278 lg.Panic("failed to compact", zap.Error(err))
2279 }
2280 lg.Debug(
2281 "compacted Raft logs",
2282 zap.Uint64("compact-index", compacti),
2283 )
2284}
2285
2286// CutPeer drops messages to the specified peer.
2287func (s *EtcdServer) CutPeer(id types.ID) {
2288 tr, ok := s.r.transport.(*rafthttp.Transport)
2289 if ok {
2290 tr.CutPeer(id)
2291 }
2292}
2293
2294// MendPeer recovers the message dropping behavior of the given peer.
2295func (s *EtcdServer) MendPeer(id types.ID) {
2296 tr, ok := s.r.transport.(*rafthttp.Transport)
2297 if ok {
2298 tr.MendPeer(id)
2299 }
2300}
2301
2302func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
2303
2304func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
2305
2306func (s *EtcdServer) ClusterVersion() *semver.Version {
2307 if s.cluster == nil {
2308 return nil
2309 }
2310 return s.cluster.Version()
2311}
2312
2313func (s *EtcdServer) StorageVersion() *semver.Version {
2314 // `applySnapshot` sets a new backend instance, so we need to acquire the bemu lock.
2315 s.bemu.RLock()
2316 defer s.bemu.RUnlock()
2317
2318 v, err := schema.DetectSchemaVersion(s.lg, s.be.ReadTx())
2319 if err != nil {
2320 s.lg.Warn("Failed to detect schema version", zap.Error(err))
2321 return nil
2322 }
2323 return &v
2324}
2325
2326// monitorClusterVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
2327func (s *EtcdServer) monitorClusterVersions() {
2328 lg := s.Logger()
2329 monitor := serverversion.NewMonitor(lg, NewServerVersionAdapter(s))
2330 for {
2331 select {
2332 case <-s.firstCommitInTerm.Receive():
2333 case <-time.After(monitorVersionInterval):
2334 case <-s.stopping:
2335 lg.Info("server has stopped; stopping cluster version's monitor")
2336 return
2337 }
2338
2339 if s.Leader() != s.MemberID() {
2340 continue
2341 }
2342 err := monitor.UpdateClusterVersionIfNeeded()
2343 if err != nil {
2344 s.lg.Error("Failed to monitor cluster version", zap.Error(err))
2345 }
2346 }
2347}
2348
2349// monitorStorageVersion every monitorVersionInterval updates storage version if needed.
2350func (s *EtcdServer) monitorStorageVersion() {
2351 lg := s.Logger()
2352 monitor := serverversion.NewMonitor(lg, NewServerVersionAdapter(s))
2353 for {
2354 select {
2355 case <-time.After(monitorVersionInterval):
2356 case <-s.clusterVersionChanged.Receive():
2357 case <-s.stopping:
2358 lg.Info("server has stopped; stopping storage version's monitor")
2359 return
2360 }
2361 monitor.UpdateStorageVersionIfNeeded()
2362 }
2363}
2364
2365func (s *EtcdServer) monitorKVHash() {
2366 t := s.Cfg.CorruptCheckTime
2367 if t == 0 {
2368 return
2369 }
2370 checkTicker := time.NewTicker(t)
2371 defer checkTicker.Stop()
2372
2373 lg := s.Logger()
2374 lg.Info(
2375 "enabled corruption checking",
2376 zap.String("local-member-id", s.MemberID().String()),
2377 zap.Duration("interval", t),
2378 )
2379 for {
2380 select {
2381 case <-s.stopping:
2382 lg.Info("server has stopped; stopping kv hash's monitor")
2383 return
2384 case <-checkTicker.C:
2385 }
2386 backend.VerifyBackendConsistency(s.be, lg, false, schema.AllBuckets...)
2387 if !s.isLeader() {
2388 continue
2389 }
2390 if err := s.corruptionChecker.PeriodicCheck(); err != nil {
2391 lg.Warn("failed to check hash KV", zap.Error(err))
2392 }
2393 }
2394}
2395
2396func (s *EtcdServer) monitorCompactHash() {
2397 if !s.FeatureEnabled(features.CompactHashCheck) {
2398 return
2399 }
2400 t := s.Cfg.CompactHashCheckTime
2401 for {
2402 select {
2403 case <-time.After(t):
2404 case <-s.stopping:
2405 lg := s.Logger()
2406 lg.Info("server has stopped; stopping compact hash's monitor")
2407 return
2408 }
2409 if !s.isLeader() {
2410 continue
2411 }
2412 s.corruptionChecker.CompactHashCheck()
2413 }
2414}
2415
2416func (s *EtcdServer) updateClusterVersionV3(ver string) {
2417 lg := s.Logger()
2418
2419 if s.cluster.Version() == nil {
2420 lg.Info(
2421 "setting up initial cluster version using v3 API",
2422 zap.String("cluster-version", version.Cluster(ver)),
2423 )
2424 } else {
2425 lg.Info(
2426 "updating cluster version using v3 API",
2427 zap.String("from", version.Cluster(s.cluster.Version().String())),
2428 zap.String("to", version.Cluster(ver)),
2429 )
2430 }
2431
2432 req := membershippb.ClusterVersionSetRequest{Ver: ver}
2433
2434 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
2435 _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
2436 cancel()
2437
2438 switch {
2439 case errorspkg.Is(err, nil):
2440 lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
2441 return
2442
2443 case errorspkg.Is(err, errors.ErrStopped):
2444 lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
2445 return
2446
2447 default:
2448 lg.Warn("failed to update cluster version", zap.Error(err))
2449 }
2450}
2451
2452// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed.
2453func (s *EtcdServer) monitorDowngrade() {
2454 monitor := serverversion.NewMonitor(s.Logger(), NewServerVersionAdapter(s))
2455 t := s.Cfg.DowngradeCheckTime
2456 if t == 0 {
2457 return
2458 }
2459 for {
2460 select {
2461 case <-time.After(t):
2462 case <-s.stopping:
2463 return
2464 }
2465
2466 if !s.isLeader() {
2467 continue
2468 }
2469 monitor.CancelDowngradeIfNeeded()
2470 }
2471}
2472
2473func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
2474 switch {
2475 case errorspkg.Is(err, context.Canceled):
2476 return errors.ErrCanceled
2477
2478 case errorspkg.Is(err, context.DeadlineExceeded):
2479 s.leadTimeMu.RLock()
2480 curLeadElected := s.leadElectedTime
2481 s.leadTimeMu.RUnlock()
2482 prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
2483 if start.After(prevLeadLost) && start.Before(curLeadElected) {
2484 return errors.ErrTimeoutDueToLeaderFail
2485 }
2486 lead := types.ID(s.getLead())
2487 switch lead {
2488 case types.ID(raft.None):
2489 // TODO: return error to specify it happens because the cluster does not have leader now
2490 case s.MemberID():
2491 if !isConnectedToQuorumSince(s.r.transport, start, s.MemberID(), s.cluster.Members()) {
2492 return errors.ErrTimeoutDueToConnectionLost
2493 }
2494 default:
2495 if !isConnectedSince(s.r.transport, start, lead) {
2496 return errors.ErrTimeoutDueToConnectionLost
2497 }
2498 }
2499 return errors.ErrTimeout
2500
2501 default:
2502 return err
2503 }
2504}
2505
2506func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
2507func (s *EtcdServer) Backend() backend.Backend {
2508 s.bemu.RLock()
2509 defer s.bemu.RUnlock()
2510 return s.be
2511}
2512
2513func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
2514
2515func (s *EtcdServer) restoreAlarms() error {
2516 as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be))
2517 if err != nil {
2518 return err
2519 }
2520 s.alarmStore = as
2521 return nil
2522}
2523
2524// GoAttach creates a goroutine on a given function and tracks it using
2525// the etcdserver waitgroup.
2526// The passed function should interrupt on s.StoppingNotify().
2527func (s *EtcdServer) GoAttach(f func()) {
2528 s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
2529 defer s.wgMu.RUnlock()
2530 select {
2531 case <-s.stopping:
2532 lg := s.Logger()
2533 lg.Warn("server has stopped; skipping GoAttach")
2534 return
2535 default:
2536 }
2537
2538 // now safe to add since waitgroup wait has not started yet
2539 s.wg.Add(1)
2540 go func() {
2541 defer s.wg.Done()
2542 f()
2543 }()
2544}
2545
2546func (s *EtcdServer) Alarms() []*pb.AlarmMember {
2547 return s.alarmStore.Get(pb.AlarmType_NONE)
2548}
2549
2550// IsLearner returns if the local member is raft learner
2551func (s *EtcdServer) IsLearner() bool {
2552 return s.cluster.IsLocalMemberLearner()
2553}
2554
2555// IsMemberExist returns if the member with the given id exists in cluster.
2556func (s *EtcdServer) IsMemberExist(id types.ID) bool {
2557 return s.cluster.IsMemberExist(id)
2558}
2559
2560// raftStatus returns the raft status of this etcd node.
2561func (s *EtcdServer) raftStatus() raft.Status {
2562 return s.r.Node.Status()
2563}
2564
2565func (s *EtcdServer) Version() *serverversion.Manager {
2566 return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
2567}
2568
2569func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
2570 return func() {
2571 applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
2572 if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
2573 s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
2574 }
2575 }
2576}
2577
2578func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
2579 return s.corruptionChecker
2580}
2581
2582func addFeatureGateMetrics(fg featuregate.FeatureGate, guageVec *prometheus.GaugeVec) {
2583 for feature, featureSpec := range fg.(featuregate.MutableFeatureGate).GetAll() {
2584 var metricVal float64
2585 if fg.Enabled(feature) {
2586 metricVal = 1
2587 } else {
2588 metricVal = 0
2589 }
2590 guageVec.With(prometheus.Labels{"name": string(feature), "stage": string(featureSpec.PreRelease)}).Set(metricVal)
2591 }
2592}