// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package etcdserver
16
17import (
18 "encoding/json"
19 "errors"
20 "fmt"
21 "io"
22 "net/http"
23 "os"
24 "strings"
25 "time"
26
27 "github.com/coreos/go-semver/semver"
28 "github.com/dustin/go-humanize"
29 "go.uber.org/zap"
30
31 "go.etcd.io/etcd/api/v3/etcdserverpb"
32 "go.etcd.io/etcd/client/pkg/v3/fileutil"
33 "go.etcd.io/etcd/client/pkg/v3/types"
34 "go.etcd.io/etcd/pkg/v3/pbutil"
35 "go.etcd.io/etcd/server/v3/config"
36 "go.etcd.io/etcd/server/v3/etcdserver/api"
37 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
38 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
39 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
40 "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
41 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
42 "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
43 "go.etcd.io/etcd/server/v3/etcdserver/cindex"
44 servererrors "go.etcd.io/etcd/server/v3/etcdserver/errors"
45 serverstorage "go.etcd.io/etcd/server/v3/storage"
46 "go.etcd.io/etcd/server/v3/storage/backend"
47 "go.etcd.io/etcd/server/v3/storage/schema"
48 "go.etcd.io/etcd/server/v3/storage/wal"
49 "go.etcd.io/etcd/server/v3/storage/wal/walpb"
50 "go.etcd.io/raft/v3"
51 "go.etcd.io/raft/v3/raftpb"
52)
53
// bootstrap assembles everything the etcd server needs before it can run:
// the snapshot directory, the peer transport, the v2 store, the v3 backend,
// the WAL (when one already exists on disk), cluster membership, and the
// raft state. The stages are order-dependent; on failure after the backend
// is opened, the backend is closed before returning.
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
	// Warn (but do not fail) when the configured request limit exceeds the
	// recommended maximum.
	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		cfg.Logger.Warn(
			"exceeded recommended request limit",
			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
			zap.String("recommended-request-size", recommendedMaxRequestBytesString),
		)
	}

	// Ensure the data and member directories exist and are accessible.
	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
		return nil, fmt.Errorf("cannot access data directory: %w", terr)
	}

	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %w", terr)
	}
	ss := bootstrapSnapshot(cfg)
	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
	if err != nil {
		return nil, err
	}

	haveWAL := wal.Exist(cfg.WALDir())
	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
	// NOTE: this local shadows the imported "backend" package for the rest
	// of this function.
	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
	if err != nil {
		return nil, err
	}
	var bwal *bootstrappedWAL

	if haveWAL {
		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %w", err)
		}
		cfg.Logger.Info("Bootstrapping WAL from snapshot")
		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot, backend.ci)
	}

	cfg.Logger.Info("bootstrapping cluster")
	cluster, err := bootstrapCluster(cfg, bwal, prt)
	if err != nil {
		backend.Close()
		return nil, err
	}

	cfg.Logger.Info("bootstrapping storage")
	s := bootstrapStorage(cfg, st, backend, bwal, cluster)

	// Finalize wires the storage into the membership; it must run before
	// the raft state is built.
	if err = cluster.Finalize(cfg, s); err != nil {
		backend.Close()
		return nil, err
	}

	cfg.Logger.Info("bootstrapping raft")
	raft := bootstrapRaft(cfg, cluster, s.wal)
	return &bootstrappedServer{
		prt:     prt,
		ss:      ss,
		storage: s,
		cluster: cluster,
		raft:    raft,
	}, nil
}
119
// bootstrappedServer carries the fully prepared components produced by
// bootstrap, ready to be handed to the etcd server proper.
type bootstrappedServer struct {
	storage *bootstrappedStorage // v2 store, v3 backend and WAL
	cluster *bootstrappedCluster // membership state
	raft    *bootstrappedRaft    // raft config, peers and in-memory storage
	prt     http.RoundTripper    // transport used to reach peers
	ss      *snap.Snapshotter    // snapshot reader/writer
}
127
// Close releases resources held by the bootstrapped server; currently only
// the storage (and through it the backend) needs closing.
func (s *bootstrappedServer) Close() {
	s.storage.Close()
}
131
// bootstrappedStorage groups the storage layers prepared during bootstrap:
// the v3 backend, the WAL, and the v2 store.
type bootstrappedStorage struct {
	backend *bootstrappedBackend
	wal     *bootstrappedWAL
	st      v2store.Store
}
137
// Close closes the underlying backend; the WAL and v2 store are not closed
// here.
func (s *bootstrappedStorage) Close() {
	s.backend.Close()
}
141
// bootstrappedBackend holds the opened v3 backend together with its hooks,
// the consistent-index tracker, and the snapshot (if any) it was recovered
// from.
type bootstrappedBackend struct {
	beHooks  *serverstorage.BackendHooks
	be       backend.Backend
	ci       cindex.ConsistentIndexer
	beExist  bool             // whether the backend file existed before bootstrap
	snapshot *raftpb.Snapshot // snapshot used during recovery; nil on a fresh start
}
149
// Close closes the underlying backend handle.
func (s *bootstrappedBackend) Close() {
	s.be.Close()
}
153
// bootstrappedCluster describes the cluster membership discovered or created
// during bootstrap.
type bootstrappedCluster struct {
	remotes []*membership.Member // remote members known up front (set when joining an existing cluster)
	cl      *membership.RaftCluster
	nodeID  types.ID // ID of the local member
}
159
// bootstrappedRaft carries everything needed to start (or restart) the raft
// node: its config, bootstrap peers, heartbeat interval, and the in-memory
// raft storage seeded from the WAL.
type bootstrappedRaft struct {
	lg        *zap.Logger
	heartbeat time.Duration // duration of one raft tick

	peers   []raft.Peer // non-empty only when starting a brand-new cluster
	config  *raft.Config
	storage *raft.MemoryStorage
}
168
169func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrappedCluster) *bootstrappedStorage {
170 if wal == nil {
171 wal = bootstrapNewWAL(cfg, cl)
172 }
173
174 return &bootstrappedStorage{
175 backend: be,
176 st: st,
177 wal: wal,
178 }
179}
180
181func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
182 if err := fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
183 cfg.Logger.Fatal(
184 "failed to create snapshot directory",
185 zap.String("path", cfg.SnapDir()),
186 zap.Error(err),
187 )
188 }
189
190 if err := fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
191 return strings.HasPrefix(fileName, "tmp")
192 }); err != nil {
193 cfg.Logger.Error(
194 "failed to remove temp file(s) in snapshot directory",
195 zap.String("path", cfg.SnapDir()),
196 zap.Error(err),
197 )
198 }
199 return snap.New(cfg.Logger, cfg.SnapDir())
200}
201
// bootstrapBackend opens the v3 backend (creating the file if absent), wires
// up the backend hooks and the consistent-index tracker, optionally
// defragments at startup, and — when a WAL exists — recovers the backend
// from the newest snapshot consistent with that WAL. The deferred cleanup
// closes the backend on any error exit.
func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
	beExist := fileutil.Exist(cfg.BackendPath())
	ci := cindex.NewConsistentIndex(nil)
	beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci)
	be := serverstorage.OpenBackend(cfg, beHooks)
	// recoverSnapshot below may replace `be`; the deferred close reads the
	// latest value, so whichever backend is current gets closed on error.
	defer func() {
		if err != nil && be != nil {
			be.Close()
		}
	}()
	ci.SetBackend(be)
	schema.CreateMetaBucket(be.BatchTx())
	// Startup defragmentation is opt-in via the threshold setting.
	if cfg.BootstrapDefragThresholdMegabytes != 0 {
		err = maybeDefragBackend(cfg, be)
		if err != nil {
			return nil, err
		}
	}
	cfg.Logger.Info("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))

	// TODO(serathius): Implement schema setup in fresh storage
	var snapshot *raftpb.Snapshot
	if haveWAL {
		// Restarting member: bring the backend in line with the newest
		// snapshot known to the WAL.
		snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss)
		if err != nil {
			return nil, err
		}
	}
	if beExist {
		s1, s2 := be.Size(), be.SizeInUse()
		cfg.Logger.Info(
			"recovered v3 backend",
			zap.Int64("backend-size-bytes", s1),
			zap.String("backend-size", humanize.Bytes(uint64(s1))),
			zap.Int64("backend-size-in-use-bytes", s2),
			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
		)
		// Only a pre-existing backend is schema-validated; a fresh one has
		// nothing to validate yet (see TODO above).
		if err = schema.Validate(cfg.Logger, be.ReadTx()); err != nil {
			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
			return nil, err
		}
	}

	return &bootstrappedBackend{
		beHooks:  beHooks,
		be:       be,
		ci:       ci,
		beExist:  beExist,
		snapshot: snapshot,
	}, nil
}
253
254func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
255 size := be.Size()
256 sizeInUse := be.SizeInUse()
257 freeableMemory := uint(size - sizeInUse)
258 thresholdBytes := cfg.BootstrapDefragThresholdMegabytes * 1024 * 1024
259 if freeableMemory < thresholdBytes {
260 cfg.Logger.Info("Skipping defragmentation",
261 zap.Int64("current-db-size-bytes", size),
262 zap.String("current-db-size", humanize.Bytes(uint64(size))),
263 zap.Int64("current-db-size-in-use-bytes", sizeInUse),
264 zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
265 zap.Uint("experimental-bootstrap-defrag-threshold-bytes", thresholdBytes),
266 zap.String("experimental-bootstrap-defrag-threshold", humanize.Bytes(uint64(thresholdBytes))),
267 )
268 return nil
269 }
270 return be.Defrag()
271}
272
273func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrappedCluster, err error) {
274 switch {
275 case bwal == nil && !cfg.NewCluster:
276 c, err = bootstrapExistingClusterNoWAL(cfg, prt)
277 case bwal == nil && cfg.NewCluster:
278 c, err = bootstrapNewClusterNoWAL(cfg, prt)
279 case bwal != nil && bwal.haveWAL:
280 c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
281 default:
282 return nil, fmt.Errorf("unsupported bootstrap config")
283 }
284 if err != nil {
285 return nil, err
286 }
287 return c, nil
288}
289
// bootstrapExistingClusterNoWAL prepares membership for a member joining an
// already-running cluster for the first time (no local WAL yet). It fetches
// the live cluster from the remote peers, validates the local configuration
// against it, and adopts the remote cluster's ID.
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrappedCluster, error) {
	if err := cfg.VerifyJoinExisting(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.MaxLearners))
	if err != nil {
		return nil, err
	}
	// Ask the other members what the cluster actually looks like right now.
	existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
	if gerr != nil {
		return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %w", gerr)
	}
	if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
		return nil, fmt.Errorf("error validating peerURLs %s: %w", existingCluster, err)
	}
	// NOTE(review): MemberByName is assumed non-nil here because
	// ValidateClusterAndAssignIDs succeeded — confirm against its contract.
	if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt, cfg.ReqTimeout()) {
		return nil, fmt.Errorf("incompatible with current running cluster")
	}
	// Validate the learner count against the fetched membership, with
	// learner scale-up disabled.
	scaleUpLearners := false
	if err := membership.ValidateMaxLearnerConfig(cfg.MaxLearners, existingCluster.Members(), scaleUpLearners); err != nil {
		return nil, err
	}
	remotes := existingCluster.Members()
	// Adopt the existing cluster's ID; the local member ID argument is left
	// zero and resolved by name below.
	cl.SetID(types.ID(0), existingCluster.ID())
	member := cl.MemberByName(cfg.Name)
	return &bootstrappedCluster{
		remotes: remotes,
		cl:      cl,
		nodeID:  member.ID,
	}, nil
}
321
// bootstrapNewClusterNoWAL prepares membership for a brand-new cluster (no
// WAL on disk, new-cluster mode). When discovery is configured, it joins
// via v2 or v3 discovery and rebuilds the member list from the discovered
// URLs.
func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrappedCluster, error) {
	if err := cfg.VerifyBootstrap(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.MaxLearners))
	if err != nil {
		return nil, err
	}
	m := cl.MemberByName(cfg.Name)
	// Refuse to bootstrap twice: if the peers already know this member, it
	// was bootstrapped before.
	if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
		return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
	}
	if cfg.ShouldDiscover() {
		var str string
		if cfg.DiscoveryURL != "" {
			// A discovery URL selects the deprecated v2 protocol.
			cfg.Logger.Warn("V2 discovery is deprecated!")
			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
		} else {
			cfg.Logger.Info("Bootstrapping cluster using v3 discovery.")
			str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
		}
		if err != nil {
			return nil, &servererrors.DiscoveryError{Op: "join", Err: err}
		}
		var urlsmap types.URLsMap
		urlsmap, err = types.NewURLsMap(str)
		if err != nil {
			return nil, err
		}
		if config.CheckDuplicateURL(urlsmap) {
			return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
		}
		// Replace the initial membership with the one agreed via discovery.
		if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap, membership.WithMaxLearners(cfg.MaxLearners)); err != nil {
			return nil, err
		}
	}
	return &bootstrappedCluster{
		remotes: nil,
		cl:      cl,
		nodeID:  m.ID,
	}, nil
}
364
// bootstrapClusterWithWAL restores membership identity for a member that is
// restarting with an existing WAL: node and cluster IDs come from the WAL
// snapshot metadata, and any discovery configuration is ignored.
func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrappedCluster, error) {
	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
		return nil, fmt.Errorf("cannot write to member directory: %w", err)
	}

	// A restart already has authoritative membership in its logs; discovery
	// settings are only warned about, never applied.
	if cfg.ShouldDiscover() {
		cfg.Logger.Warn(
			"discovery token is ignored since cluster already initialized; valid logs are found",
			zap.String("wal-dir", cfg.WALDir()),
		)
	}
	cl := membership.NewCluster(cfg.Logger, membership.WithMaxLearners(cfg.MaxLearners))

	// Validate the learner limit with learner scale-up disabled.
	scaleUpLearners := false
	if err := membership.ValidateMaxLearnerConfig(cfg.MaxLearners, cl.Members(), scaleUpLearners); err != nil {
		return nil, err
	}

	cl.SetID(meta.nodeID, meta.clusterID)
	return &bootstrappedCluster{
		cl:     cl,
		nodeID: meta.nodeID,
	}, nil
}
389
// recoverSnapshot locates the newest snapshot that is consistent with the
// WAL and, if one exists, restores both the v2 store and the v3 backend
// from it. It may return a different backend than it was given (the
// snapshot's db can replace the current one). Finding no snapshot at all is
// not an error.
func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
	// Find a snapshot to start/restart a raft node
	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
	if err != nil {
		return nil, be, err
	}
	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
	// bwal log entries
	snapshot, err := ss.LoadNewestAvailable(walSnaps)
	if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
		return nil, be, err
	}

	if snapshot != nil {
		// Restore the v2 store first; failure here is unrecoverable.
		if err = st.Recovery(snapshot.Data); err != nil {
			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
		}

		// Enforce the configured v2 deprecation policy on the restored store.
		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
			cfg.Logger.Error("illegal v2store content", zap.Error(err))
			return nil, be, err
		}

		cfg.Logger.Info(
			"recovered v2 store from snapshot",
			zap.Uint64("snapshot-index", snapshot.Metadata.Index),
			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
		)

		if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
		}
		// A snapshot db may have already been recovered, and the old db should have
		// already been closed in this case, so we should set the backend again.
		ci.SetBackend(be)

		if beExist {
			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
			// etcd from pre-3.0 release.
			kvindex := ci.ConsistentIndex()
			if kvindex < snapshot.Metadata.Index {
				// A non-zero index that lags the snapshot means the db and
				// snapshot genuinely disagree; zero means the index was
				// simply never saved (pre-3.0 upgrade path).
				if kvindex != 0 {
					return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
				}
				cfg.Logger.Warn(
					"consistent index was never saved",
					zap.Uint64("snapshot-index", snapshot.Metadata.Index),
				)
			}
		}
	} else {
		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
	}
	return snapshot, be, nil
}
445
// Finalize wires the bootstrapped storage into the membership: it assigns
// the local member ID (fresh start only), attaches the v2 store and the v3
// membership backend, recovers membership from storage when restarting from
// a WAL, and validates the learner limit. bootstrap() calls it before the
// raft state is built.
func (c *bootstrappedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error {
	if !s.wal.haveWAL {
		// Fresh member: record our own ID while keeping the cluster ID.
		c.cl.SetID(c.nodeID, c.cl.ID())
	}
	c.cl.SetStore(s.st)
	c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))

	// Workaround the issues which have already been affected
	// by https://github.com/etcd-io/etcd/issues/19557.
	c.cl.SyncLearnerPromotionIfNeeded()

	if s.wal.haveWAL {
		c.cl.Recover(api.UpdateCapability)
		if c.databaseFileMissing(s) {
			// NOTE(review): the path is removed before failing — presumably
			// so a subsequent restore can recreate it cleanly; confirm.
			bepath := cfg.BackendPath()
			os.RemoveAll(bepath)
			return fmt.Errorf("database file (%v) of the backend is missing", bepath)
		}
	}
	// Validate the learner limit with learner scale-up disabled.
	scaleUpLearners := false
	return membership.ValidateMaxLearnerConfig(cfg.MaxLearners, c.cl.Members(), scaleUpLearners)
}
468
469func (c *bootstrappedCluster) databaseFileMissing(s *bootstrappedStorage) bool {
470 v3Cluster := c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3})
471 return v3Cluster && !s.backend.beExist
472}
473
474func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrappedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
475 switch {
476 case !bwal.haveWAL && !cfg.NewCluster:
477 return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal)
478 case !bwal.haveWAL && cfg.NewCluster:
479 return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal)
480 case bwal.haveWAL:
481 return bootstrapRaftFromWAL(cfg, bwal)
482 default:
483 cfg.Logger.Panic("unsupported bootstrap config")
484 return nil
485 }
486}
487
488func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
489 member := cl.MemberByName(cfg.Name)
490 peers := make([]raft.Peer, len(ids))
491 for i, id := range ids {
492 var ctx []byte
493 ctx, err := json.Marshal((*cl).Member(id))
494 if err != nil {
495 cfg.Logger.Panic("failed to marshal member", zap.Error(err))
496 }
497 peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
498 }
499 cfg.Logger.Info(
500 "starting local member",
501 zap.String("local-member-id", member.ID.String()),
502 zap.String("cluster-id", cl.ID().String()),
503 )
504 s := bwal.MemoryStorage()
505 return &bootstrappedRaft{
506 lg: cfg.Logger,
507 heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
508 config: raftConfig(cfg, uint64(member.ID), s),
509 peers: peers,
510 storage: s,
511 }
512}
513
514func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
515 s := bwal.MemoryStorage()
516 return &bootstrappedRaft{
517 lg: cfg.Logger,
518 heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
519 config: raftConfig(cfg, uint64(bwal.meta.nodeID), s),
520 storage: s,
521 }
522}
523
// raftConfig translates the server configuration into a raft.Config for the
// given member ID, backed by the provided in-memory storage.
func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config {
	return &raft.Config{
		ID:              id,
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1, // heartbeat on every tick; ElectionTick is measured in these ticks
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
	}
}
537
// newRaftNode starts or restarts the raft node from the bootstrapped state:
// with no bootstrap peers it restarts from existing storage, otherwise it
// starts a fresh node with those peers. The node is wrapped in a raftNode
// together with WAL/snapshot-backed persistent storage.
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
	var n raft.Node
	if len(b.peers) == 0 {
		n = raft.RestartNode(b.config)
	} else {
		n = raft.StartNode(b.config, b.peers)
	}
	// Publish this node's Status func through the package-level variable.
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return newRaftNode(
		raftNodeConfig{
			lg:          b.lg,
			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
			Node:        n,
			heartbeat:   b.heartbeat,
			raftStorage: b.storage,
			storage:     serverstorage.NewStorage(b.lg, wal, ss),
		},
	)
}
559
// bootstrapWALFromSnapshot opens the WAL at the position of the given
// snapshot and packages the recovered state (hard state, entries, snapshot,
// metadata) into a bootstrappedWAL. With --force-new-cluster it also drops
// uncommitted entries and force-commits synthesized config-change entries.
func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot, ci cindex.ConsistentIndexer) *bootstrappedWAL {
	wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
	bwal := &bootstrappedWAL{
		lg:       cfg.Logger,
		w:        wal,
		st:       st,
		ents:     ents,
		snapshot: snap,
		meta:     meta,
		haveWAL:  true,
	}

	if cfg.ForceNewCluster {
		consistentIndex := ci.ConsistentIndex()
		oldCommitIndex := bwal.st.Commit
		// If only `HardState.Commit` increases, HardState won't be persisted
		// to disk, even though the committed entries might have already been
		// applied. This can result in consistent_index > CommitIndex.
		//
		// When restarting etcd with `--force-new-cluster`, all uncommitted
		// entries are dropped. To avoid losing entries that were actually
		// committed, we reset Commit to max(HardState.Commit, consistent_index).
		//
		// See: https://github.com/etcd-io/raft/pull/300 for more details.
		bwal.st.Commit = max(oldCommitIndex, consistentIndex)

		// discard the previously uncommitted entries
		bwal.ents = bwal.CommitedEntries()
		entries := bwal.NewConfigChangeEntries()
		// force commit config change entries
		bwal.AppendAndCommitEntries(entries)
		cfg.Logger.Info(
			"forcing restart member",
			zap.String("cluster-id", meta.clusterID.String()),
			zap.String("local-member-id", meta.nodeID.String()),
			zap.Uint64("wal-commit-index", oldCommitIndex),
			zap.Uint64("commit-index", bwal.st.Commit),
		)
	} else {
		cfg.Logger.Info(
			"restarting local member",
			zap.String("cluster-id", meta.clusterID.String()),
			zap.String("local-member-id", meta.nodeID.String()),
			zap.Uint64("commit-index", bwal.st.Commit),
		)
	}
	return bwal
}
608
// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
// A torn tail (io.ErrUnexpectedEOF) is repaired once; any other read error,
// or a second failure after repair, is fatal.
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	repaired := false
	for {
		w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
		if err != nil {
			cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
		}
		if cfg.UnsafeNoFsync {
			w.SetUnsafeNoFsync()
		}
		wmetadata, st, ents, err := w.ReadAll()
		if err != nil {
			w.Close()
			// we can only repair ErrUnexpectedEOF and we never repair twice.
			if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
				cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
			}
			if !wal.Repair(cfg.Logger, cfg.WALDir()) {
				cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
			} else {
				cfg.Logger.Info("repaired WAL", zap.Error(err))
				repaired = true
			}
			// Retry the open/read cycle against the repaired WAL.
			continue
		}
		// The WAL metadata records which member and cluster wrote it.
		var metadata etcdserverpb.Metadata
		pbutil.MustUnmarshal(&metadata, wmetadata)
		id := types.ID(metadata.NodeID)
		cid := types.ID(metadata.ClusterID)
		meta := &snapshotMetadata{clusterID: cid, nodeID: id}
		return w, &st, ents, snapshot, meta
	}
}
649
// snapshotMetadata is the member and cluster identity recorded in the WAL
// metadata.
type snapshotMetadata struct {
	nodeID, clusterID types.ID
}
653
654func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrappedCluster) *bootstrappedWAL {
655 metadata := pbutil.MustMarshal(
656 &etcdserverpb.Metadata{
657 NodeID: uint64(cl.nodeID),
658 ClusterID: uint64(cl.cl.ID()),
659 },
660 )
661 w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
662 if err != nil {
663 cfg.Logger.Panic("failed to create WAL", zap.Error(err))
664 }
665 if cfg.UnsafeNoFsync {
666 w.SetUnsafeNoFsync()
667 }
668 return &bootstrappedWAL{
669 lg: cfg.Logger,
670 w: w,
671 }
672}
673
// bootstrappedWAL is the WAL state recovered from disk (or freshly created)
// during bootstrap.
type bootstrappedWAL struct {
	lg *zap.Logger

	haveWAL  bool // true when the WAL existed on disk before bootstrap
	w        *wal.WAL
	st       *raftpb.HardState // latest hard state read from the WAL; nil for a new WAL
	ents     []raftpb.Entry    // entries read after the snapshot position
	snapshot *raftpb.Snapshot  // snapshot the WAL was opened at; may be nil
	meta     *snapshotMetadata // node/cluster IDs from the WAL metadata; nil for a new WAL
}
684
// MemoryStorage materializes the recovered WAL state into a fresh
// raft.MemoryStorage, applying the snapshot, then the hard state, then the
// log entries, each only when present.
func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
	s := raft.NewMemoryStorage()
	if wal.snapshot != nil {
		s.ApplySnapshot(*wal.snapshot)
	}
	if wal.st != nil {
		s.SetHardState(*wal.st)
	}
	if len(wal.ents) != 0 {
		s.Append(wal.ents)
	}
	return s
}
698
699func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
700 for i, ent := range wal.ents {
701 if ent.Index > wal.st.Commit {
702 wal.lg.Info(
703 "discarding uncommitted WAL entries",
704 zap.Uint64("entry-index", ent.Index),
705 zap.Uint64("commit-index-from-wal", wal.st.Commit),
706 zap.Int("number-of-discarded-entries", len(wal.ents)-i),
707 )
708 return wal.ents[:i]
709 }
710 }
711 return wal.ents
712}
713
// NewConfigChangeEntries synthesizes config-change entries from the
// effective node set found in this WAL's snapshot and entries; it is used
// by the force-new-cluster path in bootstrapWALFromSnapshot.
func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
	return serverstorage.CreateConfigChangeEnts(
		wal.lg,
		serverstorage.GetEffectiveNodeIDsFromWALEntries(wal.lg, wal.snapshot, wal.ents),
		uint64(wal.meta.nodeID),
		wal.st.Term,
		wal.st.Commit,
	)
}
723
724func (wal *bootstrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) {
725 wal.ents = append(wal.ents, ents...)
726 err := wal.w.Save(raftpb.HardState{}, ents)
727 if err != nil {
728 wal.lg.Fatal("failed to save hard state and entries", zap.Error(err))
729 }
730 if len(wal.ents) != 0 {
731 wal.st.Commit = wal.ents[len(wal.ents)-1].Index
732 }
733}