[VOL-5486] Fix deprecated versions
Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/go.etcd.io/etcd/server/v3/etcdserver/server.go b/vendor/go.etcd.io/etcd/server/v3/etcdserver/server.go
new file mode 100644
index 0000000..0eb16b7
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/server/v3/etcdserver/server.go
@@ -0,0 +1,2592 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+ "context"
+ "encoding/json"
+ errorspkg "errors"
+ "expvar"
+ "fmt"
+ "math"
+ "net/http"
+ "path"
+ "reflect"
+ "regexp"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/coreos/go-semver/semver"
+ humanize "github.com/dustin/go-humanize"
+ "github.com/prometheus/client_golang/prometheus"
+ "go.uber.org/zap"
+
+ pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+ "go.etcd.io/etcd/api/v3/membershippb"
+ "go.etcd.io/etcd/api/v3/version"
+ "go.etcd.io/etcd/client/pkg/v3/fileutil"
+ "go.etcd.io/etcd/client/pkg/v3/types"
+ "go.etcd.io/etcd/client/pkg/v3/verify"
+ "go.etcd.io/etcd/pkg/v3/featuregate"
+ "go.etcd.io/etcd/pkg/v3/idutil"
+ "go.etcd.io/etcd/pkg/v3/notify"
+ "go.etcd.io/etcd/pkg/v3/pbutil"
+ "go.etcd.io/etcd/pkg/v3/runtime"
+ "go.etcd.io/etcd/pkg/v3/schedule"
+ "go.etcd.io/etcd/pkg/v3/traceutil"
+ "go.etcd.io/etcd/pkg/v3/wait"
+ "go.etcd.io/etcd/server/v3/auth"
+ "go.etcd.io/etcd/server/v3/config"
+ "go.etcd.io/etcd/server/v3/etcdserver/api"
+ httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
+ stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
+ "go.etcd.io/etcd/server/v3/etcdserver/apply"
+ "go.etcd.io/etcd/server/v3/etcdserver/cindex"
+ "go.etcd.io/etcd/server/v3/etcdserver/errors"
+ "go.etcd.io/etcd/server/v3/etcdserver/txn"
+ serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
+ "go.etcd.io/etcd/server/v3/features"
+ "go.etcd.io/etcd/server/v3/lease"
+ "go.etcd.io/etcd/server/v3/lease/leasehttp"
+ serverstorage "go.etcd.io/etcd/server/v3/storage"
+ "go.etcd.io/etcd/server/v3/storage/backend"
+ "go.etcd.io/etcd/server/v3/storage/mvcc"
+ "go.etcd.io/etcd/server/v3/storage/schema"
+ "go.etcd.io/raft/v3"
+ "go.etcd.io/raft/v3/raftpb"
+)
+
+const (
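+ // DefaultSnapshotCount is the default number of applied entries between
+ // snapshots to disk; see shouldSnapshotToDisk.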
+ DefaultSnapshotCount = 10000
+
+ // DefaultSnapshotCatchUpEntries is the number of entries to retain for a slow
+ // follower to catch up after the raft storage entries are compacted.
+ // We expect the follower to have millisecond-level latency with the leader.
+ // The max throughput is around 10K. Keeping 5K entries is enough to help a
+ // follower catch up.
+ DefaultSnapshotCatchUpEntries uint64 = 5000
+
+ StoreClusterPrefix = "/0"
+ StoreKeysPrefix = "/1"
+
+ // HealthInterval is the minimum time the cluster should be healthy
+ // before accepting add and delete member requests.
+ HealthInterval = 5 * time.Second
+
+ purgeFileInterval = 30 * time.Second
+
+ // maxInFlightMsgSnap is the maximum number of in-flight snapshot messages
+ // etcdserver allows. This number is more than enough for most clusters
+ // with 5 machines.
+ maxInFlightMsgSnap = 16
+
+ releaseDelayAfterSnapshot = 30 * time.Second
+
+ // maxPendingRevokes is the maximum number of outstanding expired lease revocations.
+ maxPendingRevokes = 16
+
+ recommendedMaxRequestBytes = 10 * 1024 * 1024
+
+ // readyPercentThreshold is a threshold used to determine
+ // whether a learner is ready for a transition into a full voting member or not.
+ readyPercentThreshold = 0.9
+
+ DowngradeEnabledPath = "/downgrade/enabled"
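+ // memorySnapshotCount is the number of applied entries between in-memory
+ // raft snapshots; see shouldSnapshotToMemory.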
+ memorySnapshotCount = 100
+)
+
+var (
+ // monitorVersionInterval should be smaller than the timeout
+ // on the connection. Otherwise we will not be able to reuse the connection
+ // (since it will time out).
+ monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
+
+ recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
+ storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
+)
+
+func init() {
+ expvar.Publish(
+ "file_descriptor_limit",
+ expvar.Func(
+ func() any {
+ n, _ := runtime.FDLimit()
+ return n
+ },
+ ),
+ )
+}
+
+type Response struct {
+ Term uint64
+ Index uint64
+ Event *v2store.Event
+ Watcher v2store.Watcher
+ Err error
+}
+
+type ServerV2 interface {
+ Server
+ Leader() types.ID
+
+ ClientCertAuthEnabled() bool
+}
+
+type ServerV3 interface {
+ Server
+ apply.RaftStatusGetter
+}
+
+func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
+
+type Server interface {
+ // AddMember attempts to add a member into the cluster. It will return
+ // ErrIDRemoved if member ID is removed from the cluster, or return
+ // ErrIDExists if member ID exists in the cluster.
+ AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
+ // RemoveMember attempts to remove a member from the cluster. It will
+ // return ErrIDRemoved if member ID is removed from the cluster, or return
+ // ErrIDNotFound if member ID is not in the cluster.
+ RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
+ // UpdateMember attempts to update an existing member in the cluster. It will
+ // return ErrIDNotFound if the member ID does not exist.
+ UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
+ // PromoteMember attempts to promote a non-voting node to a voting node. It will
+ // return ErrIDNotFound if the member ID does not exist.
+ // return ErrLearnerNotReady if the member is not ready.
+ // return ErrMemberNotLearner if the member is not a learner.
+ PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
+
+ // ClusterVersion is the cluster-wide minimum major.minor version.
+ // Cluster version is set to the min version that an etcd member is
+ // compatible with when it first bootstraps.
+ //
+ // ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
+ //
+ // During a rolling upgrade, the ClusterVersion will be updated
+ // automatically after a sync (5 seconds by default).
+ //
+ // The API/raft component can utilize ClusterVersion to determine if
+ // it can accept a client request or a raft RPC.
+ // NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
+ // the leader is etcd 2.0. An etcd 2.0 leader will not update clusterVersion since
+ // this feature was introduced after 2.0.
+ ClusterVersion() *semver.Version
+ // StorageVersion is the storage schema version. It's supported starting
+ // from 3.6.
+ StorageVersion() *semver.Version
+ Cluster() api.Cluster
+ Alarms() []*pb.AlarmMember
+
+ // LeaderChangedNotify returns a channel for application-level code to be notified
+ // when the etcd leader changes. This function is intended to be used only in
+ // applications that embed etcd.
+ // Caution:
+ // 1. the returned channel is closed when the leadership changes.
+ // 2. so a new channel needs to be obtained for each raft term.
+ // 3. users can lose some consecutive channel changes when using this API.
+ LeaderChangedNotify() <-chan struct{}
+}
+
+// EtcdServer is the production implementation of the Server interface
+type EtcdServer struct {
+ // inflightSnapshots holds the number of snapshots currently in flight.
+ inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
+ appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
+ committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
+ term uint64 // must use atomic operations to access; keep 64-bit aligned.
+ lead uint64 // must use atomic operations to access; keep 64-bit aligned.
+
+ consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
+ r raftNode // uses 64-bit atomics; keep 64-bit aligned.
+
+ readych chan struct{}
+ Cfg config.ServerConfig
+
+ lgMu *sync.RWMutex
+ lg *zap.Logger
+
+ w wait.Wait
+
+ readMu sync.RWMutex
+ // the read routine notifies the etcd server that it is waiting for a read by
+ // sending an empty struct to readwaitc
+ readwaitc chan struct{}
+ // readNotifier is used to notify the read routine that it can process the request
+ // when there is no error
+ readNotifier *notifier
+
+ // stop signals the run goroutine should shut down.
+ stop chan struct{}
+ // stopping is closed by run goroutine on shutdown.
+ stopping chan struct{}
+ // done is closed when all goroutines from start() complete.
+ done chan struct{}
+ // leaderChanged is used to notify the linearizable read loop to drop the old read requests.
+ leaderChanged *notify.Notifier
+
+ errorc chan error
+ memberID types.ID
+ attributes membership.Attributes
+
+ cluster *membership.RaftCluster
+
+ v2store v2store.Store
+ snapshotter *snap.Snapshotter
+
+ uberApply apply.UberApplier
+
+ applyWait wait.WaitTime
+
+ kv mvcc.WatchableKV
+ lessor lease.Lessor
+ bemu sync.RWMutex
+ be backend.Backend
+ beHooks *serverstorage.BackendHooks
+ authStore auth.AuthStore
+ alarmStore *v3alarm.AlarmStore
+
+ stats *stats.ServerStats
+ lstats *stats.LeaderStats
+
+ SyncTicker *time.Ticker
+ // compactor is used to auto-compact the KV.
+ compactor v3compactor.Compactor
+
+ // peerRt is used to send requests (version, lease) to peers.
+ peerRt http.RoundTripper
+ reqIDGen *idutil.Generator
+
+ // wgMu blocks concurrent waitgroup mutation while the server is stopping
+ wgMu sync.RWMutex
+ // wg is used to wait for the goroutines that depend on the server state
+ // to exit when stopping the server.
+ wg sync.WaitGroup
+
+ // ctx is used for etcd-initiated requests that may need to be canceled
+ // on etcd server shutdown.
+ ctx context.Context
+ cancel context.CancelFunc
+
+ leadTimeMu sync.RWMutex
+ leadElectedTime time.Time
+
+ firstCommitInTerm *notify.Notifier
+ clusterVersionChanged *notify.Notifier
+
+ *AccessController
+ // forceDiskSnapshot can force a snapshot to be triggered after apply, independent of the snapshotCount.
+ // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+ // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
+ forceDiskSnapshot bool
+ corruptionChecker CorruptionChecker
+}
+
+// NewServer creates a new EtcdServer from the supplied configuration. The
+// configuration is considered static for the lifetime of the EtcdServer.
+func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
+ b, err := bootstrap(cfg)
+ if err != nil {
+ cfg.Logger.Error("bootstrap failed", zap.Error(err))
+ return nil, err
+ }
+ cfg.Logger.Info("bootstrap successfully")
+
+ defer func() {
+ if err != nil {
+ b.Close()
+ }
+ }()
+
+ sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
+ lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
+
+ heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
+ srv = &EtcdServer{
+ readych: make(chan struct{}),
+ Cfg: cfg,
+ lgMu: new(sync.RWMutex),
+ lg: cfg.Logger,
+ errorc: make(chan error, 1),
+ v2store: b.storage.st,
+ snapshotter: b.ss,
+ r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
+ memberID: b.cluster.nodeID,
+ attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+ cluster: b.cluster.cl,
+ stats: sstats,
+ lstats: lstats,
+ SyncTicker: time.NewTicker(500 * time.Millisecond),
+ peerRt: b.prt,
+ reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
+ AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
+ consistIndex: b.storage.backend.ci,
+ firstCommitInTerm: notify.NewNotifier(),
+ clusterVersionChanged: notify.NewNotifier(),
+ }
+
+ addFeatureGateMetrics(cfg.ServerFeatureGate, serverFeatureEnabled)
+ serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
+ srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
+
+ srv.be = b.storage.backend.be
+ srv.beHooks = b.storage.backend.beHooks
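+ // minTTL is 1.5x the election timeout ((3*ElectionTicks)/2 * heartbeat),
+ // which keeps a lease from expiring while a leader election is still in progress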
+ minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
+
+ // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+ // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+ srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
+ MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
+ CheckpointInterval: cfg.LeaseCheckpointInterval,
+ CheckpointPersist: cfg.ServerFeatureGate.Enabled(features.LeaseCheckpointPersist),
+ ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
+ })
+
+ tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
+ func(index uint64) <-chan struct{} {
+ return srv.applyWait.Wait(index)
+ },
+ time.Duration(cfg.TokenTTL)*time.Second,
+ )
+ if err != nil {
+ cfg.Logger.Warn("failed to create token provider", zap.Error(err))
+ return nil, err
+ }
+
+ mvccStoreConfig := mvcc.StoreConfig{
+ CompactionBatchLimit: cfg.CompactionBatchLimit,
+ CompactionSleepInterval: cfg.CompactionSleepInterval,
+ }
+ srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
+ srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
+
+ srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
+
+ newSrv := srv // capture srv: the named return value is nil inside the defer when NewServer returns nil
+ defer func() {
+ // closing backend without first closing kv can cause
+ // resumed compactions to fail with closed tx errors
+ if err != nil {
+ newSrv.kv.Close()
+ }
+ }()
+ if num := cfg.AutoCompactionRetention; num != 0 {
+ srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
+ if err != nil {
+ return nil, err
+ }
+ srv.compactor.Run()
+ }
+
+ if err = srv.restoreAlarms(); err != nil {
+ return nil, err
+ }
+ srv.uberApply = srv.NewUberApplier()
+
+ if srv.FeatureEnabled(features.LeaseCheckpoint) {
+ // setting the checkpointer enables the lease checkpoint feature.
+ srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
+ if !srv.ensureLeadership() {
+ srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
+ zap.Uint64("local-member-id", uint64(srv.MemberID())))
+ return lease.ErrNotPrimary
+ }
+
+ srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
+ return nil
+ })
+ }
+
+ // Set the hook after EtcdServer finishes the initialization to avoid
+ // the hook being called during the initialization process.
+ srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
+
+ // TODO: move transport initialization near the definition of remote
+ tr := &rafthttp.Transport{
+ Logger: cfg.Logger,
+ TLSInfo: cfg.PeerTLSInfo,
+ DialTimeout: cfg.PeerDialTimeout(),
+ ID: b.cluster.nodeID,
+ URLs: cfg.PeerURLs,
+ ClusterID: b.cluster.cl.ID(),
+ Raft: srv,
+ Snapshotter: b.ss,
+ ServerStats: sstats,
+ LeaderStats: lstats,
+ ErrorC: srv.errorc,
+ }
+ if err = tr.Start(); err != nil {
+ return nil, err
+ }
+ // add all remotes into transport
+ for _, m := range b.cluster.remotes {
+ if m.ID != b.cluster.nodeID {
+ tr.AddRemote(m.ID, m.PeerURLs)
+ }
+ }
+ for _, m := range b.cluster.cl.Members() {
+ if m.ID != b.cluster.nodeID {
+ tr.AddPeer(m.ID, m.PeerURLs)
+ }
+ }
+ srv.r.transport = tr
+
+ return srv, nil
+}
+
+func (s *EtcdServer) Logger() *zap.Logger {
+ s.lgMu.RLock()
+ l := s.lg
+ s.lgMu.RUnlock()
+ return l
+}
+
+func (s *EtcdServer) Config() config.ServerConfig {
+ return s.Cfg
+}
+
+// FeatureEnabled returns true if the feature is enabled by the etcd server, false otherwise.
+func (s *EtcdServer) FeatureEnabled(f featuregate.Feature) bool {
+ return s.Cfg.ServerFeatureGate.Enabled(f)
+}
+
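+// tickToDur converts a tick count into a human-readable wall-clock duration
+// string (ticks * tickMs milliseconds).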
+func tickToDur(ticks int, tickMs uint) string {
+ return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
+}
+
+func (s *EtcdServer) adjustTicks() {
+ lg := s.Logger()
+ clusterN := len(s.cluster.Members())
+
+ // single-node fresh start, or single-node recovers from snapshot
+ if clusterN == 1 {
+ ticks := s.Cfg.ElectionTicks - 1
+ lg.Info(
+ "started as single-node; fast-forwarding election ticks",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.Int("forward-ticks", ticks),
+ zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
+ zap.Int("election-ticks", s.Cfg.ElectionTicks),
+ zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
+ )
+ s.r.advanceTicks(ticks)
+ return
+ }
+
+ if !s.Cfg.InitialElectionTickAdvance {
+ lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
+ return
+ }
+ lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
+
+ // retry up to "rafthttp.ConnReadTimeout", which is 5-sec
+ // until peer connection reports; otherwise:
+ // 1. all connections failed, or
+ // 2. no active peers, or
+ // 3. restarted single-node with no snapshot
+ // then, do nothing, because advancing ticks would have no effect
+ waitTime := rafthttp.ConnReadTimeout
+ itv := 50 * time.Millisecond
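+ // poll active-peer reports every 50ms, for at most waitTime in total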
+ for i := int64(0); i < int64(waitTime/itv); i++ {
+ select {
+ case <-time.After(itv):
+ case <-s.stopping:
+ return
+ }
+
+ peerN := s.r.transport.ActivePeers()
+ if peerN > 1 {
+ // multi-node cluster received peer connection reports;
+ // adjust ticks in case leader messages are received slowly
+ ticks := s.Cfg.ElectionTicks - 2
+
+ lg.Info(
+ "initialized peer connections; fast-forwarding election ticks",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.Int("forward-ticks", ticks),
+ zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
+ zap.Int("election-ticks", s.Cfg.ElectionTicks),
+ zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
+ zap.Int("active-remote-members", peerN),
+ )
+
+ s.r.advanceTicks(ticks)
+ return
+ }
+ }
+}
+
+// Start performs any initialization of the Server necessary for it to
+// begin serving requests. It must be called before Do or Process.
+// Start must be non-blocking; any long-running server functionality
+// should be implemented in goroutines.
+func (s *EtcdServer) Start() {
+ s.start()
+ s.GoAttach(func() { s.adjustTicks() })
+ s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
+ s.GoAttach(s.purgeFile)
+ s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
+ s.GoAttach(s.monitorClusterVersions)
+ s.GoAttach(s.monitorStorageVersion)
+ s.GoAttach(s.linearizableReadLoop)
+ s.GoAttach(s.monitorKVHash)
+ s.GoAttach(s.monitorCompactHash)
+ s.GoAttach(s.monitorDowngrade)
+}
+
+// start prepares and starts server in a new goroutine. It is no longer safe to
+// modify a server's fields after it has been sent to Start.
+// This function is just used for testing.
+func (s *EtcdServer) start() {
+ lg := s.Logger()
+
+ if s.Cfg.SnapshotCount == 0 {
+ lg.Info(
+ "updating snapshot-count to default",
+ zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
+ zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
+ )
+ s.Cfg.SnapshotCount = DefaultSnapshotCount
+ }
+ if s.Cfg.SnapshotCatchUpEntries == 0 {
+ lg.Info(
+ "updating snapshot catch-up entries to default",
+ zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
+ zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
+ )
+ s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
+ }
+
+ s.w = wait.New()
+ s.applyWait = wait.NewTimeList()
+ s.done = make(chan struct{})
+ s.stop = make(chan struct{})
+ s.stopping = make(chan struct{}, 1)
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.readwaitc = make(chan struct{}, 1)
+ s.readNotifier = newNotifier()
+ s.leaderChanged = notify.NewNotifier()
+ if s.ClusterVersion() != nil {
+ lg.Info(
+ "starting etcd server",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("local-server-version", version.Version),
+ zap.String("cluster-id", s.Cluster().ID().String()),
+ zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
+ )
+ membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
+ } else {
+ lg.Info(
+ "starting etcd server",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("local-server-version", version.Version),
+ zap.String("cluster-version", "to_be_decided"),
+ )
+ }
+
+ // TODO: if this is an empty log, write all peer infos
+ // into the first entry
+ go s.run()
+}
+
+func (s *EtcdServer) purgeFile() {
+ lg := s.Logger()
+ var dberrc, serrc, werrc <-chan error
+ var dbdonec, sdonec, wdonec <-chan struct{}
+ if s.Cfg.MaxSnapFiles > 0 {
+ dbdonec, dberrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
+ sdonec, serrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
+ }
+ if s.Cfg.MaxWALFiles > 0 {
+ wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
+ }
+
+ select {
+ case e := <-dberrc:
+ lg.Fatal("failed to purge snap db file", zap.Error(e))
+ case e := <-serrc:
+ lg.Fatal("failed to purge snap file", zap.Error(e))
+ case e := <-werrc:
+ lg.Fatal("failed to purge wal file", zap.Error(e))
+ case <-s.stopping:
+ if dbdonec != nil {
+ <-dbdonec
+ }
+ if sdonec != nil {
+ <-sdonec
+ }
+ if wdonec != nil {
+ <-wdonec
+ }
+ return
+ }
+}
+
+func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
+
+func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
+
+type ServerPeer interface {
+ ServerV2
+ RaftHandler() http.Handler
+ LeaseHandler() http.Handler
+}
+
+func (s *EtcdServer) LeaseHandler() http.Handler {
+ if s.lessor == nil {
+ return nil
+ }
+ return leasehttp.NewHandler(s.lessor, s.ApplyWait)
+}
+
+func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
+
+type ServerPeerV2 interface {
+ ServerPeer
+ HashKVHandler() http.Handler
+ DowngradeEnabledHandler() http.Handler
+}
+
+func (s *EtcdServer) DowngradeInfo() *serverversion.DowngradeInfo { return s.cluster.DowngradeInfo() }
+
+type downgradeEnabledHandler struct {
+ lg *zap.Logger
+ cluster api.Cluster
+ server *EtcdServer
+}
+
+func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
+ return &downgradeEnabledHandler{
+ lg: s.Logger(),
+ cluster: s.cluster,
+ server: s,
+ }
+}
+
+func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ w.Header().Set("Allow", http.MethodGet)
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+ if r.URL.Path != DowngradeEnabledPath {
+ http.Error(w, "bad path", http.StatusBadRequest)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
+ defer cancel()
+
+ // serve with linearized downgrade info
+ if err := h.server.linearizableReadNotify(ctx); err != nil {
+ http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
+ http.StatusInternalServerError)
+ return
+ }
+ enabled := h.server.DowngradeInfo().Enabled
+ w.Header().Set("Content-Type", "text/plain")
+ w.Write([]byte(strconv.FormatBool(enabled)))
+}
+
+// Process takes a raft message and applies it to the server's raft state
+// machine, respecting any timeout of the given context.
+func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
+ lg := s.Logger()
+ if s.cluster.IsIDRemoved(types.ID(m.From)) {
+ lg.Warn(
+ "rejected Raft message from removed member",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("removed-member-id", types.ID(m.From).String()),
+ )
+ return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
+ }
+ if s.MemberID() != types.ID(m.To) {
+ lg.Warn(
+ "rejected Raft message to mismatch member",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("mismatch-member-id", types.ID(m.To).String()),
+ )
+ return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message to mismatch member")
+ }
+ if m.Type == raftpb.MsgApp {
+ s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
+ }
+ return s.r.Step(ctx, m)
+}
+
+func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
+
+func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
+
+// ReportSnapshot reports snapshot sent status to the raft state machine,
+// and clears the used snapshot from the snapshot store.
+func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
+ s.r.ReportSnapshot(id, status)
+}
+
+type etcdProgress struct {
+ confState raftpb.ConfState
+ diskSnapshotIndex uint64
+ memorySnapshotIndex uint64
+ appliedt uint64
+ appliedi uint64
+}
+
+// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
+// and helps decouple state machine logic from Raft algorithms.
+// TODO: add a state machine interface to apply the committed entries and do snapshot/recover
+type raftReadyHandler struct {
+ getLead func() (lead uint64)
+ updateLead func(lead uint64)
+ updateLeadership func(newLeader bool)
+ updateCommittedIndex func(uint64)
+}
+
+func (s *EtcdServer) run() {
+ lg := s.Logger()
+
+ sn, err := s.r.raftStorage.Snapshot()
+ if err != nil {
+ lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
+ }
+
+ // asynchronously accept apply packets, dispatch progress in order
+ sched := schedule.NewFIFOScheduler(lg)
+
+ rh := &raftReadyHandler{
+ getLead: func() (lead uint64) { return s.getLead() },
+ updateLead: func(lead uint64) { s.setLead(lead) },
+ updateLeadership: func(newLeader bool) {
+ if !s.isLeader() {
+ if s.lessor != nil {
+ s.lessor.Demote()
+ }
+ if s.compactor != nil {
+ s.compactor.Pause()
+ }
+ } else {
+ if newLeader {
+ t := time.Now()
+ s.leadTimeMu.Lock()
+ s.leadElectedTime = t
+ s.leadTimeMu.Unlock()
+ }
+ if s.compactor != nil {
+ s.compactor.Resume()
+ }
+ }
+ if newLeader {
+ s.leaderChanged.Notify()
+ }
+ // TODO: remove the nil checking
+ // current test utility does not provide the stats
+ if s.stats != nil {
+ s.stats.BecomeLeader()
+ }
+ },
+ updateCommittedIndex: func(ci uint64) {
+ cci := s.getCommittedIndex()
+ if ci > cci {
+ s.setCommittedIndex(ci)
+ }
+ },
+ }
+ s.r.start(rh)
+
+ ep := etcdProgress{
+ confState: sn.Metadata.ConfState,
+ diskSnapshotIndex: sn.Metadata.Index,
+ memorySnapshotIndex: sn.Metadata.Index,
+ appliedt: sn.Metadata.Term,
+ appliedi: sn.Metadata.Index,
+ }
+
+ defer func() {
+ s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
+ close(s.stopping)
+ s.wgMu.Unlock()
+ s.cancel()
+ sched.Stop()
+
+ // wait for goroutines before closing raft so wal stays open
+ s.wg.Wait()
+
+ s.SyncTicker.Stop()
+
+ // must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
+ // by adding a peer after raft stops the transport
+ s.r.stop()
+
+ s.Cleanup()
+
+ close(s.done)
+ }()
+
+ var expiredLeaseC <-chan []*lease.Lease
+ if s.lessor != nil {
+ expiredLeaseC = s.lessor.ExpiredLeasesC()
+ }
+
+ for {
+ select {
+ case ap := <-s.r.apply():
+ f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
+ sched.Schedule(f)
+ case leases := <-expiredLeaseC:
+ s.revokeExpiredLeases(leases)
+ case err := <-s.errorc:
+ lg.Warn("server error", zap.Error(err))
+ lg.Warn("data-dir used by this member must be removed")
+ return
+ case <-s.stop:
+ return
+ }
+ }
+}
+
+func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
+ s.GoAttach(func() {
+ // We shouldn't revoke any leases if current member isn't a leader,
+ // because the operation should only be performed by the leader. When
+ // the leader gets blocked on the raft loop, such as writing WAL entries,
+ // it can't process any events or messages from raft. It may think it
+ // is still the leader even though the leader has already changed.
+ // Refer to https://github.com/etcd-io/etcd/issues/15247
+ lg := s.Logger()
+ if !s.ensureLeadership() {
+ lg.Warn("Ignore the lease revoking request because current member isn't a leader",
+ zap.Uint64("local-member-id", uint64(s.MemberID())))
+ return
+ }
+
+ // Increase throughput of the expired-lease deletion process through parallelization
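+ // the buffered channel acts as a semaphore, bounding concurrent revocations to maxPendingRevokes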
+ c := make(chan struct{}, maxPendingRevokes)
+ for _, curLease := range leases {
+ select {
+ case c <- struct{}{}:
+ case <-s.stopping:
+ return
+ }
+
+ f := func(lid int64) {
+ s.GoAttach(func() {
+ ctx := s.authStore.WithRoot(s.ctx)
+ _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
+ if lerr == nil {
+ leaseExpired.Inc()
+ } else {
+ lg.Warn(
+ "failed to revoke lease",
+ zap.String("lease-id", fmt.Sprintf("%016x", lid)),
+ zap.Error(lerr),
+ )
+ }
+
+ <-c
+ })
+ }
+
+ f(int64(curLease.ID))
+ }
+ })
+}
+
+// isActive checks if the etcd instance is still actively processing the
+// heartbeat message (ticks). It returns false if no heartbeat has been
+// received within 3 * tickMs.
+func (s *EtcdServer) isActive() bool {
+ latestTickTs := s.r.getLatestTickTs()
+ threshold := 3 * time.Duration(s.Cfg.TickMs) * time.Millisecond
+ return latestTickTs.Add(threshold).After(time.Now())
+}
+
+// ensureLeadership checks whether current member is still the leader.
+func (s *EtcdServer) ensureLeadership() bool {
+ lg := s.Logger()
+
+ if s.isActive() {
+ lg.Debug("The member is active, skip checking leadership",
+ zap.Time("latestTickTs", s.r.getLatestTickTs()),
+ zap.Time("now", time.Now()))
+ return true
+ }
+
+ ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
+ defer cancel()
+ if err := s.linearizableReadNotify(ctx); err != nil {
+ lg.Warn("Failed to check current member's leadership",
+ zap.Error(err))
+ return false
+ }
+
+ newLeaderID := s.raftStatus().Lead
+ if newLeaderID != uint64(s.MemberID()) {
+ lg.Warn("Current member isn't a leader",
+ zap.Uint64("local-member-id", uint64(s.MemberID())),
+ zap.Uint64("new-lead", newLeaderID))
+ return false
+ }
+
+ return true
+}
+
+// Cleanup removes allocated objects by EtcdServer.NewServer in
+// situation that EtcdServer::Start was not called (that takes care of cleanup).
+func (s *EtcdServer) Cleanup() {
+ // kv, lessor and backend can be nil if running without v3 enabled
+ // or running unit tests.
+ if s.lessor != nil {
+ s.lessor.Stop()
+ }
+ if s.kv != nil {
+ s.kv.Close()
+ }
+ if s.authStore != nil {
+ s.authStore.Close()
+ }
+ if s.be != nil {
+ s.be.Close()
+ }
+ if s.compactor != nil {
+ s.compactor.Stop()
+ }
+}
+
+func (s *EtcdServer) Defragment() error {
+ s.bemu.Lock()
+ defer s.bemu.Unlock()
+ return s.be.Defrag()
+}
+
+func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
+ s.applySnapshot(ep, apply)
+ s.applyEntries(ep, apply)
+ backend.VerifyBackendConsistency(s.Backend(), s.Logger(), true, schema.AllBuckets...)
+
+ proposalsApplied.Set(float64(ep.appliedi))
+ s.applyWait.Trigger(ep.appliedi)
+
+ // wait for the raft routine to finish the disk writes before triggering a
+ // snapshot. Otherwise the applied index might be greater than the last index
+ // in raft storage, since the raft routine might be slower than the apply routine.
+ <-apply.notifyc
+
+ s.snapshotIfNeededAndCompactRaftLog(ep)
+ select {
+ // snapshot requested via send()
+ case m := <-s.r.msgSnapC:
+ merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
+ s.sendMergedSnap(merged)
+ default:
+ }
+}
+
+func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
+ if raft.IsEmptySnap(toApply.snapshot) {
+ return
+ }
+ applySnapshotInProgress.Inc()
+
+ lg := s.Logger()
+ lg.Info(
+ "applying snapshot",
+ zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
+ zap.Uint64("current-applied-index", ep.appliedi),
+ zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+ zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
+ )
+ defer func() {
+ lg.Info(
+ "applied snapshot",
+ zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
+ zap.Uint64("current-applied-index", ep.appliedi),
+ zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+ zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
+ )
+ applySnapshotInProgress.Dec()
+ }()
+
+ if toApply.snapshot.Metadata.Index <= ep.appliedi {
+ lg.Panic(
+ "unexpected leader snapshot from outdated index",
+ zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
+ zap.Uint64("current-applied-index", ep.appliedi),
+ zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+ zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
+ )
+ }
+
+ // wait for raftNode to persist snapshot onto the disk
+ <-toApply.notifyc
+
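+ // bemuUnlocked records whether bemu was already released after swapping in the
+ // new backend, so the deferred unlock does not unlock twice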
+ bemuUnlocked := false
+ s.bemu.Lock()
+ defer func() {
+ if !bemuUnlocked {
+ s.bemu.Unlock()
+ }
+ }()
+
+ // gofail: var applyBeforeOpenSnapshot struct{}
+ newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks)
+ if err != nil {
+ lg.Panic("failed to open snapshot backend", zap.Error(err))
+ }
+ lg.Info("applySnapshot: opened snapshot backend")
+ // gofail: var applyAfterOpenSnapshot struct{}
+
+ // We need to set the backend on consistIndex before recovering the lessor,
+ // because lessor.Recover will commit the boltDB transaction, and accordingly
+ // the old consistent_index would be persisted into the db in OnPreCommitUnsafe.
+ // The new consistent_index value coming from the snapshot would eventually be
+ // overwritten by the old value.
+ s.consistIndex.SetBackend(newbe)
+ verifySnapshotIndex(toApply.snapshot, s.consistIndex.ConsistentIndex())
+
+ // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+ // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+ if s.lessor != nil {
+ lg.Info("restoring lease store")
+
+ s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
+
+ lg.Info("restored lease store")
+ }
+
+ lg.Info("restoring mvcc store")
+
+ if err := s.kv.Restore(newbe); err != nil {
+ lg.Panic("failed to restore mvcc store", zap.Error(err))
+ }
+
+ newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
+
+ lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
+
+ oldbe := s.be
+ s.be = newbe
+ s.bemu.Unlock()
+ bemuUnlocked = true
+
+ // Closing old backend might block until all the txns
+ // on the backend are finished.
+ // We do not want to wait on closing the old backend.
+ go func() {
+ lg.Info("closing old backend file")
+ defer func() {
+ lg.Info("closed old backend file")
+ }()
+ if err := oldbe.Close(); err != nil {
+ lg.Panic("failed to close old backend", zap.Error(err))
+ }
+ }()
+
+ lg.Info("restoring alarm store")
+
+ if err := s.restoreAlarms(); err != nil {
+ lg.Panic("failed to restore alarm store", zap.Error(err))
+ }
+
+ lg.Info("restored alarm store")
+
+ if s.authStore != nil {
+ lg.Info("restoring auth store")
+
+ s.authStore.Recover(schema.NewAuthBackend(lg, newbe))
+
+ lg.Info("restored auth store")
+ }
+
+ lg.Info("restoring v2 store")
+ if err := s.v2store.Recovery(toApply.snapshot.Data); err != nil {
+ lg.Panic("failed to restore v2 store", zap.Error(err))
+ }
+
+ if err := serverstorage.AssertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
+ lg.Panic("illegal v2store content", zap.Error(err))
+ }
+
+ lg.Info("restored v2 store")
+
+ s.cluster.SetBackend(schema.NewMembershipBackend(lg, newbe))
+
+ lg.Info("restoring cluster configuration")
+
+ s.cluster.Recover(api.UpdateCapability)
+
+ lg.Info("restored cluster configuration")
+ lg.Info("removing old peers from network")
+
+ // recover raft transport
+ s.r.transport.RemoveAllPeers()
+
+ lg.Info("removed old peers from network")
+ lg.Info("adding peers from new cluster configuration")
+
+ for _, m := range s.cluster.Members() {
+ if m.ID == s.MemberID() {
+ continue
+ }
+ s.r.transport.AddPeer(m.ID, m.PeerURLs)
+ }
+
+ lg.Info("added peers from new cluster configuration")
+
+ ep.appliedt = toApply.snapshot.Metadata.Term
+ ep.appliedi = toApply.snapshot.Metadata.Index
+ ep.diskSnapshotIndex = ep.appliedi
+ ep.memorySnapshotIndex = ep.appliedi
+ ep.confState = toApply.snapshot.Metadata.ConfState
+
+ // As the backend and implementations like alarmStore have changed, we need
+ // to re-bootstrap the appliers.
+ s.uberApply = s.NewUberApplier()
+}
+
+func (s *EtcdServer) NewUberApplier() apply.UberApplier {
+ return apply.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex,
+ s.Cfg.WarningApplyDuration, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.Cfg.QuotaBackendBytes)
+}
+
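+// verifySnapshotIndex asserts (only when verification is enabled) that the
+// consistent index recovered from the backend matches the snapshot index.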
+func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
+ verify.Verify(func() {
+ if cindex != snapshot.Metadata.Index {
+ panic(fmt.Sprintf("consistent_index(%d) isn't equal to snapshot index (%d)", cindex, snapshot.Metadata.Index))
+ }
+ })
+}
+
+func verifyConsistentIndexIsLatest(lg *zap.Logger, snapshot raftpb.Snapshot, cindex uint64) {
+ verify.Verify(func() {
+ if cindex < snapshot.Metadata.Index {
+ lg.Panic(fmt.Sprintf("consistent_index(%d) is older than snapshot index (%d)", cindex, snapshot.Metadata.Index))
+ }
+ })
+}
+
+func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
+ if len(apply.entries) == 0 {
+ return
+ }
+ firsti := apply.entries[0].Index
+ if firsti > ep.appliedi+1 {
+ lg := s.Logger()
+ lg.Panic(
+ "unexpected committed entry index",
+ zap.Uint64("current-applied-index", ep.appliedi),
+ zap.Uint64("first-committed-entry-index", firsti),
+ )
+ }
+ var ents []raftpb.Entry
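+ // skip entries that were already applied; only the suffix past ep.appliedi still needs to be applied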
+ if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
+ ents = apply.entries[ep.appliedi+1-firsti:]
+ }
+ if len(ents) == 0 {
+ return
+ }
+ var shouldstop bool
+ if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop {
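+ // this member was removed from the cluster; shut down after a short delay (10*100ms = 1s)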
+ go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
+ }
+}
+
+func (s *EtcdServer) ForceSnapshot() {
+ s.forceDiskSnapshot = true
+}
+
+func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
+ // TODO: Remove disk snapshot in v3.7
+ shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+ shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+ if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
+ return
+ }
+ s.snapshot(ep, shouldSnapshotToDisk)
+ s.compactRaftLog(ep.appliedi)
+}
+
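+// shouldSnapshotToDisk reports whether a disk snapshot is due: either one was
+// forced (e.g. after a cluster version downgrade) or more than SnapshotCount
+// entries have been applied since the last disk snapshot.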
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
+ return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
+}
+
+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+ return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
+}
+
+func (s *EtcdServer) hasMultipleVotingMembers() bool {
+ return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
+}
+
+func (s *EtcdServer) isLeader() bool {
+ return uint64(s.MemberID()) == s.Lead()
+}
+
+// MoveLeader transfers the leader to the given transferee.
+func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
+ member := s.cluster.Member(types.ID(transferee))
+ if member == nil || member.IsLearner {
+ return errors.ErrBadLeaderTransferee
+ }
+
+ now := time.Now()
+ interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
+
+ lg := s.Logger()
+ lg.Info(
+ "leadership transfer starting",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("current-leader-member-id", types.ID(lead).String()),
+ zap.String("transferee-member-id", types.ID(transferee).String()),
+ )
+
+ s.r.TransferLeadership(ctx, lead, transferee)
+ for s.Lead() != transferee {
+ select {
+ case <-ctx.Done(): // time out
+ return errors.ErrTimeoutLeaderTransfer
+ case <-time.After(interval):
+ }
+ }
+
+ // TODO: drain all requests, or drop all messages to the old leader
+ lg.Info(
+ "leadership transfer finished",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("old-leader-member-id", types.ID(lead).String()),
+ zap.String("new-leader-member-id", types.ID(transferee).String()),
+ zap.Duration("took", time.Since(now)),
+ )
+ return nil
+}
+
+// TryTransferLeadershipOnShutdown transfers the leader to the chosen transferee. It is only used in server graceful shutdown.
+func (s *EtcdServer) TryTransferLeadershipOnShutdown() error {
+ lg := s.Logger()
+ if !s.isLeader() {
+ lg.Info(
+ "skipped leadership transfer; local server is not leader",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
+ )
+ return nil
+ }
+
+ if !s.hasMultipleVotingMembers() {
+ lg.Info(
+ "skipped leadership transfer for single voting member cluster",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
+ )
+ return nil
+ }
+
+ transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
+ if !ok {
+ return errors.ErrUnhealthy
+ }
+
+ tm := s.Cfg.ReqTimeout()
+ ctx, cancel := context.WithTimeout(s.ctx, tm)
+ err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
+ cancel()
+ return err
+}
+
+// HardStop stops the server without coordination with other members in the cluster.
+func (s *EtcdServer) HardStop() {
+ select {
+ case s.stop <- struct{}{}:
+ case <-s.done:
+ return
+ }
+ <-s.done
+}
+
+// Stop stops the server gracefully, and shuts down the running goroutine.
+// Stop should be called after a Start(s), otherwise it will block forever.
+// When stopping leader, Stop transfers its leadership to one of its peers
+// before stopping the server.
+// Stop terminates the Server and performs any necessary finalization.
+// Do and Process cannot be called after Stop has been invoked.
+func (s *EtcdServer) Stop() {
+ lg := s.Logger()
+ if err := s.TryTransferLeadershipOnShutdown(); err != nil {
+ lg.Warn("leadership transfer failed", zap.String("local-member-id", s.MemberID().String()), zap.Error(err))
+ }
+ s.HardStop()
+}
+
+// ReadyNotify returns a channel that will be closed when the server
+// is ready to serve client requests
+func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
+
+func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
+ select {
+ case <-time.After(d):
+ case <-s.done:
+ }
+ select {
+ case s.errorc <- err:
+ default:
+ }
+}
+
+// StopNotify returns a channel that receives an empty struct
+// when the server is stopped.
+func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
+
+// StoppingNotify returns a channel that receives an empty struct
+// when the server is being stopped.
+func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
+
+func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
+ if s.authStore == nil {
+ // In the context of an ordinary etcd process, s.authStore will never be nil.
+ // This branch is for handling cases in server_test.go
+ return nil
+ }
+
+ // Note that this permission check is done in the API layer,
+ // so a TOCTOU problem can potentially arise in a schedule like this:
+ // update membership with user A -> revoke root role of A -> apply the membership change
+ // in the state machine layer.
+ // However, both membership changes and role management require the root privilege.
+ // So careful operation by admins can prevent the problem.
+ authInfo, err := s.AuthInfoFromCtx(ctx)
+ if err != nil {
+ return err
+ }
+
+ return s.AuthStore().IsAdminPermitted(authInfo)
+}
+
+func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
+ if err := s.checkMembershipOperationPermission(ctx); err != nil {
+ return nil, err
+ }
+
+ // TODO: move Member to protobuf type
+ b, err := json.Marshal(memb)
+ if err != nil {
+ return nil, err
+ }
+
+ // by default StrictReconfigCheck is enabled; reject new members if unhealthy.
+ if err := s.mayAddMember(memb); err != nil {
+ return nil, err
+ }
+
+ cc := raftpb.ConfChange{
+ Type: raftpb.ConfChangeAddNode,
+ NodeID: uint64(memb.ID),
+ Context: b,
+ }
+
+ if memb.IsLearner {
+ cc.Type = raftpb.ConfChangeAddLearnerNode
+ }
+
+ return s.configure(ctx, cc)
+}
+
+func (s *EtcdServer) mayAddMember(memb membership.Member) error {
+ lg := s.Logger()
+ if !s.Cfg.StrictReconfigCheck {
+ return nil
+ }
+
+ // protect quorum when adding voting member
+ if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
+ lg.Warn(
+ "rejecting member add request; not enough healthy members",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
+ zap.Error(errors.ErrNotEnoughStartedMembers),
+ )
+ return errors.ErrNotEnoughStartedMembers
+ }
+
+ if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), s.cluster.VotingMembers()) {
+ lg.Warn(
+ "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
+ zap.Error(errors.ErrUnhealthy),
+ )
+ return errors.ErrUnhealthy
+ }
+
+ return nil
+}
+
+func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+ if err := s.checkMembershipOperationPermission(ctx); err != nil {
+ return nil, err
+ }
+
+ // by default StrictReconfigCheck is enabled; reject removal if it leads to quorum loss
+ if err := s.mayRemoveMember(types.ID(id)); err != nil {
+ return nil, err
+ }
+
+ cc := raftpb.ConfChange{
+ Type: raftpb.ConfChangeRemoveNode,
+ NodeID: id,
+ }
+ return s.configure(ctx, cc)
+}
+
+// PromoteMember promotes a learner node to a voting node.
+func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+ // only raft leader has information on whether the to-be-promoted learner node is ready. If promoteMember call
+ // fails with ErrNotLeader, forward the request to leader node via HTTP. If promoteMember call fails with error
+ // other than ErrNotLeader, return the error.
+ resp, err := s.promoteMember(ctx, id)
+ if err == nil {
+ learnerPromoteSucceed.Inc()
+ return resp, nil
+ }
+ if !errorspkg.Is(err, errors.ErrNotLeader) {
+ learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
+ return resp, err
+ }
+
+ cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+ defer cancel()
+ // forward to leader
+ for cctx.Err() == nil {
+ leader, err := s.waitLeader(cctx)
+ if err != nil {
+ return nil, err
+ }
+ for _, url := range leader.PeerURLs {
+ resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
+ if err == nil {
+ return resp, nil
+ }
+ // If member promotion failed, return early. Otherwise keep retrying.
+ if errorspkg.Is(err, errors.ErrLearnerNotReady) || errorspkg.Is(err, membership.ErrIDNotFound) || errorspkg.Is(err, membership.ErrMemberNotLearner) {
+ return nil, err
+ }
+ }
+ }
+
+ if errorspkg.Is(cctx.Err(), context.DeadlineExceeded) {
+ return nil, errors.ErrTimeout
+ }
+ return nil, errors.ErrCanceled
+}
+
+// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
+// request to raft.
+ // The function returns ErrNotLeader if the local node is not the raft leader (and therefore does not
+ // have enough information to determine if the learner node is ready), and ErrLearnerNotReady if the
+ // local node is the leader (and therefore has enough information) but decides the learner node is not
+ // ready to be promoted.
+func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+ if err := s.checkMembershipOperationPermission(ctx); err != nil {
+ return nil, err
+ }
+
+ // check if we can promote this learner.
+ if err := s.mayPromoteMember(types.ID(id)); err != nil {
+ return nil, err
+ }
+
+ // build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
+ promoteChangeContext := membership.ConfigChangeContext{
+ Member: membership.Member{
+ ID: types.ID(id),
+ },
+ IsPromote: true,
+ }
+
+ b, err := json.Marshal(promoteChangeContext)
+ if err != nil {
+ return nil, err
+ }
+
+ cc := raftpb.ConfChange{
+ Type: raftpb.ConfChangeAddNode,
+ NodeID: id,
+ Context: b,
+ }
+
+ return s.configure(ctx, cc)
+}
+
+func (s *EtcdServer) mayPromoteMember(id types.ID) error {
+ lg := s.Logger()
+ if err := s.isLearnerReady(lg, uint64(id)); err != nil {
+ return err
+ }
+
+ if !s.Cfg.StrictReconfigCheck {
+ return nil
+ }
+ if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
+ lg.Warn(
+ "rejecting member promote request; not enough healthy members",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("requested-member-remove-id", id.String()),
+ zap.Error(errors.ErrNotEnoughStartedMembers),
+ )
+ return errors.ErrNotEnoughStartedMembers
+ }
+
+ return nil
+}
+
+// isLearnerReady checks whether the learner has caught up with the leader.
+// Note: it will return nil if the member is not found in the cluster or is not a learner.
+// These two conditions will be checked again in the apply phase later.
+func (s *EtcdServer) isLearnerReady(lg *zap.Logger, id uint64) error {
+ if err := s.waitAppliedIndex(); err != nil {
+ return err
+ }
+
+ rs := s.raftStatus()
+
+ // leader's raftStatus.Progress is not nil
+ if rs.Progress == nil {
+ return errors.ErrNotLeader
+ }
+
+ var learnerMatch uint64
+ isFound := false
+ leaderID := rs.ID
+ for memberID, progress := range rs.Progress {
+ if id == memberID {
+ // check its status
+ learnerMatch = progress.Match
+ isFound = true
+ break
+ }
+ }
+
+ // We should return an error in API directly, to avoid the request
+ // being unnecessarily delivered to raft.
+ if !isFound {
+ return membership.ErrIDNotFound
+ }
+
+ leaderMatch := rs.Progress[leaderID].Match
+
+ learnerReadyPercent := float64(learnerMatch) / float64(leaderMatch)
+
+ // the learner's Match has not caught up with the leader yet
+ if learnerReadyPercent < readyPercentThreshold {
+ lg.Error(
+ "rejecting promote learner: learner is not ready",
+ zap.Float64("learner-ready-percent", learnerReadyPercent),
+ zap.Float64("ready-percent-threshold", readyPercentThreshold),
+ )
+ return errors.ErrLearnerNotReady
+ }
+
+ return nil
+}
+
+func (s *EtcdServer) mayRemoveMember(id types.ID) error {
+ if !s.Cfg.StrictReconfigCheck {
+ return nil
+ }
+
+ lg := s.Logger()
+ member := s.cluster.Member(id)
+ // no need to check quorum when removing non-voting member
+ if member != nil && member.IsLearner {
+ return nil
+ }
+
+ if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
+ lg.Warn(
+ "rejecting member remove request; not enough healthy members",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("requested-member-remove-id", id.String()),
+ zap.Error(errors.ErrNotEnoughStartedMembers),
+ )
+ return errors.ErrNotEnoughStartedMembers
+ }
+
+ // downed member is safe to remove since it's not part of the active quorum
+ if t := s.r.transport.ActiveSince(id); id != s.MemberID() && t.IsZero() {
+ return nil
+ }
+
+ // protect quorum if some members are down
+ m := s.cluster.VotingMembers()
+ active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), m)
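+ // after removing this member, the remaining active peers (active-1) must still
+ // satisfy quorum of the shrunken voting set: 1 + (len(m)-1)/2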
+ if (active - 1) < 1+((len(m)-1)/2) {
+ lg.Warn(
+ "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("requested-member-remove", id.String()),
+ zap.Int("active-peers", active),
+ zap.Error(errors.ErrUnhealthy),
+ )
+ return errors.ErrUnhealthy
+ }
+
+ return nil
+}
+
+func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
+ b, merr := json.Marshal(memb)
+ if merr != nil {
+ return nil, merr
+ }
+
+ if err := s.checkMembershipOperationPermission(ctx); err != nil {
+ return nil, err
+ }
+ cc := raftpb.ConfChange{
+ Type: raftpb.ConfChangeUpdateNode,
+ NodeID: uint64(memb.ID),
+ Context: b,
+ }
+ return s.configure(ctx, cc)
+}
+
+func (s *EtcdServer) setCommittedIndex(v uint64) {
+ atomic.StoreUint64(&s.committedIndex, v)
+}
+
+func (s *EtcdServer) getCommittedIndex() uint64 {
+ return atomic.LoadUint64(&s.committedIndex)
+}
+
+func (s *EtcdServer) setAppliedIndex(v uint64) {
+ atomic.StoreUint64(&s.appliedIndex, v)
+}
+
+func (s *EtcdServer) getAppliedIndex() uint64 {
+ return atomic.LoadUint64(&s.appliedIndex)
+}
+
+func (s *EtcdServer) setTerm(v uint64) {
+ atomic.StoreUint64(&s.term, v)
+}
+
+func (s *EtcdServer) getTerm() uint64 {
+ return atomic.LoadUint64(&s.term)
+}
+
+func (s *EtcdServer) setLead(v uint64) {
+ atomic.StoreUint64(&s.lead, v)
+}
+
+func (s *EtcdServer) getLead() uint64 {
+ return atomic.LoadUint64(&s.lead)
+}
+
+func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
+ return s.leaderChanged.Receive()
+}
+
+// FirstCommitInTermNotify returns a channel that will be notified on the first
+// entry committed in a new term, which is necessary for a new leader to answer
+// read-only requests (the leader is not able to respond to any read-only requests
+// as long as linearizable semantics are required)
+func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
+ return s.firstCommitInTerm.Receive()
+}
+
+// MemberId returns the ID of the local member.
+// Deprecated: Please use (*EtcdServer) MemberID instead.
+//
+//revive:disable:var-naming
+func (s *EtcdServer) MemberId() types.ID { return s.MemberID() }
+
+//revive:enable:var-naming
+
+func (s *EtcdServer) MemberID() types.ID { return s.memberID }
+
+func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
+
+func (s *EtcdServer) Lead() uint64 { return s.getLead() }
+
+func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
+
+func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
+
+func (s *EtcdServer) Term() uint64 { return s.getTerm() }
+
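+// confChangeResponse carries the resulting membership of a conf change, a
+// channel used to wait until raft has advanced past the change, and any error
+// from applying it.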
+type confChangeResponse struct {
+ membs []*membership.Member
+ raftAdvanceC <-chan struct{}
+ err error
+}
+
+// configure sends a configuration change through consensus and
+// then waits for it to be applied to the server. It
+// will block until the change is performed or there is an error.
+func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
+ lg := s.Logger()
+ cc.ID = s.reqIDGen.Next()
+ ch := s.w.Register(cc.ID)
+
+ start := time.Now()
+ if err := s.r.ProposeConfChange(ctx, cc); err != nil {
+ s.w.Trigger(cc.ID, nil)
+ return nil, err
+ }
+
+ select {
+ case x := <-ch:
+ if x == nil {
+ lg.Panic("failed to configure")
+ }
+ resp := x.(*confChangeResponse)
+ // etcdserver needs to ensure that raft has already been notified
+ // or advanced before it responds to the client. Otherwise, the
+ // following config change request may be rejected.
+ // See https://github.com/etcd-io/etcd/issues/15528.
+ <-resp.raftAdvanceC
+ lg.Info(
+ "applied a configuration change through raft",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("raft-conf-change", cc.Type.String()),
+ zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
+ )
+ return resp.membs, resp.err
+
+ case <-ctx.Done():
+ s.w.Trigger(cc.ID, nil) // GC wait
+ return nil, s.parseProposeCtxErr(ctx.Err(), start)
+
+ case <-s.stopping:
+ return nil, errors.ErrStopped
+ }
+}
+
+// publishV3 registers server information into the cluster using a v3 request. The
+// information is the JSON representation of this server's member struct, updated
+// with the static clientURLs of the server.
+// The function keeps attempting to register until it succeeds,
+// or its server is stopped.
+func (s *EtcdServer) publishV3(timeout time.Duration) {
+ req := &membershippb.ClusterMemberAttrSetRequest{
+ Member_ID: uint64(s.MemberID()),
+ MemberAttributes: &membershippb.Attributes{
+ Name: s.attributes.Name,
+ ClientUrls: s.attributes.ClientURLs,
+ },
+ }
+ // gofail: var beforePublishing struct{}
+ lg := s.Logger()
+ for {
+ select {
+ case <-s.stopping:
+ lg.Warn(
+ "stopped publish because server is stopping",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
+ zap.Duration("publish-timeout", timeout),
+ )
+ return
+
+ default:
+ }
+
+ ctx, cancel := context.WithTimeout(s.ctx, timeout)
+ _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
+ cancel()
+ switch err {
+ case nil:
+ close(s.readych)
+ lg.Info(
+ "published local member to cluster through raft",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
+ zap.String("cluster-id", s.cluster.ID().String()),
+ zap.Duration("publish-timeout", timeout),
+ )
+ return
+
+ default:
+ lg.Warn(
+ "failed to publish local member to cluster through raft",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
+ zap.Duration("publish-timeout", timeout),
+ zap.Error(err),
+ )
+ }
+ }
+}
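+
+// Illustrative launch sketch (the actual call site may differ): publishV3
+// is intended to run once in its own goroutine at server start, with a
+// per-attempt timeout, and it returns only on success or shutdown.
+//
+//	s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) }) // timeout choice is an assumption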
+
+func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
+ atomic.AddInt64(&s.inflightSnapshots, 1)
+
+ lg := s.Logger()
+ fields := []zap.Field{
+ zap.String("from", s.MemberID().String()),
+ zap.String("to", types.ID(merged.To).String()),
+ zap.Int64("bytes", merged.TotalSize),
+ zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+ }
+
+ now := time.Now()
+ s.r.transport.SendSnapshot(merged)
+ lg.Info("sending merged snapshot", fields...)
+
+ s.GoAttach(func() {
+ select {
+ case ok := <-merged.CloseNotify():
+ // delay releasing the inflight snapshot for another 30 seconds to
+ // block log compaction.
+ // If the follower still fails to catch up, it is probably just too
+ // slow; we cannot avoid the snapshot cycle anyway.
+ if ok {
+ select {
+ case <-time.After(releaseDelayAfterSnapshot):
+ case <-s.stopping:
+ }
+ }
+
+ atomic.AddInt64(&s.inflightSnapshots, -1)
+
+ lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
+
+ case <-s.stopping:
+ lg.Warn("canceled sending merged snapshot; server stopping", fields...)
+ return
+ }
+ })
+}
+
+// apply takes entries received from Raft (after they have been committed) and
+// applies them to the current state of the EtcdServer.
+// The given entries should not be empty.
+func (s *EtcdServer) apply(
+ es []raftpb.Entry,
+ confState *raftpb.ConfState,
+ raftAdvancedC <-chan struct{},
+) (appliedt uint64, appliedi uint64, shouldStop bool) {
+ s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
+ for i := range es {
+ e := es[i]
+ index := s.consistIndex.ConsistentIndex()
+ s.lg.Debug("Applying entry",
+ zap.Uint64("consistent-index", index),
+ zap.Uint64("entry-index", e.Index),
+ zap.Uint64("entry-term", e.Term),
+ zap.Stringer("entry-type", e.Type))
+
+ // We need to apply all WAL entries on top of v2store
+ // and only the 'unapplied' ones (e.Index > backend.ConsistentIndex) on the backend.
+ shouldApplyV3 := membership.ApplyV2storeOnly
+ if e.Index > index {
+ shouldApplyV3 = membership.ApplyBoth
+ // set the consistent index of current executing entry
+ s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
+ }
+ switch e.Type {
+ case raftpb.EntryNormal:
+ // gofail: var beforeApplyOneEntryNormal struct{}
+ s.applyEntryNormal(&e, shouldApplyV3)
+ s.setAppliedIndex(e.Index)
+ s.setTerm(e.Term)
+
+ case raftpb.EntryConfChange:
+ // gofail: var beforeApplyOneConfChange struct{}
+ var cc raftpb.ConfChange
+ pbutil.MustUnmarshal(&cc, e.Data)
+ removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
+ s.setAppliedIndex(e.Index)
+ s.setTerm(e.Term)
+ shouldStop = shouldStop || removedSelf
+ s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err})
+
+ default:
+ lg := s.Logger()
+ lg.Panic(
+ "unknown entry type; must be either EntryNormal or EntryConfChange",
+ zap.String("type", e.Type.String()),
+ )
+ }
+ appliedi, appliedt = e.Index, e.Term
+ }
+ return appliedt, appliedi, shouldStop
+}
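+
+// Worked example for the dual-apply gate above (illustrative numbers):
+// suppose the backend's consistent index is 100 after a restart. Replaying
+// WAL entries then behaves as follows:
+//
+//	entry 99:  99  <= 100 -> ApplyV2storeOnly (backend already has it)
+//	entry 100: 100 <= 100 -> ApplyV2storeOnly
+//	entry 101: 101 >  100 -> ApplyBoth; consistent index advances to 101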
+
+// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
+func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.ShouldApplyV3) {
+ var ar *apply.Result
+ if shouldApplyV3 {
+ defer func() {
+ // The txPostLockInsideApplyHook will not get called in some cases,
+ // in which we should move the consistent index forward directly.
+ newIndex := s.consistIndex.ConsistentIndex()
+ if newIndex < e.Index {
+ s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+ }
+ }()
+ }
+
+ // The raft state machine may generate a noop entry upon leader confirmation.
+ // Skip it in advance to avoid potential bugs in the future.
+ if len(e.Data) == 0 {
+ s.firstCommitInTerm.Notify()
+
+ // promote lessor when the local member is leader and finished
+ // applying all entries from the last term.
+ if s.isLeader() {
+ s.lessor.Promote(s.Cfg.ElectionTimeout())
+ }
+ return
+ }
+
+ var raftReq pb.InternalRaftRequest
+ if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
+ var r pb.Request
+ rp := &r
+ pbutil.MustUnmarshal(rp, e.Data)
+ s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
+ raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp))
+ }
+ s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
+
+ if raftReq.V2 != nil {
+ req := (*RequestV2)(raftReq.V2)
+ raftReq = v2ToV3Request(s.lg, req)
+ }
+
+ id := raftReq.ID
+ if id == 0 {
+ if raftReq.Header == nil {
+ s.lg.Panic("applyEntryNormal, could not find a header")
+ }
+ id = raftReq.Header.ID
+ }
+
+ needResult := s.w.IsRegistered(id)
+ if needResult || !noSideEffect(&raftReq) {
+ if !needResult && raftReq.Txn != nil {
+ removeNeedlessRangeReqs(raftReq.Txn)
+ }
+ ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
+ }
+
+ // do not re-apply already applied entries.
+ if !shouldApplyV3 {
+ return
+ }
+
+ if ar == nil {
+ return
+ }
+
+ if !errorspkg.Is(ar.Err, errors.ErrNoSpace) || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
+ s.w.Trigger(id, ar)
+ return
+ }
+
+ lg := s.Logger()
+ lg.Warn(
+ "message exceeded backend quota; raising alarm",
+ zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
+ zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
+ zap.Error(ar.Err),
+ )
+
+ s.GoAttach(func() {
+ a := &pb.AlarmRequest{
+ MemberID: uint64(s.MemberID()),
+ Action: pb.AlarmRequest_ACTIVATE,
+ Alarm: pb.AlarmType_NOSPACE,
+ }
+ s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
+ s.w.Trigger(id, ar)
+ })
+}
+
+func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result {
+ if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil && r.DowngradeVersionTest == nil {
+ if !shouldApplyV3 {
+ return nil
+ }
+ return s.uberApply.Apply(r)
+ }
+ membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s)
+ op := "unknown"
+ defer func(start time.Time) {
+ txn.ApplySecObserve("v3", op, true, time.Since(start))
+ txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil)
+ }(time.Now())
+ switch {
+ case r.ClusterVersionSet != nil:
+ op = "ClusterVersionSet" // Implemented in 3.5.x
+ membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
+ return &apply.Result{}
+ case r.ClusterMemberAttrSet != nil:
+ op = "ClusterMemberAttrSet" // Implemented in 3.5.x
+ membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
+ case r.DowngradeInfoSet != nil:
+ op = "DowngradeInfoSet" // Implemented in 3.5.x
+ membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
+ case r.DowngradeVersionTest != nil:
+ op = "DowngradeVersionTest" // Implemented in 3.6 for test only
+ // do nothing; we just ensure etcdserver doesn't panic in case
+ // users (test cases) intentionally inject DowngradeVersionTestRequest
+ // into the WAL files.
+ default:
+ s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
+ return nil
+ }
+ return &apply.Result{}
+}
+
+func noSideEffect(r *pb.InternalRaftRequest) bool {
+ return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
+}
+
+func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
+ f := func(ops []*pb.RequestOp) []*pb.RequestOp {
+ j := 0
+ for i := 0; i < len(ops); i++ {
+ if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
+ continue
+ }
+ ops[j] = ops[i]
+ j++
+ }
+
+ return ops[:j]
+ }
+
+ txn.Success = f(txn.Success)
+ txn.Failure = f(txn.Failure)
+}
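+
+// The helper above uses the standard in-place filtering idiom; a standalone
+// equivalent over a slice looks like this (illustrative only, `xs` and the
+// `keep` predicate are assumptions):
+//
+//	j := 0
+//	for i := 0; i < len(xs); i++ {
+//		if keep(xs[i]) {
+//			xs[j] = xs[i]
+//			j++
+//		}
+//	}
+//	xs = xs[:j] // retained elements, no reallocation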
+
+// applyConfChange applies a ConfChange to the server. It is only
+// invoked with a ConfChange that has already passed through Raft.
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
+ lg := s.Logger()
+ if err := s.cluster.ValidateConfigurationChange(cc, shouldApplyV3); err != nil {
+ lg.Error("Validation on configuration change failed", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
+ cc.NodeID = raft.None
+ s.r.ApplyConfChange(cc)
+
+ // The txPostLock callback will not get called in this case,
+ // so we should set the consistent index directly.
+ if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
+ applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+ s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
+ }
+ return false, err
+ }
+
+ *confState = *s.r.ApplyConfChange(cc)
+ s.beHooks.SetConfState(confState)
+ switch cc.Type {
+ case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
+ confChangeContext := new(membership.ConfigChangeContext)
+ if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
+ lg.Panic("failed to unmarshal member", zap.Error(err))
+ }
+ if cc.NodeID != uint64(confChangeContext.Member.ID) {
+ lg.Panic(
+ "got different member ID",
+ zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
+ zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
+ )
+ }
+ if confChangeContext.IsPromote {
+ s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
+ } else {
+ s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
+
+ if confChangeContext.Member.ID != s.MemberID() {
+ s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
+ }
+ }
+
+ case raftpb.ConfChangeRemoveNode:
+ id := types.ID(cc.NodeID)
+ s.cluster.RemoveMember(id, shouldApplyV3)
+ if id == s.MemberID() {
+ return true, nil
+ }
+ s.r.transport.RemovePeer(id)
+
+ case raftpb.ConfChangeUpdateNode:
+ m := new(membership.Member)
+ if err := json.Unmarshal(cc.Context, m); err != nil {
+ lg.Panic("failed to unmarshal member", zap.Error(err))
+ }
+ if cc.NodeID != uint64(m.ID) {
+ lg.Panic(
+ "got different member ID",
+ zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
+ zap.String("member-id-from-message", m.ID.String()),
+ )
+ }
+ s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
+ if m.ID != s.MemberID() {
+ s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
+ }
+ }
+
+ verify.Verify(func() {
+ s.verifyV3StoreInSyncWithV2Store(shouldApplyV3)
+ })
+
+ return false, nil
+}
+
+func (s *EtcdServer) verifyV3StoreInSyncWithV2Store(shouldApplyV3 membership.ShouldApplyV3) {
+ // If shouldApplyV3 == false, then it means v2store hasn't caught up with v3store.
+ if !shouldApplyV3 {
+ return
+ }
+
+ // clean up the Attributes; we only care about the RaftAttributes
+ cleanAttributesFunc := func(members map[types.ID]*membership.Member) map[types.ID]*membership.Member {
+ processedMembers := make(map[types.ID]*membership.Member)
+ for id, m := range members {
+ clonedMember := m.Clone()
+ clonedMember.Attributes = membership.Attributes{}
+ processedMembers[id] = clonedMember
+ }
+
+ return processedMembers
+ }
+
+ v2Members, _ := s.cluster.MembersFromStore()
+ v3Members, _ := s.cluster.MembersFromBackend()
+
+ processedV2Members := cleanAttributesFunc(v2Members)
+ processedV3Members := cleanAttributesFunc(v3Members)
+
+ if match := reflect.DeepEqual(processedV2Members, processedV3Members); !match {
+ v2Data, v2Err := json.Marshal(processedV2Members)
+ v3Data, v3Err := json.Marshal(processedV3Members)
+
+ if v2Err != nil || v3Err != nil {
+ panic("members in v2store doesn't match v3store")
+ }
+ panic(fmt.Sprintf("members in v2store doesn't match v3store, v2store: %s, v3store: %s", string(v2Data), string(v3Data)))
+ }
+}
+
+// TODO: non-blocking snapshot
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
+ lg := s.Logger()
+ d := GetMembershipInfoInV2Format(lg, s.cluster)
+ if toDisk {
+ s.Logger().Info(
+ "triggering snapshot",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.Uint64("local-member-applied-index", ep.appliedi),
+ zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+ zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+ zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+ )
+ s.forceDiskSnapshot = false
+ // commit kv to write metadata (for example: consistent index) to disk.
+ //
+ // This guarantees that Backend's consistent_index is >= index of last snapshot.
+ //
+ // KV().commit() updates the consistent index in backend.
+ // All operations that update consistent index must be called sequentially
+ // from applyAll function.
+ // So KV().Commit() cannot run in parallel with apply. It has to be called outside
+ // the go routine created below.
+ s.KV().Commit()
+ }
+
+ // For backward compatibility, generate v2 snapshot from v3 state.
+ snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
+ if err != nil {
+ // the snapshot was done asynchronously with the progress of raft.
+ // raft might have already got a newer snapshot.
+ if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
+ return
+ }
+ lg.Panic("failed to create snapshot", zap.Error(err))
+ }
+ ep.memorySnapshotIndex = ep.appliedi
+
+ verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
+
+ if toDisk {
+ // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+ if err = s.r.storage.SaveSnap(snap); err != nil {
+ lg.Panic("failed to save snapshot", zap.Error(err))
+ }
+ ep.diskSnapshotIndex = ep.appliedi
+ if err = s.r.storage.Release(snap); err != nil {
+ lg.Panic("failed to release wal", zap.Error(err))
+ }
+
+ lg.Info(
+ "saved snapshot to disk",
+ zap.Uint64("snapshot-index", snap.Metadata.Index),
+ )
+ }
+}
+
+func (s *EtcdServer) compactRaftLog(snapi uint64) {
+ lg := s.Logger()
+
+ // When sending a snapshot, etcd will pause compaction.
+ // After receiving a snapshot, the slow follower needs to get all the entries
+ // right after the snapshot in order to catch up. If we do not pause compaction,
+ // those entries might already be compacted, which happens when the snapshot
+ // takes a long time to send and save. Pausing compaction avoids triggering
+ // a snapshot-sending cycle.
+ if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
+ lg.Info("skip compaction since there is an inflight snapshot")
+ return
+ }
+
+ // keep some in-memory log entries for slow followers.
+ compacti := uint64(1)
+ if snapi > s.Cfg.SnapshotCatchUpEntries {
+ compacti = snapi - s.Cfg.SnapshotCatchUpEntries
+ }
+ err := s.r.raftStorage.Compact(compacti)
+ if err != nil {
+ // the compaction was done asynchronously with the progress of raft.
+ // the raft log might have already been compacted.
+ if errorspkg.Is(err, raft.ErrCompacted) {
+ return
+ }
+ lg.Panic("failed to compact", zap.Error(err))
+ }
+ lg.Debug(
+ "compacted Raft logs",
+ zap.Uint64("compact-index", compacti),
+ )
+}
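+
+// Worked example (illustrative): with the default SnapshotCatchUpEntries of
+// 5000 and a snapshot taken at index 12000, the raft log is compacted up to
+// index 12000-5000 = 7000, keeping entries 7001..12000 in memory for slow
+// followers. When snapi <= 5000, compacti stays at 1 and effectively
+// nothing is compacted yet.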
+
+// CutPeer drops messages to the specified peer.
+func (s *EtcdServer) CutPeer(id types.ID) {
+ tr, ok := s.r.transport.(*rafthttp.Transport)
+ if ok {
+ tr.CutPeer(id)
+ }
+}
+
+// MendPeer recovers the message dropping behavior of the given peer.
+func (s *EtcdServer) MendPeer(id types.ID) {
+ tr, ok := s.r.transport.(*rafthttp.Transport)
+ if ok {
+ tr.MendPeer(id)
+ }
+}
+
+func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
+
+func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
+
+func (s *EtcdServer) ClusterVersion() *semver.Version {
+ if s.cluster == nil {
+ return nil
+ }
+ return s.cluster.Version()
+}
+
+func (s *EtcdServer) StorageVersion() *semver.Version {
+ // `applySnapshot` sets a new backend instance, so we need to acquire the bemu lock.
+ s.bemu.RLock()
+ defer s.bemu.RUnlock()
+
+ v, err := schema.DetectSchemaVersion(s.lg, s.be.ReadTx())
+ if err != nil {
+ s.lg.Warn("Failed to detect schema version", zap.Error(err))
+ return nil
+ }
+ return &v
+}
+
+// monitorClusterVersions checks every monitorVersionInterval whether this member is the leader, and updates the cluster version if needed.
+func (s *EtcdServer) monitorClusterVersions() {
+ lg := s.Logger()
+ monitor := serverversion.NewMonitor(lg, NewServerVersionAdapter(s))
+ for {
+ select {
+ case <-s.firstCommitInTerm.Receive():
+ case <-time.After(monitorVersionInterval):
+ case <-s.stopping:
+ lg.Info("server has stopped; stopping cluster version's monitor")
+ return
+ }
+
+ if s.Leader() != s.MemberID() {
+ continue
+ }
+ err := monitor.UpdateClusterVersionIfNeeded()
+ if err != nil {
+ s.lg.Error("Failed to monitor cluster version", zap.Error(err))
+ }
+ }
+}
+
+// monitorStorageVersion updates the storage version every monitorVersionInterval if needed.
+func (s *EtcdServer) monitorStorageVersion() {
+ lg := s.Logger()
+ monitor := serverversion.NewMonitor(lg, NewServerVersionAdapter(s))
+ for {
+ select {
+ case <-time.After(monitorVersionInterval):
+ case <-s.clusterVersionChanged.Receive():
+ case <-s.stopping:
+ lg.Info("server has stopped; stopping storage version's monitor")
+ return
+ }
+ monitor.UpdateStorageVersionIfNeeded()
+ }
+}
+
+func (s *EtcdServer) monitorKVHash() {
+ t := s.Cfg.CorruptCheckTime
+ if t == 0 {
+ return
+ }
+ checkTicker := time.NewTicker(t)
+ defer checkTicker.Stop()
+
+ lg := s.Logger()
+ lg.Info(
+ "enabled corruption checking",
+ zap.String("local-member-id", s.MemberID().String()),
+ zap.Duration("interval", t),
+ )
+ for {
+ select {
+ case <-s.stopping:
+ lg.Info("server has stopped; stopping kv hash's monitor")
+ return
+ case <-checkTicker.C:
+ }
+ backend.VerifyBackendConsistency(s.be, lg, false, schema.AllBuckets...)
+ if !s.isLeader() {
+ continue
+ }
+ if err := s.corruptionChecker.PeriodicCheck(); err != nil {
+ lg.Warn("failed to check hash KV", zap.Error(err))
+ }
+ }
+}
+
+func (s *EtcdServer) monitorCompactHash() {
+ if !s.FeatureEnabled(features.CompactHashCheck) {
+ return
+ }
+ t := s.Cfg.CompactHashCheckTime
+ for {
+ select {
+ case <-time.After(t):
+ case <-s.stopping:
+ lg := s.Logger()
+ lg.Info("server has stopped; stopping compact hash's monitor")
+ return
+ }
+ if !s.isLeader() {
+ continue
+ }
+ s.corruptionChecker.CompactHashCheck()
+ }
+}
+
+func (s *EtcdServer) updateClusterVersionV3(ver string) {
+ lg := s.Logger()
+
+ if s.cluster.Version() == nil {
+ lg.Info(
+ "setting up initial cluster version using v3 API",
+ zap.String("cluster-version", version.Cluster(ver)),
+ )
+ } else {
+ lg.Info(
+ "updating cluster version using v3 API",
+ zap.String("from", version.Cluster(s.cluster.Version().String())),
+ zap.String("to", version.Cluster(ver)),
+ )
+ }
+
+ req := membershippb.ClusterVersionSetRequest{Ver: ver}
+
+ ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
+ _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
+ cancel()
+
+ switch {
+ case errorspkg.Is(err, nil):
+ lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
+ return
+
+ case errorspkg.Is(err, errors.ErrStopped):
+ lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
+ return
+
+ default:
+ lg.Warn("failed to update cluster version", zap.Error(err))
+ }
+}
+
+// monitorDowngrade checks every DowngradeCheckTime whether this member is the leader, and cancels the downgrade if needed.
+func (s *EtcdServer) monitorDowngrade() {
+ monitor := serverversion.NewMonitor(s.Logger(), NewServerVersionAdapter(s))
+ t := s.Cfg.DowngradeCheckTime
+ if t == 0 {
+ return
+ }
+ for {
+ select {
+ case <-time.After(t):
+ case <-s.stopping:
+ return
+ }
+
+ if !s.isLeader() {
+ continue
+ }
+ monitor.CancelDowngradeIfNeeded()
+ }
+}
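+
+// All the monitor loops above share the same skeleton (illustrative;
+// `interval` and `doCheck` are placeholders, not names from this package):
+//
+//	for {
+//		select {
+//		case <-time.After(interval):
+//		case <-s.stopping:
+//			return
+//		}
+//		if !s.isLeader() {
+//			continue
+//		}
+//		doCheck()
+//	}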
+
+func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
+ switch {
+ case errorspkg.Is(err, context.Canceled):
+ return errors.ErrCanceled
+
+ case errorspkg.Is(err, context.DeadlineExceeded):
+ s.leadTimeMu.RLock()
+ curLeadElected := s.leadElectedTime
+ s.leadTimeMu.RUnlock()
+ prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
+ if start.After(prevLeadLost) && start.Before(curLeadElected) {
+ return errors.ErrTimeoutDueToLeaderFail
+ }
+ lead := types.ID(s.getLead())
+ switch lead {
+ case types.ID(raft.None):
+ // TODO: return an error indicating that the cluster currently has no leader
+ case s.MemberID():
+ if !isConnectedToQuorumSince(s.r.transport, start, s.MemberID(), s.cluster.Members()) {
+ return errors.ErrTimeoutDueToConnectionLost
+ }
+ default:
+ if !isConnectedSince(s.r.transport, start, lead) {
+ return errors.ErrTimeoutDueToConnectionLost
+ }
+ }
+ return errors.ErrTimeout
+
+ default:
+ return err
+ }
+}
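+
+// Worked example (illustrative, assuming ElectionTicks = 10 and
+// TickMs = 100): the look-back window is 2 * 10 * 100ms = 2s, so a proposal
+// started within the two seconds before the current leader was elected is
+// classified as ErrTimeoutDueToLeaderFail rather than a plain ErrTimeout.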
+
+func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
+func (s *EtcdServer) Backend() backend.Backend {
+ s.bemu.RLock()
+ defer s.bemu.RUnlock()
+ return s.be
+}
+
+func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
+
+func (s *EtcdServer) restoreAlarms() error {
+ as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be))
+ if err != nil {
+ return err
+ }
+ s.alarmStore = as
+ return nil
+}
+
+// GoAttach runs the given function in a goroutine and tracks it using
+// the etcdserver waitgroup.
+// The passed function should interrupt on s.StoppingNotify().
+func (s *EtcdServer) GoAttach(f func()) {
+ s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
+ defer s.wgMu.RUnlock()
+ select {
+ case <-s.stopping:
+ lg := s.Logger()
+ lg.Warn("server has stopped; skipping GoAttach")
+ return
+ default:
+ }
+
+ // now safe to add since waitgroup wait has not started yet
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+ f()
+ }()
+}
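+
+// Illustrative usage sketch (`workDone` is an assumed channel): the
+// attached function selects on StoppingNotify so that it never blocks
+// server shutdown.
+//
+//	s.GoAttach(func() {
+//		select {
+//		case <-s.StoppingNotify():
+//			return // server is stopping; abandon the work
+//		case <-workDone:
+//		}
+//	})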
+
+func (s *EtcdServer) Alarms() []*pb.AlarmMember {
+ return s.alarmStore.Get(pb.AlarmType_NONE)
+}
+
+// IsLearner returns whether the local member is a raft learner.
+func (s *EtcdServer) IsLearner() bool {
+ return s.cluster.IsLocalMemberLearner()
+}
+
+// IsMemberExist returns whether the member with the given ID exists in the cluster.
+func (s *EtcdServer) IsMemberExist(id types.ID) bool {
+ return s.cluster.IsMemberExist(id)
+}
+
+// raftStatus returns the raft status of this etcd node.
+func (s *EtcdServer) raftStatus() raft.Status {
+ return s.r.Node.Status()
+}
+
+func (s *EtcdServer) Version() *serverversion.Manager {
+ return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
+}
+
+func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
+ return func() {
+ applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+ if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
+ s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
+ }
+ }
+}
+
+func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
+ return s.corruptionChecker
+}
+
+func addFeatureGateMetrics(fg featuregate.FeatureGate, gaugeVec *prometheus.GaugeVec) {
+ for feature, featureSpec := range fg.(featuregate.MutableFeatureGate).GetAll() {
+ var metricVal float64
+ if fg.Enabled(feature) {
+ metricVal = 1
+ } else {
+ metricVal = 0
+ }
+ gaugeVec.With(prometheus.Labels{"name": string(feature), "stage": string(featureSpec.PreRelease)}).Set(metricVal)
+ }
+}
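+
+// Illustrative wiring sketch (the real gauge registration lives elsewhere;
+// the metric name and `fg` here are assumptions): the vector must carry
+// "name" and "stage" labels, and the gate must also implement
+// featuregate.MutableFeatureGate, as the type assertion above requires.
+//
+//	var fg featuregate.FeatureGate // the server's feature gate
+//	vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+//		Namespace: "etcd",
+//		Subsystem: "server",
+//		Name:      "feature_enabled",
+//	}, []string{"name", "stage"})
+//	addFeatureGateMetrics(fg, vec)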