// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/coreos/go-semver/semver"
	"github.com/dustin/go-humanize"
	"go.uber.org/zap"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/client/pkg/v3/fileutil"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/pkg/v3/pbutil"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver/api"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	servererrors "go.etcd.io/etcd/server/v3/etcdserver/errors"
	serverstorage "go.etcd.io/etcd/server/v3/storage"
	"go.etcd.io/etcd/server/v3/storage/backend"
	"go.etcd.io/etcd/server/v3/storage/schema"
	"go.etcd.io/etcd/server/v3/storage/wal"
	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

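// bootstrap prepares the state an etcd server needs before it can start:
// it validates the data and member directories, opens the backend and, if
// present, the existing WAL, resolves cluster membership, and sets up the
// raft node state.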
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		cfg.Logger.Warn(
			"exceeded recommended request limit",
			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
			zap.String("recommended-request-size", recommendedMaxRequestBytesString),
		)
	}

	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
		return nil, fmt.Errorf("cannot access data directory: %w", terr)
	}

	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %w", terr)
	}
	ss := bootstrapSnapshot(cfg)
	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
	if err != nil {
		return nil, err
	}

	haveWAL := wal.Exist(cfg.WALDir())
	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
	if err != nil {
		return nil, err
	}
	var bwal *bootstrappedWAL

	if haveWAL {
		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %w", err)
		}
		cfg.Logger.Info("Bootstrapping WAL from snapshot")
		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot, backend.ci)
	}

	cfg.Logger.Info("bootstrapping cluster")
	cluster, err := bootstrapCluster(cfg, bwal, prt)
	if err != nil {
		backend.Close()
		return nil, err
	}

	cfg.Logger.Info("bootstrapping storage")
	s := bootstrapStorage(cfg, st, backend, bwal, cluster)

	if err = cluster.Finalize(cfg, s); err != nil {
		backend.Close()
		return nil, err
	}

	cfg.Logger.Info("bootstrapping raft")
	raft := bootstrapRaft(cfg, cluster, s.wal)
	return &bootstrappedServer{
		prt:     prt,
		ss:      ss,
		storage: s,
		cluster: cluster,
		raft:    raft,
	}, nil
}

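// bootstrappedServer bundles the artifacts produced by bootstrap: the
// recovered storage, the resolved cluster membership, the raft state, the
// peer round tripper, and the snapshotter.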
type bootstrappedServer struct {
	storage *bootstrappedStorage
	cluster *bootstrappedCluster
	raft    *bootstrappedRaft
	prt     http.RoundTripper
	ss      *snap.Snapshotter
}

func (s *bootstrappedServer) Close() {
	s.storage.Close()
}

type bootstrappedStorage struct {
	backend *bootstrappedBackend
	wal     *bootstrappedWAL
	st      v2store.Store
}

func (s *bootstrappedStorage) Close() {
	s.backend.Close()
}

type bootstrappedBackend struct {
	beHooks  *serverstorage.BackendHooks
	be       backend.Backend
	ci       cindex.ConsistentIndexer
	beExist  bool
	snapshot *raftpb.Snapshot
}

func (s *bootstrappedBackend) Close() {
	s.be.Close()
}

type bootstrappedCluster struct {
	remotes []*membership.Member
	cl      *membership.RaftCluster
	nodeID  types.ID
}

type bootstrappedRaft struct {
	lg        *zap.Logger
	heartbeat time.Duration

	peers   []raft.Peer
	config  *raft.Config
	storage *raft.MemoryStorage
}

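// bootstrapStorage combines the backend, v2 store, and WAL into a single
// bootstrappedStorage. If no existing WAL was recovered, a fresh one is
// created for the bootstrapped cluster.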
func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrappedCluster) *bootstrappedStorage {
	if wal == nil {
		wal = bootstrapNewWAL(cfg, cl)
	}

	return &bootstrappedStorage{
		backend: be,
		st:      st,
		wal:     wal,
	}
}

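// bootstrapSnapshot ensures the snapshot directory exists, removes any
// leftover temporary snapshot files, and returns a Snapshotter for it.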
func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
	if err := fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
		cfg.Logger.Fatal(
			"failed to create snapshot directory",
			zap.String("path", cfg.SnapDir()),
			zap.Error(err),
		)
	}

	if err := fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
		return strings.HasPrefix(fileName, "tmp")
	}); err != nil {
		cfg.Logger.Error(
			"failed to remove temp file(s) in snapshot directory",
			zap.String("path", cfg.SnapDir()),
			zap.Error(err),
		)
	}
	return snap.New(cfg.Logger, cfg.SnapDir())
}

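// bootstrapBackend opens the backend database, wires it to the
// consistent-index tracker, optionally defragments it at startup, and, when
// a WAL already exists, recovers the backend from the newest snapshot
// covered by that WAL.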
func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
	beExist := fileutil.Exist(cfg.BackendPath())
	ci := cindex.NewConsistentIndex(nil)
	beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci)
	be := serverstorage.OpenBackend(cfg, beHooks)
	defer func() {
		if err != nil && be != nil {
			be.Close()
		}
	}()
	ci.SetBackend(be)
	schema.CreateMetaBucket(be.BatchTx())
	if cfg.BootstrapDefragThresholdMegabytes != 0 {
		err = maybeDefragBackend(cfg, be)
		if err != nil {
			return nil, err
		}
	}
	cfg.Logger.Info("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))

	// TODO(serathius): Implement schema setup in fresh storage
	var snapshot *raftpb.Snapshot
	if haveWAL {
		snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss)
		if err != nil {
			return nil, err
		}
	}
	if beExist {
		s1, s2 := be.Size(), be.SizeInUse()
		cfg.Logger.Info(
			"recovered v3 backend",
			zap.Int64("backend-size-bytes", s1),
			zap.String("backend-size", humanize.Bytes(uint64(s1))),
			zap.Int64("backend-size-in-use-bytes", s2),
			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
		)
		if err = schema.Validate(cfg.Logger, be.ReadTx()); err != nil {
			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
			return nil, err
		}
	}

	return &bootstrappedBackend{
		beHooks:  beHooks,
		be:       be,
		ci:       ci,
		beExist:  beExist,
		snapshot: snapshot,
	}, nil
}

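// maybeDefragBackend defragments the backend at startup, but only when the
// reclaimable space (total size minus size in use) meets or exceeds the
// configured bootstrap defragmentation threshold.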
func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
	size := be.Size()
	sizeInUse := be.SizeInUse()
	freeableMemory := uint(size - sizeInUse)
	thresholdBytes := cfg.BootstrapDefragThresholdMegabytes * 1024 * 1024
	if freeableMemory < thresholdBytes {
		cfg.Logger.Info("Skipping defragmentation",
			zap.Int64("current-db-size-bytes", size),
			zap.String("current-db-size", humanize.Bytes(uint64(size))),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
			zap.Uint("experimental-bootstrap-defrag-threshold-bytes", thresholdBytes),
			zap.String("experimental-bootstrap-defrag-threshold", humanize.Bytes(uint64(thresholdBytes))),
		)
		return nil
	}
	return be.Defrag()
}

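// bootstrapCluster resolves cluster membership for the three supported
// startup cases: joining an existing cluster without a WAL, creating a new
// cluster without a WAL, and restarting a member from an existing WAL.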
func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrappedCluster, err error) {
	switch {
	case bwal == nil && !cfg.NewCluster:
		c, err = bootstrapExistingClusterNoWAL(cfg, prt)
	case bwal == nil && cfg.NewCluster:
		c, err = bootstrapNewClusterNoWAL(cfg, prt)
	case bwal != nil && bwal.haveWAL:
		c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
	default:
		return nil, fmt.Errorf("unsupported bootstrap config")
	}
	if err != nil {
		return nil, err
	}
	return c, nil
}

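// bootstrapExistingClusterNoWAL joins an already-running cluster as a new
// member: it fetches the cluster view from the remote peers, validates the
// local peer URLs and version compatibility against it, and adopts the
// remote cluster ID.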
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrappedCluster, error) {
	if err := cfg.VerifyJoinExisting(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.MaxLearners))
	if err != nil {
		return nil, err
	}
	existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
	if gerr != nil {
		return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %w", gerr)
	}
	if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
		return nil, fmt.Errorf("error validating peerURLs %s: %w", existingCluster, err)
	}
	if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt, cfg.ReqTimeout()) {
		return nil, fmt.Errorf("incompatible with current running cluster")
	}
	scaleUpLearners := false
	if err := membership.ValidateMaxLearnerConfig(cfg.MaxLearners, existingCluster.Members(), scaleUpLearners); err != nil {
		return nil, err
	}
	remotes := existingCluster.Members()
	cl.SetID(types.ID(0), existingCluster.ID())
	member := cl.MemberByName(cfg.Name)
	return &bootstrappedCluster{
		remotes: remotes,
		cl:      cl,
		nodeID:  member.ID,
	}, nil
}

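// bootstrapNewClusterNoWAL starts a brand-new cluster from the initial peer
// URLs map, optionally resolving the member list through v2 or v3 discovery
// when a discovery endpoint is configured.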
func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrappedCluster, error) {
	if err := cfg.VerifyBootstrap(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.MaxLearners))
	if err != nil {
		return nil, err
	}
	m := cl.MemberByName(cfg.Name)
	if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
		return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
	}
	if cfg.ShouldDiscover() {
		var str string
		if cfg.DiscoveryURL != "" {
			cfg.Logger.Warn("V2 discovery is deprecated!")
			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
		} else {
			cfg.Logger.Info("Bootstrapping cluster using v3 discovery.")
			str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
		}
		if err != nil {
			return nil, &servererrors.DiscoveryError{Op: "join", Err: err}
		}
		var urlsmap types.URLsMap
		urlsmap, err = types.NewURLsMap(str)
		if err != nil {
			return nil, err
		}
		if config.CheckDuplicateURL(urlsmap) {
			return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
		}
		if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap, membership.WithMaxLearners(cfg.MaxLearners)); err != nil {
			return nil, err
		}
	}
	return &bootstrappedCluster{
		remotes: nil,
		cl:      cl,
		nodeID:  m.ID,
	}, nil
}

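// bootstrapClusterWithWAL restarts a member that already has a WAL: the
// member and cluster IDs are taken from the WAL metadata, and any configured
// discovery token is ignored.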
func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrappedCluster, error) {
	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
		return nil, fmt.Errorf("cannot write to member directory: %w", err)
	}

	if cfg.ShouldDiscover() {
		cfg.Logger.Warn(
			"discovery token is ignored since cluster already initialized; valid logs are found",
			zap.String("wal-dir", cfg.WALDir()),
		)
	}
	cl := membership.NewCluster(cfg.Logger, membership.WithMaxLearners(cfg.MaxLearners))

	scaleUpLearners := false
	if err := membership.ValidateMaxLearnerConfig(cfg.MaxLearners, cl.Members(), scaleUpLearners); err != nil {
		return nil, err
	}

	cl.SetID(meta.nodeID, meta.clusterID)
	return &bootstrappedCluster{
		cl:     cl,
		nodeID: meta.nodeID,
	}, nil
}

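// recoverSnapshot loads the newest snapshot that is covered by the WAL,
// restores the v2 store from it, and recovers the v3 backend to match. It
// returns the snapshot (nil if none was found) and the possibly replaced
// backend.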
func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
	// Find a snapshot to start/restart a raft node
	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
	if err != nil {
		return nil, be, err
	}
	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
	// WAL log entries
	snapshot, err := ss.LoadNewestAvailable(walSnaps)
	if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
		return nil, be, err
	}

	if snapshot != nil {
		if err = st.Recovery(snapshot.Data); err != nil {
			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
		}

		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
			cfg.Logger.Error("illegal v2store content", zap.Error(err))
			return nil, be, err
		}

		cfg.Logger.Info(
			"recovered v2 store from snapshot",
			zap.Uint64("snapshot-index", snapshot.Metadata.Index),
			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
		)

		if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
		}
		// A snapshot db may have already been recovered, and the old db should have
		// already been closed in this case, so we should set the backend again.
		ci.SetBackend(be)

		if beExist {
			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
			// etcd from pre-3.0 release.
			kvindex := ci.ConsistentIndex()
			if kvindex < snapshot.Metadata.Index {
				if kvindex != 0 {
					return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
				}
				cfg.Logger.Warn(
					"consistent index was never saved",
					zap.Uint64("snapshot-index", snapshot.Metadata.Index),
				)
			}
		}
	} else {
		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
	}
	return snapshot, be, nil
}

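// Finalize attaches the recovered v2 store and membership backend to the
// cluster, recovers membership from the WAL when one exists, and verifies
// that the backend database file and learner configuration are consistent.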
func (c *bootstrappedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error {
	if !s.wal.haveWAL {
		c.cl.SetID(c.nodeID, c.cl.ID())
	}
	c.cl.SetStore(s.st)
	c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))

	// Work around clusters that have already been affected by
	// https://github.com/etcd-io/etcd/issues/19557.
	c.cl.SyncLearnerPromotionIfNeeded()

	if s.wal.haveWAL {
		c.cl.Recover(api.UpdateCapability)
		if c.databaseFileMissing(s) {
			bepath := cfg.BackendPath()
			os.RemoveAll(bepath)
			return fmt.Errorf("database file (%v) of the backend is missing", bepath)
		}
	}
	scaleUpLearners := false
	return membership.ValidateMaxLearnerConfig(cfg.MaxLearners, c.cl.Members(), scaleUpLearners)
}

func (c *bootstrappedCluster) databaseFileMissing(s *bootstrappedStorage) bool {
	v3Cluster := c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3})
	return v3Cluster && !s.backend.beExist
}

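// bootstrapRaft builds the raft state for the same three startup cases that
// bootstrapCluster distinguishes: join an existing cluster without a WAL
// (no peers), start a new cluster without a WAL (all member IDs as peers),
// or restart from an existing WAL.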
func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrappedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
	switch {
	case !bwal.haveWAL && !cfg.NewCluster:
		return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal)
	case !bwal.haveWAL && cfg.NewCluster:
		return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal)
	case bwal.haveWAL:
		return bootstrapRaftFromWAL(cfg, bwal)
	default:
		cfg.Logger.Panic("unsupported bootstrap config")
		return nil
	}
}

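// bootstrapRaftFromCluster builds raft state for a member that has no WAL
// yet. The given member IDs become the initial raft peers, with each peer's
// context carrying the marshaled membership.Member.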
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
	member := cl.MemberByName(cfg.Name)
	peers := make([]raft.Peer, len(ids))
	for i, id := range ids {
		var ctx []byte
		ctx, err := json.Marshal((*cl).Member(id))
		if err != nil {
			cfg.Logger.Panic("failed to marshal member", zap.Error(err))
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}
	cfg.Logger.Info(
		"starting local member",
		zap.String("local-member-id", member.ID.String()),
		zap.String("cluster-id", cl.ID().String()),
	)
	s := bwal.MemoryStorage()
	return &bootstrappedRaft{
		lg:        cfg.Logger,
		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
		config:    raftConfig(cfg, uint64(member.ID), s),
		peers:     peers,
		storage:   s,
	}
}

func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
	s := bwal.MemoryStorage()
	return &bootstrappedRaft{
		lg:        cfg.Logger,
		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
		config:    raftConfig(cfg, uint64(bwal.meta.nodeID), s),
		storage:   s,
	}
}

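// raftConfig translates the server configuration into a raft.Config. The
// heartbeat is fixed at one tick, so cfg.ElectionTicks expresses the election
// timeout as a multiple of the heartbeat interval.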
func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config {
	return &raft.Config{
		ID:              id,
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
	}
}

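// newRaftNode starts the raft node (when initial peers are present) or
// restarts it (when there are none), then wraps it together with the
// WAL-backed storage into a raftNode.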
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
	var n raft.Node
	if len(b.peers) == 0 {
		n = raft.RestartNode(b.config)
	} else {
		n = raft.StartNode(b.config, b.peers)
	}
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return newRaftNode(
		raftNodeConfig{
			lg:          b.lg,
			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
			Node:        n,
			heartbeat:   b.heartbeat,
			raftStorage: b.storage,
			storage:     serverstorage.NewStorage(b.lg, wal, ss),
		},
	)
}

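// bootstrapWALFromSnapshot opens the existing WAL at the position of the
// given snapshot. With --force-new-cluster, it additionally drops
// uncommitted entries and appends config-change entries that force the
// membership down to the local member.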
func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot, ci cindex.ConsistentIndexer) *bootstrappedWAL {
	wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
	bwal := &bootstrappedWAL{
		lg:       cfg.Logger,
		w:        wal,
		st:       st,
		ents:     ents,
		snapshot: snap,
		meta:     meta,
		haveWAL:  true,
	}

	if cfg.ForceNewCluster {
		consistentIndex := ci.ConsistentIndex()
		oldCommitIndex := bwal.st.Commit
		// If only `HardState.Commit` increases, HardState won't be persisted
		// to disk, even though the committed entries might have already been
		// applied. This can result in consistent_index > CommitIndex.
		//
		// When restarting etcd with `--force-new-cluster`, all uncommitted
		// entries are dropped. To avoid losing entries that were actually
		// committed, we reset Commit to max(HardState.Commit, consistent_index).
		//
		// See: https://github.com/etcd-io/raft/pull/300 for more details.
		bwal.st.Commit = max(oldCommitIndex, consistentIndex)

		// discard the previously uncommitted entries
		bwal.ents = bwal.CommitedEntries()
		entries := bwal.NewConfigChangeEntries()
		// force commit config change entries
		bwal.AppendAndCommitEntries(entries)
		cfg.Logger.Info(
			"forcing restart member",
			zap.String("cluster-id", meta.clusterID.String()),
			zap.String("local-member-id", meta.nodeID.String()),
			zap.Uint64("wal-commit-index", oldCommitIndex),
			zap.Uint64("commit-index", bwal.st.Commit),
		)
	} else {
		cfg.Logger.Info(
			"restarting local member",
			zap.String("cluster-id", meta.clusterID.String()),
			zap.String("local-member-id", meta.nodeID.String()),
			zap.Uint64("commit-index", bwal.st.Commit),
		)
	}
	return bwal
}

// openWALFromSnapshot reads the WAL starting at the position of the given
// snapshot and returns the opened WAL, its latest HardState, all entries that
// appear after that position, the snapshot itself, and the node and cluster
// IDs recorded in the WAL metadata.
// The snapshot must have been previously saved to the WAL, or this call will panic.
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	repaired := false
	for {
		w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
		if err != nil {
			cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
		}
		if cfg.UnsafeNoFsync {
			w.SetUnsafeNoFsync()
		}
		wmetadata, st, ents, err := w.ReadAll()
		if err != nil {
			w.Close()
			// we can only repair ErrUnexpectedEOF and we never repair twice.
			if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
				cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
			}
			if !wal.Repair(cfg.Logger, cfg.WALDir()) {
				cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
			} else {
				cfg.Logger.Info("repaired WAL", zap.Error(err))
				repaired = true
			}
			continue
		}
		var metadata etcdserverpb.Metadata
		pbutil.MustUnmarshal(&metadata, wmetadata)
		id := types.ID(metadata.NodeID)
		cid := types.ID(metadata.ClusterID)
		meta := &snapshotMetadata{clusterID: cid, nodeID: id}
		return w, &st, ents, snapshot, meta
	}
}

type snapshotMetadata struct {
	nodeID, clusterID types.ID
}

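// bootstrapNewWAL creates a fresh WAL whose metadata records the local node
// ID and the cluster ID of the bootstrapped cluster.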
func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrappedCluster) *bootstrappedWAL {
	metadata := pbutil.MustMarshal(
		&etcdserverpb.Metadata{
			NodeID:    uint64(cl.nodeID),
			ClusterID: uint64(cl.cl.ID()),
		},
	)
	w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
	if err != nil {
		cfg.Logger.Panic("failed to create WAL", zap.Error(err))
	}
	if cfg.UnsafeNoFsync {
		w.SetUnsafeNoFsync()
	}
	return &bootstrappedWAL{
		lg: cfg.Logger,
		w:  w,
	}
}

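// bootstrappedWAL carries the opened WAL together with the hard state,
// entries, snapshot, and metadata recovered from it. haveWAL distinguishes a
// recovered WAL from a freshly created one.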
type bootstrappedWAL struct {
	lg *zap.Logger

	haveWAL  bool
	w        *wal.WAL
	st       *raftpb.HardState
	ents     []raftpb.Entry
	snapshot *raftpb.Snapshot
	meta     *snapshotMetadata
}

func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
	s := raft.NewMemoryStorage()
	if wal.snapshot != nil {
		s.ApplySnapshot(*wal.snapshot)
	}
	if wal.st != nil {
		s.SetHardState(*wal.st)
	}
	if len(wal.ents) != 0 {
		s.Append(wal.ents)
	}
	return s
}

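// CommitedEntries returns the prefix of the recovered WAL entries whose index
// is at or below the commit index in the recovered HardState; any later
// entries are logged as discarded and excluded from the result.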
func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
	for i, ent := range wal.ents {
		if ent.Index > wal.st.Commit {
			wal.lg.Info(
				"discarding uncommitted WAL entries",
				zap.Uint64("entry-index", ent.Index),
				zap.Uint64("commit-index-from-wal", wal.st.Commit),
				zap.Int("number-of-discarded-entries", len(wal.ents)-i),
			)
			return wal.ents[:i]
		}
	}
	return wal.ents
}

func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
	return serverstorage.CreateConfigChangeEnts(
		wal.lg,
		serverstorage.GetEffectiveNodeIDsFromWALEntries(wal.lg, wal.snapshot, wal.ents),
		uint64(wal.meta.nodeID),
		wal.st.Term,
		wal.st.Commit,
	)
}

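// AppendAndCommitEntries persists the given entries to the WAL, appends them
// to the in-memory entry list, and advances the commit index to the index of
// the last entry so that they are treated as committed on restart.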
func (wal *bootstrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) {
	wal.ents = append(wal.ents, ents...)
	err := wal.w.Save(raftpb.HardState{}, ents)
	if err != nil {
		wal.lg.Fatal("failed to save hard state and entries", zap.Error(err))
	}
	if len(wal.ents) != 0 {
		wal.st.Commit = wal.ents[len(wal.ents)-1].Index
	}
}