blob: 94c2363d59e0de4195c9a78af42cfc5c4bf1512b [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "bytes"
19 "crypto/rand"
20 "errors"
21 "fmt"
22 "math"
23 "math/big"
24 "slices"
25 "strings"
26 "sync"
27
28 "go.etcd.io/raft/v3/confchange"
29 "go.etcd.io/raft/v3/quorum"
30 pb "go.etcd.io/raft/v3/raftpb"
31 "go.etcd.io/raft/v3/tracker"
32)
33
34const (
35 // None is a placeholder node ID used when there is no leader.
36 None uint64 = 0
37 // LocalAppendThread is a reference to a local thread that saves unstable
38 // log entries and snapshots to stable storage. The identifier is used as a
39 // target for MsgStorageAppend messages when AsyncStorageWrites is enabled.
40 LocalAppendThread uint64 = math.MaxUint64
41 // LocalApplyThread is a reference to a local thread that applies committed
42 // log entries to the local state machine. The identifier is used as a
43 // target for MsgStorageApply messages when AsyncStorageWrites is enabled.
44 LocalApplyThread uint64 = math.MaxUint64 - 1
45)
46
47// Possible values for StateType.
48const (
49 StateFollower StateType = iota
50 StateCandidate
51 StateLeader
52 StatePreCandidate
53 numStates
54)
55
56type ReadOnlyOption int
57
58const (
59 // ReadOnlySafe guarantees the linearizability of the read only request by
60 // communicating with the quorum. It is the default and suggested option.
61 ReadOnlySafe ReadOnlyOption = iota
62 // ReadOnlyLeaseBased ensures linearizability of the read only request by
63 // relying on the leader lease. It can be affected by clock drift.
64 // If the clock drift is unbounded, leader might keep the lease longer than it
65 // should (clock can move backward/pause without any bound). ReadIndex is not safe
66 // in that case.
67 ReadOnlyLeaseBased
68)
69
70// Possible values for CampaignType
71const (
72 // campaignPreElection represents the first phase of a normal election when
73 // Config.PreVote is true.
74 campaignPreElection CampaignType = "CampaignPreElection"
75 // campaignElection represents a normal (time-based) election (the second phase
76 // of the election when Config.PreVote is true).
77 campaignElection CampaignType = "CampaignElection"
78 // campaignTransfer represents the type of leader transfer
79 campaignTransfer CampaignType = "CampaignTransfer"
80)
81
82const noLimit = math.MaxUint64
83
84// ErrProposalDropped is returned when the proposal is ignored by some cases,
85// so that the proposer can be notified and fail fast.
86var ErrProposalDropped = errors.New("raft proposal dropped")
87
88// lockedRand is a small wrapper around rand.Rand to provide
89// synchronization among multiple raft groups. Only the methods needed
90// by the code are exposed (e.g. Intn).
91type lockedRand struct {
92 mu sync.Mutex
93}
94
95func (r *lockedRand) Intn(n int) int {
96 r.mu.Lock()
97 v, _ := rand.Int(rand.Reader, big.NewInt(int64(n)))
98 r.mu.Unlock()
99 return int(v.Int64())
100}
101
102var globalRand = &lockedRand{}
103
104// CampaignType represents the type of campaigning
105// the reason we use the type of string instead of uint64
106// is because it's simpler to compare and fill in raft entries
107type CampaignType string
108
109// StateType represents the role of a node in a cluster.
110type StateType uint64
111
112var stmap = [...]string{
113 "StateFollower",
114 "StateCandidate",
115 "StateLeader",
116 "StatePreCandidate",
117}
118
119func (st StateType) String() string {
120 return stmap[st]
121}
122
123// Config contains the parameters to start a raft.
124type Config struct {
125 // ID is the identity of the local raft. ID cannot be 0.
126 ID uint64
127
128 // ElectionTick is the number of Node.Tick invocations that must pass between
129 // elections. That is, if a follower does not receive any message from the
130 // leader of current term before ElectionTick has elapsed, it will become
131 // candidate and start an election. ElectionTick must be greater than
132 // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
133 // unnecessary leader switching.
134 ElectionTick int
135 // HeartbeatTick is the number of Node.Tick invocations that must pass between
136 // heartbeats. That is, a leader sends heartbeat messages to maintain its
137 // leadership every HeartbeatTick ticks.
138 HeartbeatTick int
139
140 // Storage is the storage for raft. raft generates entries and states to be
141 // stored in storage. raft reads the persisted entries and states out of
142 // Storage when it needs. raft reads out the previous state and configuration
143 // out of storage when restarting.
144 Storage Storage
145 // Applied is the last applied index. It should only be set when restarting
146 // raft. raft will not return entries to the application smaller or equal to
147 // Applied. If Applied is unset when restarting, raft might return previous
148 // applied entries. This is a very application dependent configuration.
149 Applied uint64
150
151 // AsyncStorageWrites configures the raft node to write to its local storage
152 // (raft log and state machine) using a request/response message passing
153 // interface instead of the default Ready/Advance function call interface.
154 // Local storage messages can be pipelined and processed asynchronously
155 // (with respect to Ready iteration), facilitating reduced interference
156 // between Raft proposals and increased batching of log appends and state
157 // machine application. As a result, use of asynchronous storage writes can
158 // reduce end-to-end commit latency and increase maximum throughput.
159 //
160 // When true, the Ready.Message slice will include MsgStorageAppend and
161 // MsgStorageApply messages. The messages will target a LocalAppendThread
162 // and a LocalApplyThread, respectively. Messages to the same target must be
163 // reliably processed in order. In other words, they can't be dropped (like
164 // messages over the network) and those targeted at the same thread can't be
165 // reordered. Messages to different targets can be processed in any order.
166 //
167 // MsgStorageAppend carries Raft log entries to append, election votes /
168 // term changes / updated commit indexes to persist, and snapshots to apply.
169 // All writes performed in service of a MsgStorageAppend must be durable
170 // before response messages are delivered. However, if the MsgStorageAppend
171 // carries no response messages, durability is not required. The message
172 // assumes the role of the Entries, HardState, and Snapshot fields in Ready.
173 //
174 // MsgStorageApply carries committed entries to apply. Writes performed in
175 // service of a MsgStorageApply need not be durable before response messages
176 // are delivered. The message assumes the role of the CommittedEntries field
177 // in Ready.
178 //
179 // Local messages each carry one or more response messages which should be
180 // delivered after the corresponding storage write has been completed. These
181 // responses may target the same node or may target other nodes. The storage
182 // threads are not responsible for understanding the response messages, only
183 // for delivering them to the correct target after performing the storage
184 // write.
185 AsyncStorageWrites bool
186
187 // MaxSizePerMsg limits the max byte size of each append message. Smaller
188 // value lowers the raft recovery cost(initial probing and message lost
189 // during normal operation). On the other side, it might affect the
190 // throughput during normal replication. Note: math.MaxUint64 for unlimited,
191 // 0 for at most one entry per message.
192 MaxSizePerMsg uint64
193 // MaxCommittedSizePerReady limits the size of the committed entries which
194 // can be applying at the same time.
195 //
196 // Despite its name (preserved for compatibility), this quota applies across
197 // Ready structs to encompass all outstanding entries in unacknowledged
198 // MsgStorageApply messages when AsyncStorageWrites is enabled.
199 MaxCommittedSizePerReady uint64
200 // MaxUncommittedEntriesSize limits the aggregate byte size of the
201 // uncommitted entries that may be appended to a leader's log. Once this
202 // limit is exceeded, proposals will begin to return ErrProposalDropped
203 // errors. Note: 0 for no limit.
204 MaxUncommittedEntriesSize uint64
205 // MaxInflightMsgs limits the max number of in-flight append messages during
206 // optimistic replication phase. The application transportation layer usually
207 // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
208 // overflowing that sending buffer. TODO (xiangli): feedback to application to
209 // limit the proposal rate?
210 MaxInflightMsgs int
211 // MaxInflightBytes limits the number of in-flight bytes in append messages.
212 // Complements MaxInflightMsgs. Ignored if zero.
213 //
214 // This effectively bounds the bandwidth-delay product. Note that especially
215 // in high-latency deployments setting this too low can lead to a dramatic
216 // reduction in throughput. For example, with a peer that has a round-trip
217 // latency of 100ms to the leader and this setting is set to 1 MB, there is a
218 // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
219 // to 2.5 MB/s. See Little's law to understand the maths behind.
220 MaxInflightBytes uint64
221
222 // CheckQuorum specifies if the leader should check quorum activity. Leader
223 // steps down when quorum is not active for an electionTimeout.
224 CheckQuorum bool
225
226 // PreVote enables the Pre-Vote algorithm described in raft thesis section
227 // 9.6. This prevents disruption when a node that has been partitioned away
228 // rejoins the cluster.
229 PreVote bool
230
231 // ReadOnlyOption specifies how the read only request is processed.
232 //
233 // ReadOnlySafe guarantees the linearizability of the read only request by
234 // communicating with the quorum. It is the default and suggested option.
235 //
236 // ReadOnlyLeaseBased ensures linearizability of the read only request by
237 // relying on the leader lease. It can be affected by clock drift.
238 // If the clock drift is unbounded, leader might keep the lease longer than it
239 // should (clock can move backward/pause without any bound). ReadIndex is not safe
240 // in that case.
241 // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
242 ReadOnlyOption ReadOnlyOption
243
244 // Logger is the logger used for raft log. For multinode which can host
245 // multiple raft group, each raft group can have its own logger
246 Logger Logger
247
248 // DisableProposalForwarding set to true means that followers will drop
249 // proposals, rather than forwarding them to the leader. One use case for
250 // this feature would be in a situation where the Raft leader is used to
251 // compute the data of a proposal, for example, adding a timestamp from a
252 // hybrid logical clock to data in a monotonically increasing way. Forwarding
253 // should be disabled to prevent a follower with an inaccurate hybrid
254 // logical clock from assigning the timestamp and then forwarding the data
255 // to the leader.
256 DisableProposalForwarding bool
257
258 // DisableConfChangeValidation turns off propose-time verification of
259 // configuration changes against the currently active configuration of the
260 // raft instance. These checks are generally sensible (cannot leave a joint
261 // config unless in a joint config, et cetera) but they have false positives
262 // because the active configuration may not be the most recent
263 // configuration. This is because configurations are activated during log
264 // application, and even the leader can trail log application by an
265 // unbounded number of entries.
266 // Symmetrically, the mechanism has false negatives - because the check may
267 // not run against the "actual" config that will be the predecessor of the
268 // newly proposed one, the check may pass but the new config may be invalid
269 // when it is being applied. In other words, the checks are best-effort.
270 //
271 // Users should *not* use this option unless they have a reliable mechanism
272 // (above raft) that serializes and verifies configuration changes. If an
273 // invalid configuration change enters the log and gets applied, a panic
274 // will result.
275 //
276 // This option may be removed once false positives are no longer possible.
277 // See: https://github.com/etcd-io/raft/issues/80
278 DisableConfChangeValidation bool
279
280 // StepDownOnRemoval makes the leader step down when it is removed from the
281 // group or demoted to a learner.
282 //
283 // This behavior will become unconditional in the future. See:
284 // https://github.com/etcd-io/raft/issues/83
285 StepDownOnRemoval bool
286
287 // raft state tracer
288 TraceLogger TraceLogger
289}
290
291func (c *Config) validate() error {
292 if c.ID == None {
293 return errors.New("cannot use none as id")
294 }
295 if IsLocalMsgTarget(c.ID) {
296 return errors.New("cannot use local target as id")
297 }
298
299 if c.HeartbeatTick <= 0 {
300 return errors.New("heartbeat tick must be greater than 0")
301 }
302
303 if c.ElectionTick <= c.HeartbeatTick {
304 return errors.New("election tick must be greater than heartbeat tick")
305 }
306
307 if c.Storage == nil {
308 return errors.New("storage cannot be nil")
309 }
310
311 if c.MaxUncommittedEntriesSize == 0 {
312 c.MaxUncommittedEntriesSize = noLimit
313 }
314
315 // default MaxCommittedSizePerReady to MaxSizePerMsg because they were
316 // previously the same parameter.
317 if c.MaxCommittedSizePerReady == 0 {
318 c.MaxCommittedSizePerReady = c.MaxSizePerMsg
319 }
320
321 if c.MaxInflightMsgs <= 0 {
322 return errors.New("max inflight messages must be greater than 0")
323 }
324 if c.MaxInflightBytes == 0 {
325 c.MaxInflightBytes = noLimit
326 } else if c.MaxInflightBytes < c.MaxSizePerMsg {
327 return errors.New("max inflight bytes must be >= max message size")
328 }
329
330 if c.Logger == nil {
331 c.Logger = getLogger()
332 }
333
334 if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
335 return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
336 }
337
338 return nil
339}
340
341type raft struct {
342 id uint64
343
344 Term uint64
345 Vote uint64
346
347 readStates []ReadState
348
349 // the log
350 raftLog *raftLog
351
352 maxMsgSize entryEncodingSize
353 maxUncommittedSize entryPayloadSize
354
355 trk tracker.ProgressTracker
356
357 state StateType
358
359 // isLearner is true if the local raft node is a learner.
360 isLearner bool
361
362 // msgs contains the list of messages that should be sent out immediately to
363 // other nodes.
364 //
365 // Messages in this list must target other nodes.
366 msgs []pb.Message
367 // msgsAfterAppend contains the list of messages that should be sent after
368 // the accumulated unstable state (e.g. term, vote, []entry, and snapshot)
369 // has been persisted to durable storage. This includes waiting for any
370 // unstable state that is already in the process of being persisted (i.e.
371 // has already been handed out in a prior Ready struct) to complete.
372 //
373 // Messages in this list may target other nodes or may target this node.
374 //
375 // Messages in this list have the type MsgAppResp, MsgVoteResp, or
376 // MsgPreVoteResp. See the comment in raft.send for details.
377 msgsAfterAppend []pb.Message
378
379 // the leader id
380 lead uint64
381 // leadTransferee is id of the leader transfer target when its value is not zero.
382 // Follow the procedure defined in raft thesis 3.10.
383 leadTransferee uint64
384 // Only one conf change may be pending (in the log, but not yet
385 // applied) at a time. This is enforced via pendingConfIndex, which
386 // is set to a value >= the log index of the latest pending
387 // configuration change (if any). Config changes are only allowed to
388 // be proposed if the leader's applied index is greater than this
389 // value.
390 pendingConfIndex uint64
391 // disableConfChangeValidation is Config.DisableConfChangeValidation,
392 // see there for details.
393 disableConfChangeValidation bool
394 // an estimate of the size of the uncommitted tail of the Raft log. Used to
395 // prevent unbounded log growth. Only maintained by the leader. Reset on
396 // term changes.
397 uncommittedSize entryPayloadSize
398
399 readOnly *readOnly
400
401 // number of ticks since it reached last electionTimeout when it is leader
402 // or candidate.
403 // number of ticks since it reached last electionTimeout or received a
404 // valid message from current leader when it is a follower.
405 electionElapsed int
406
407 // number of ticks since it reached last heartbeatTimeout.
408 // only leader keeps heartbeatElapsed.
409 heartbeatElapsed int
410
411 checkQuorum bool
412 preVote bool
413
414 heartbeatTimeout int
415 electionTimeout int
416 // randomizedElectionTimeout is a random number between
417 // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
418 // when raft changes its state to follower or candidate.
419 randomizedElectionTimeout int
420 disableProposalForwarding bool
421 stepDownOnRemoval bool
422
423 tick func()
424 step stepFunc
425
426 logger Logger
427
428 // pendingReadIndexMessages is used to store messages of type MsgReadIndex
429 // that can't be answered as new leader didn't committed any log in
430 // current term. Those will be handled as fast as first log is committed in
431 // current term.
432 pendingReadIndexMessages []pb.Message
433
434 traceLogger TraceLogger
435}
436
437func newRaft(c *Config) *raft {
438 if err := c.validate(); err != nil {
439 panic(err.Error())
440 }
441 raftlog := newLogWithSize(c.Storage, c.Logger, entryEncodingSize(c.MaxCommittedSizePerReady))
442 hs, cs, err := c.Storage.InitialState()
443 if err != nil {
444 panic(err) // TODO(bdarnell)
445 }
446
447 r := &raft{
448 id: c.ID,
449 lead: None,
450 isLearner: false,
451 raftLog: raftlog,
452 maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
453 maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
454 trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
455 electionTimeout: c.ElectionTick,
456 heartbeatTimeout: c.HeartbeatTick,
457 logger: c.Logger,
458 checkQuorum: c.CheckQuorum,
459 preVote: c.PreVote,
460 readOnly: newReadOnly(c.ReadOnlyOption),
461 disableProposalForwarding: c.DisableProposalForwarding,
462 disableConfChangeValidation: c.DisableConfChangeValidation,
463 stepDownOnRemoval: c.StepDownOnRemoval,
464 traceLogger: c.TraceLogger,
465 }
466
467 traceInitState(r)
468
469 lastID := r.raftLog.lastEntryID()
470 cfg, trk, err := confchange.Restore(confchange.Changer{
471 Tracker: r.trk,
472 LastIndex: lastID.index,
473 }, cs)
474 if err != nil {
475 panic(err)
476 }
477 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, trk))
478
479 if !IsEmptyHardState(hs) {
480 r.loadState(hs)
481 }
482 if c.Applied > 0 {
483 raftlog.appliedTo(c.Applied, 0 /* size */)
484 }
485 r.becomeFollower(r.Term, None)
486
487 var nodesStrs []string
488 for _, n := range r.trk.VoterNodes() {
489 nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
490 }
491
492 // TODO(pav-kv): it should be ok to simply print %+v for lastID.
493 r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
494 r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, lastID.index, lastID.term)
495 return r
496}
497
498func (r *raft) hasLeader() bool { return r.lead != None }
499
500func (r *raft) softState() SoftState { return SoftState{Lead: r.lead, RaftState: r.state} }
501
502func (r *raft) hardState() pb.HardState {
503 return pb.HardState{
504 Term: r.Term,
505 Vote: r.Vote,
506 Commit: r.raftLog.committed,
507 }
508}
509
510// send schedules persisting state to a stable storage and AFTER that
511// sending the message (as part of next Ready message processing).
512func (r *raft) send(m pb.Message) {
513 if m.From == None {
514 m.From = r.id
515 }
516 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
517 if m.Term == 0 {
518 // All {pre-,}campaign messages need to have the term set when
519 // sending.
520 // - MsgVote: m.Term is the term the node is campaigning for,
521 // non-zero as we increment the term when campaigning.
522 // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
523 // granted, non-zero for the same reason MsgVote is
524 // - MsgPreVote: m.Term is the term the node will campaign,
525 // non-zero as we use m.Term to indicate the next term we'll be
526 // campaigning for
527 // - MsgPreVoteResp: m.Term is the term received in the original
528 // MsgPreVote if the pre-vote was granted, non-zero for the
529 // same reasons MsgPreVote is
530 r.logger.Panicf("term should be set when sending %s", m.Type)
531 }
532 } else {
533 if m.Term != 0 {
534 r.logger.Panicf("term should not be set when sending %s (was %d)", m.Type, m.Term)
535 }
536 // do not attach term to MsgProp, MsgReadIndex
537 // proposals are a way to forward to the leader and
538 // should be treated as local message.
539 // MsgReadIndex is also forwarded to leader.
540 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
541 m.Term = r.Term
542 }
543 }
544 if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
545 // If async storage writes are enabled, messages added to the msgs slice
546 // are allowed to be sent out before unstable state (e.g. log entry
547 // writes and election votes) have been durably synced to the local
548 // disk.
549 //
550 // For most message types, this is not an issue. However, response
551 // messages that relate to "voting" on either leader election or log
552 // appends require durability before they can be sent. It would be
553 // incorrect to publish a vote in an election before that vote has been
554 // synced to stable storage locally. Similarly, it would be incorrect to
555 // acknowledge a log append to the leader before that entry has been
556 // synced to stable storage locally.
557 //
558 // Per the Raft thesis, section 3.8 Persisted state and server restarts:
559 //
560 // > Raft servers must persist enough information to stable storage to
561 // > survive server restarts safely. In particular, each server persists
562 // > its current term and vote; this is necessary to prevent the server
563 // > from voting twice in the same term or replacing log entries from a
564 // > newer leader with those from a deposed leader. Each server also
565 // > persists new log entries before they are counted towards the entries’
566 // > commitment; this prevents committed entries from being lost or
567 // > “uncommitted” when servers restart
568 //
569 // To enforce this durability requirement, these response messages are
570 // queued to be sent out as soon as the current collection of unstable
571 // state (the state that the response message was predicated upon) has
572 // been durably persisted. This unstable state may have already been
573 // passed to a Ready struct whose persistence is in progress or may be
574 // waiting for the next Ready struct to begin being written to Storage.
575 // These messages must wait for all of this state to be durable before
576 // being published.
577 //
578 // Rejected responses (m.Reject == true) present an interesting case
579 // where the durability requirement is less unambiguous. A rejection may
580 // be predicated upon unstable state. For instance, a node may reject a
581 // vote for one peer because it has already begun syncing its vote for
582 // another peer. Or it may reject a vote from one peer because it has
583 // unstable log entries that indicate that the peer is behind on its
584 // log. In these cases, it is likely safe to send out the rejection
585 // response immediately without compromising safety in the presence of a
586 // server restart. However, because these rejections are rare and
587 // because the safety of such behavior has not been formally verified,
588 // we err on the side of safety and omit a `&& !m.Reject` condition
589 // above.
590 r.msgsAfterAppend = append(r.msgsAfterAppend, m)
591 traceSendMessage(r, &m)
592 } else {
593 if m.To == r.id {
594 r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
595 }
596 r.msgs = append(r.msgs, m)
597 traceSendMessage(r, &m)
598 }
599}
600
601// sendAppend sends an append RPC with new entries (if any) and the
602// current commit index to the given peer.
603func (r *raft) sendAppend(to uint64) {
604 r.maybeSendAppend(to, true)
605}
606
607// maybeSendAppend sends an append RPC with new entries to the given peer,
608// if necessary. Returns true if a message was sent. The sendIfEmpty
609// argument controls whether messages with no entries will be sent
610// ("empty" messages are useful to convey updated Commit indexes, but
611// are undesirable when we're sending multiple messages in a batch).
612//
613// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
614// struct contains all the state necessary for deciding whether to send a
615// message.
616func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
617 pr := r.trk.Progress[to]
618 if pr.IsPaused() {
619 return false
620 }
621
622 prevIndex := pr.Next - 1
623 prevTerm, err := r.raftLog.term(prevIndex)
624 if err != nil {
625 // The log probably got truncated at >= pr.Next, so we can't catch up the
626 // follower log anymore. Send a snapshot instead.
627 return r.maybeSendSnapshot(to, pr)
628 }
629
630 var ents []pb.Entry
631 // In a throttled StateReplicate only send empty MsgApp, to ensure progress.
632 // Otherwise, if we had a full Inflights and all inflight messages were in
633 // fact dropped, replication to that follower would stall. Instead, an empty
634 // MsgApp will eventually reach the follower (heartbeats responses prompt the
635 // leader to send an append), allowing it to be acked or rejected, both of
636 // which will clear out Inflights.
637 if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
638 ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
639 }
640 if len(ents) == 0 && !sendIfEmpty {
641 return false
642 }
643 // TODO(pav-kv): move this check up to where err is returned.
644 if err != nil { // send a snapshot if we failed to get the entries
645 return r.maybeSendSnapshot(to, pr)
646 }
647
648 // Send the actual MsgApp otherwise, and update the progress accordingly.
649 r.send(pb.Message{
650 To: to,
651 Type: pb.MsgApp,
652 Index: prevIndex,
653 LogTerm: prevTerm,
654 Entries: ents,
655 Commit: r.raftLog.committed,
656 })
657 pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
658 pr.SentCommit(r.raftLog.committed)
659 return true
660}
661
662// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
663// node. Returns true iff the snapshot message has been emitted successfully.
664func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
665 if !pr.RecentActive {
666 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
667 return false
668 }
669
670 snapshot, err := r.raftLog.snapshot()
671 if err != nil {
672 if err == ErrSnapshotTemporarilyUnavailable {
673 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
674 return false
675 }
676 panic(err) // TODO(bdarnell)
677 }
678 if IsEmptySnap(snapshot) {
679 panic("need non-empty snapshot")
680 }
681 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
682 r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
683 r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
684 pr.BecomeSnapshot(sindex)
685 r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
686
687 r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
688 return true
689}
690
691// sendHeartbeat sends a heartbeat RPC to the given peer.
692func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
693 pr := r.trk.Progress[to]
694 // Attach the commit as min(to.matched, r.committed).
695 // When the leader sends out heartbeat message,
696 // the receiver(follower) might not be matched with the leader
697 // or it might not have all the committed entries.
698 // The leader MUST NOT forward the follower's commit to
699 // an unmatched index.
700 commit := min(pr.Match, r.raftLog.committed)
701 r.send(pb.Message{
702 To: to,
703 Type: pb.MsgHeartbeat,
704 Commit: commit,
705 Context: ctx,
706 })
707 pr.SentCommit(commit)
708}
709
710// bcastAppend sends RPC, with entries to all peers that are not up-to-date
711// according to the progress recorded in r.trk.
712func (r *raft) bcastAppend() {
713 r.trk.Visit(func(id uint64, _ *tracker.Progress) {
714 if id == r.id {
715 return
716 }
717 r.sendAppend(id)
718 })
719}
720
721// bcastHeartbeat sends RPC, without entries to all the peers.
722func (r *raft) bcastHeartbeat() {
723 lastCtx := r.readOnly.lastPendingRequestCtx()
724 if len(lastCtx) == 0 {
725 r.bcastHeartbeatWithCtx(nil)
726 } else {
727 r.bcastHeartbeatWithCtx([]byte(lastCtx))
728 }
729}
730
731func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
732 r.trk.Visit(func(id uint64, _ *tracker.Progress) {
733 if id == r.id {
734 return
735 }
736 r.sendHeartbeat(id, ctx)
737 })
738}
739
740func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
741 oldApplied := r.raftLog.applied
742 newApplied := max(index, oldApplied)
743 r.raftLog.appliedTo(newApplied, size)
744
745 if r.trk.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader {
746 // If the current (and most recent, at least for this leader's term)
747 // configuration should be auto-left, initiate that now. We use a
748 // nil Data which unmarshals into an empty ConfChangeV2 and has the
749 // benefit that appendEntry can never refuse it based on its size
750 // (which registers as zero).
751 m, err := confChangeToMsg(nil)
752 if err != nil {
753 panic(err)
754 }
755 // NB: this proposal can't be dropped due to size, but can be
756 // dropped if a leadership transfer is in progress. We'll keep
757 // checking this condition on each applied entry, so either the
758 // leadership transfer will succeed and the new leader will leave
759 // the joint configuration, or the leadership transfer will fail,
760 // and we will propose the config change on the next advance.
761 if err := r.Step(m); err != nil {
762 r.logger.Debugf("not initiating automatic transition out of joint configuration %s: %v", r.trk.Config, err)
763 } else {
764 r.logger.Infof("initiating automatic transition out of joint configuration %s", r.trk.Config)
765 }
766 }
767}
768
769func (r *raft) appliedSnap(snap *pb.Snapshot) {
770 index := snap.Metadata.Index
771 r.raftLog.stableSnapTo(index)
772 r.appliedTo(index, 0 /* size */)
773}
774
775// maybeCommit attempts to advance the commit index. Returns true if the commit
776// index changed (in which case the caller should call r.bcastAppend). This can
777// only be called in StateLeader.
778func (r *raft) maybeCommit() bool {
779 defer traceCommit(r)
780
781 return r.raftLog.maybeCommit(entryID{term: r.Term, index: r.trk.Committed()})
782}
783
784func (r *raft) reset(term uint64) {
785 if r.Term != term {
786 r.Term = term
787 r.Vote = None
788 }
789 r.lead = None
790
791 r.electionElapsed = 0
792 r.heartbeatElapsed = 0
793 r.resetRandomizedElectionTimeout()
794
795 r.abortLeaderTransfer()
796
797 r.trk.ResetVotes()
798 r.trk.Visit(func(id uint64, pr *tracker.Progress) {
799 *pr = tracker.Progress{
800 Match: 0,
801 Next: r.raftLog.lastIndex() + 1,
802 Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes),
803 IsLearner: pr.IsLearner,
804 }
805 if id == r.id {
806 pr.Match = r.raftLog.lastIndex()
807 }
808 })
809
810 r.pendingConfIndex = 0
811 r.uncommittedSize = 0
812 r.readOnly = newReadOnly(r.readOnly.option)
813}
814
815func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
816 li := r.raftLog.lastIndex()
817 for i := range es {
818 es[i].Term = r.Term
819 es[i].Index = li + 1 + uint64(i)
820 }
821 // Track the size of this uncommitted proposal.
822 if !r.increaseUncommittedSize(es) {
823 r.logger.Warningf(
824 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
825 r.id,
826 )
827 // Drop the proposal.
828 return false
829 }
830
831 traceReplicate(r, es...)
832
833 // use latest "last" index after truncate/append
834 li = r.raftLog.append(es...)
835 // The leader needs to self-ack the entries just appended once they have
836 // been durably persisted (since it doesn't send an MsgApp to itself). This
837 // response message will be added to msgsAfterAppend and delivered back to
838 // this node after these entries have been written to stable storage. When
839 // handled, this is roughly equivalent to:
840 //
841 // r.trk.Progress[r.id].MaybeUpdate(e.Index)
842 // if r.maybeCommit() {
843 // r.bcastAppend()
844 // }
845 r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li})
846 return true
847}
848
849// tickElection is run by followers and candidates after r.electionTimeout.
850func (r *raft) tickElection() {
851 r.electionElapsed++
852
853 if r.promotable() && r.pastElectionTimeout() {
854 r.electionElapsed = 0
855 if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
856 r.logger.Debugf("error occurred during election: %v", err)
857 }
858 }
859}
860
861// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
862func (r *raft) tickHeartbeat() {
863 r.heartbeatElapsed++
864 r.electionElapsed++
865
866 if r.electionElapsed >= r.electionTimeout {
867 r.electionElapsed = 0
868 if r.checkQuorum {
869 if err := r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}); err != nil {
870 r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
871 }
872 }
873 // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
874 if r.state == StateLeader && r.leadTransferee != None {
875 r.abortLeaderTransfer()
876 }
877 }
878
879 if r.state != StateLeader {
880 return
881 }
882
883 if r.heartbeatElapsed >= r.heartbeatTimeout {
884 r.heartbeatElapsed = 0
885 if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
886 r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
887 }
888 }
889}
890
891func (r *raft) becomeFollower(term uint64, lead uint64) {
892 r.step = stepFollower
893 r.reset(term)
894 r.tick = r.tickElection
895 r.lead = lead
896 r.state = StateFollower
897 r.logger.Infof("%x became follower at term %d", r.id, r.Term)
898
899 traceBecomeFollower(r)
900}
901
902func (r *raft) becomeCandidate() {
903 // TODO(xiangli) remove the panic when the raft implementation is stable
904 if r.state == StateLeader {
905 panic("invalid transition [leader -> candidate]")
906 }
907 r.step = stepCandidate
908 r.reset(r.Term + 1)
909 r.tick = r.tickElection
910 r.Vote = r.id
911 r.state = StateCandidate
912 r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
913
914 traceBecomeCandidate(r)
915}
916
917func (r *raft) becomePreCandidate() {
918 // TODO(xiangli) remove the panic when the raft implementation is stable
919 if r.state == StateLeader {
920 panic("invalid transition [leader -> pre-candidate]")
921 }
922 // Becoming a pre-candidate changes our step functions and state,
923 // but doesn't change anything else. In particular it does not increase
924 // r.Term or change r.Vote.
925 r.step = stepCandidate
926 r.trk.ResetVotes()
927 r.tick = r.tickElection
928 r.lead = None
929 r.state = StatePreCandidate
930 r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
931}
932
933func (r *raft) becomeLeader() {
934 // TODO(xiangli) remove the panic when the raft implementation is stable
935 if r.state == StateFollower {
936 panic("invalid transition [follower -> leader]")
937 }
938 r.step = stepLeader
939 r.reset(r.Term)
940 r.tick = r.tickHeartbeat
941 r.lead = r.id
942 r.state = StateLeader
943 // Followers enter replicate mode when they've been successfully probed
944 // (perhaps after having received a snapshot as a result). The leader is
945 // trivially in this state. Note that r.reset() has initialized this
946 // progress with the last index already.
947 pr := r.trk.Progress[r.id]
948 pr.BecomeReplicate()
949 // The leader always has RecentActive == true; MsgCheckQuorum makes sure to
950 // preserve this.
951 pr.RecentActive = true
952
953 // Conservatively set the pendingConfIndex to the last index in the
954 // log. There may or may not be a pending config change, but it's
955 // safe to delay any future proposals until we commit all our
956 // pending log entries, and scanning the entire tail of the log
957 // could be expensive.
958 r.pendingConfIndex = r.raftLog.lastIndex()
959
960 traceBecomeLeader(r)
961 emptyEnt := pb.Entry{Data: nil}
962 if !r.appendEntry(emptyEnt) {
963 // This won't happen because we just called reset() above.
964 r.logger.Panic("empty entry was dropped")
965 }
966 // The payloadSize of an empty entry is 0 (see TestPayloadSizeOfEmptyEntry),
967 // so the preceding log append does not count against the uncommitted log
968 // quota of the new leader. In other words, after the call to appendEntry,
969 // r.uncommittedSize is still 0.
970 r.logger.Infof("%x became leader at term %d", r.id, r.Term)
971}
972
973func (r *raft) hup(t CampaignType) {
974 if r.state == StateLeader {
975 r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
976 return
977 }
978
979 if !r.promotable() {
980 r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
981 return
982 }
983 if r.hasUnappliedConfChanges() {
984 r.logger.Warningf("%x cannot campaign at term %d since there are still pending configuration changes to apply", r.id, r.Term)
985 return
986 }
987
988 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
989 r.campaign(t)
990}
991
992// errBreak is a sentinel error used to break a callback-based loop.
993var errBreak = errors.New("break")
994
995func (r *raft) hasUnappliedConfChanges() bool {
996 if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed
997 return false
998 }
999 found := false
1000 // Scan all unapplied committed entries to find a config change. Paginate the
1001 // scan, to avoid a potentially unlimited memory spike.
1002 lo, hi := r.raftLog.applied+1, r.raftLog.committed+1
1003 // Reuse the maxApplyingEntsSize limit because it is used for similar purposes
1004 // (limiting the read of unapplied committed entries) when raft sends entries
1005 // via the Ready struct for application.
1006 // TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
1007 // outside the raft package.
1008 pageSize := r.raftLog.maxApplyingEntsSize
1009 if err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error {
1010 for i := range ents {
1011 if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
1012 found = true
1013 return errBreak
1014 }
1015 }
1016 return nil
1017 }); err != nil && err != errBreak {
1018 r.logger.Panicf("error scanning unapplied entries [%d, %d): %v", lo, hi, err)
1019 }
1020 return found
1021}
1022
1023// campaign transitions the raft instance to candidate state. This must only be
1024// called after verifying that this is a legitimate transition.
1025func (r *raft) campaign(t CampaignType) {
1026 if !r.promotable() {
1027 // This path should not be hit (callers are supposed to check), but
1028 // better safe than sorry.
1029 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
1030 }
1031 var term uint64
1032 var voteMsg pb.MessageType
1033 if t == campaignPreElection {
1034 r.becomePreCandidate()
1035 voteMsg = pb.MsgPreVote
1036 // PreVote RPCs are sent for the next term before we've incremented r.Term.
1037 term = r.Term + 1
1038 } else {
1039 r.becomeCandidate()
1040 voteMsg = pb.MsgVote
1041 term = r.Term
1042 }
1043 var ids []uint64
1044 {
1045 idMap := r.trk.Voters.IDs()
1046 ids = make([]uint64, 0, len(idMap))
1047 for id := range idMap {
1048 ids = append(ids, id)
1049 }
1050 slices.Sort(ids)
1051 }
1052 for _, id := range ids {
1053 if id == r.id {
1054 // The candidate votes for itself and should account for this self
1055 // vote once the vote has been durably persisted (since it doesn't
1056 // send a MsgVote to itself). This response message will be added to
1057 // msgsAfterAppend and delivered back to this node after the vote
1058 // has been written to stable storage.
1059 r.send(pb.Message{To: id, Term: term, Type: voteRespMsgType(voteMsg)})
1060 continue
1061 }
1062 // TODO(pav-kv): it should be ok to simply print %+v for the lastEntryID.
1063 last := r.raftLog.lastEntryID()
1064 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
1065 r.id, last.term, last.index, voteMsg, id, r.Term)
1066
1067 var ctx []byte
1068 if t == campaignTransfer {
1069 ctx = []byte(t)
1070 }
1071 r.send(pb.Message{To: id, Term: term, Type: voteMsg, Index: last.index, LogTerm: last.term, Context: ctx})
1072 }
1073}
1074
1075func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
1076 if v {
1077 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
1078 } else {
1079 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
1080 }
1081 r.trk.RecordVote(id, v)
1082 return r.trk.TallyVotes()
1083}
1084
1085func (r *raft) Step(m pb.Message) error {
1086 traceReceiveMessage(r, &m)
1087
1088 // Handle the message term, which may result in our stepping down to a follower.
1089 switch {
1090 case m.Term == 0:
1091 // local message
1092 case m.Term > r.Term:
1093 if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
1094 force := bytes.Equal(m.Context, []byte(campaignTransfer))
1095 inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
1096 if !force && inLease {
1097 // If a server receives a RequestVote request within the minimum election timeout
1098 // of hearing from a current leader, it does not update its term or grant its vote
1099 last := r.raftLog.lastEntryID()
1100 // TODO(pav-kv): it should be ok to simply print the %+v of the lastEntryID.
1101 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
1102 r.id, last.term, last.index, r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
1103 return nil
1104 }
1105 }
1106 switch {
1107 case m.Type == pb.MsgPreVote:
1108 // Never change our term in response to a PreVote
1109 case m.Type == pb.MsgPreVoteResp && !m.Reject:
1110 // We send pre-vote requests with a term in our future. If the
1111 // pre-vote is granted, we will increment our term when we get a
1112 // quorum. If it is not, the term comes from the node that
1113 // rejected our vote so we should become a follower at the new
1114 // term.
1115 default:
1116 r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
1117 r.id, r.Term, m.Type, m.From, m.Term)
1118 if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
1119 r.becomeFollower(m.Term, m.From)
1120 } else {
1121 r.becomeFollower(m.Term, None)
1122 }
1123 }
1124
1125 case m.Term < r.Term:
1126 if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
1127 // We have received messages from a leader at a lower term. It is possible
1128 // that these messages were simply delayed in the network, but this could
1129 // also mean that this node has advanced its term number during a network
1130 // partition, and it is now unable to either win an election or to rejoin
1131 // the majority on the old term. If checkQuorum is false, this will be
1132 // handled by incrementing term numbers in response to MsgVote with a
1133 // higher term, but if checkQuorum is true we may not advance the term on
1134 // MsgVote and must generate other messages to advance the term. The net
1135 // result of these two features is to minimize the disruption caused by
1136 // nodes that have been removed from the cluster's configuration: a
1137 // removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
1138 // but it will not receive MsgApp or MsgHeartbeat, so it will not create
1139 // disruptive term increases, by notifying leader of this node's activeness.
1140 // The above comments also true for Pre-Vote
1141 //
1142 // When follower gets isolated, it soon starts an election ending
1143 // up with a higher term than leader, although it won't receive enough
1144 // votes to win the election. When it regains connectivity, this response
1145 // with "pb.MsgAppResp" of higher term would force leader to step down.
1146 // However, this disruption is inevitable to free this stuck node with
1147 // fresh election. This can be prevented with Pre-Vote phase.
1148 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
1149 } else if m.Type == pb.MsgPreVote {
1150 // Before Pre-Vote enable, there may have candidate with higher term,
1151 // but less log. After update to Pre-Vote, the cluster may deadlock if
1152 // we drop messages with a lower term.
1153 last := r.raftLog.lastEntryID()
1154 // TODO(pav-kv): it should be ok to simply print %+v of the lastEntryID.
1155 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
1156 r.id, last.term, last.index, r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
1157 r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
1158 } else if m.Type == pb.MsgStorageAppendResp {
1159 if m.Index != 0 {
1160 // Don't consider the appended log entries to be stable because
1161 // they may have been overwritten in the unstable log during a
1162 // later term. See the comment in newStorageAppendResp for more
1163 // about this race.
1164 r.logger.Infof("%x [term: %d] ignored entry appends from a %s message with lower term [term: %d]",
1165 r.id, r.Term, m.Type, m.Term)
1166 }
1167 if m.Snapshot != nil {
1168 // Even if the snapshot applied under a different term, its
1169 // application is still valid. Snapshots carry committed
1170 // (term-independent) state.
1171 r.appliedSnap(m.Snapshot)
1172 }
1173 } else {
1174 // ignore other cases
1175 r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
1176 r.id, r.Term, m.Type, m.From, m.Term)
1177 }
1178 return nil
1179 }
1180
1181 switch m.Type {
1182 case pb.MsgHup:
1183 if r.preVote {
1184 r.hup(campaignPreElection)
1185 } else {
1186 r.hup(campaignElection)
1187 }
1188
1189 case pb.MsgStorageAppendResp:
1190 if m.Index != 0 {
1191 r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})
1192 }
1193 if m.Snapshot != nil {
1194 r.appliedSnap(m.Snapshot)
1195 }
1196
1197 case pb.MsgStorageApplyResp:
1198 if len(m.Entries) > 0 {
1199 index := m.Entries[len(m.Entries)-1].Index
1200 r.appliedTo(index, entsSize(m.Entries))
1201 r.reduceUncommittedSize(payloadsSize(m.Entries))
1202 }
1203
1204 case pb.MsgVote, pb.MsgPreVote:
1205 // We can vote if this is a repeat of a vote we've already cast...
1206 canVote := r.Vote == m.From ||
1207 // ...we haven't voted and we don't think there's a leader yet in this term...
1208 (r.Vote == None && r.lead == None) ||
1209 // ...or this is a PreVote for a future term...
1210 (m.Type == pb.MsgPreVote && m.Term > r.Term)
1211 // ...and we believe the candidate is up to date.
1212 lastID := r.raftLog.lastEntryID()
1213 candLastID := entryID{term: m.LogTerm, index: m.Index}
1214 if canVote && r.raftLog.isUpToDate(candLastID) {
1215 // Note: it turns out that that learners must be allowed to cast votes.
1216 // This seems counter- intuitive but is necessary in the situation in which
1217 // a learner has been promoted (i.e. is now a voter) but has not learned
1218 // about this yet.
1219 // For example, consider a group in which id=1 is a learner and id=2 and
1220 // id=3 are voters. A configuration change promoting 1 can be committed on
1221 // the quorum `{2,3}` without the config change being appended to the
1222 // learner's log. If the leader (say 2) fails, there are de facto two
1223 // voters remaining. Only 3 can win an election (due to its log containing
1224 // all committed entries), but to do so it will need 1 to vote. But 1
1225 // considers itself a learner and will continue to do so until 3 has
1226 // stepped up as leader, replicates the conf change to 1, and 1 applies it.
1227 // Ultimately, by receiving a request to vote, the learner realizes that
1228 // the candidate believes it to be a voter, and that it should act
1229 // accordingly. The candidate's config may be stale, too; but in that case
1230 // it won't win the election, at least in the absence of the bug discussed
1231 // in:
1232 // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
1233 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
1234 r.id, lastID.term, lastID.index, r.Vote, m.Type, m.From, candLastID.term, candLastID.index, r.Term)
1235 // When responding to Msg{Pre,}Vote messages we include the term
1236 // from the message, not the local term. To see why, consider the
1237 // case where a single node was previously partitioned away and
1238 // it's local term is now out of date. If we include the local term
1239 // (recall that for pre-votes we don't update the local term), the
1240 // (pre-)campaigning node on the other end will proceed to ignore
1241 // the message (it ignores all out of date messages).
1242 // The term in the original message and current local term are the
1243 // same in the case of regular votes, but different for pre-votes.
1244 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
1245 if m.Type == pb.MsgVote {
1246 // Only record real votes.
1247 r.electionElapsed = 0
1248 r.Vote = m.From
1249 }
1250 } else {
1251 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
1252 r.id, lastID.term, lastID.index, r.Vote, m.Type, m.From, candLastID.term, candLastID.index, r.Term)
1253 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
1254 }
1255
1256 default:
1257 err := r.step(r, m)
1258 if err != nil {
1259 return err
1260 }
1261 }
1262 return nil
1263}
1264
1265type stepFunc func(r *raft, m pb.Message) error
1266
1267func stepLeader(r *raft, m pb.Message) error {
1268 // These message types do not require any progress for m.From.
1269 switch m.Type {
1270 case pb.MsgBeat:
1271 r.bcastHeartbeat()
1272 return nil
1273 case pb.MsgCheckQuorum:
1274 if !r.trk.QuorumActive() {
1275 r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
1276 r.becomeFollower(r.Term, None)
1277 }
1278 // Mark everyone (but ourselves) as inactive in preparation for the next
1279 // CheckQuorum.
1280 r.trk.Visit(func(id uint64, pr *tracker.Progress) {
1281 if id != r.id {
1282 pr.RecentActive = false
1283 }
1284 })
1285 return nil
1286 case pb.MsgProp:
1287 if len(m.Entries) == 0 {
1288 r.logger.Panicf("%x stepped empty MsgProp", r.id)
1289 }
1290 if r.trk.Progress[r.id] == nil {
1291 // If we are not currently a member of the range (i.e. this node
1292 // was removed from the configuration while serving as leader),
1293 // drop any new proposals.
1294 return ErrProposalDropped
1295 }
1296 if r.leadTransferee != None {
1297 r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
1298 return ErrProposalDropped
1299 }
1300
1301 for i := range m.Entries {
1302 e := &m.Entries[i]
1303 var cc pb.ConfChangeI
1304 if e.Type == pb.EntryConfChange {
1305 var ccc pb.ConfChange
1306 if err := ccc.Unmarshal(e.Data); err != nil {
1307 panic(err)
1308 }
1309 cc = ccc
1310 } else if e.Type == pb.EntryConfChangeV2 {
1311 var ccc pb.ConfChangeV2
1312 if err := ccc.Unmarshal(e.Data); err != nil {
1313 panic(err)
1314 }
1315 cc = ccc
1316 }
1317 if cc != nil {
1318 alreadyPending := r.pendingConfIndex > r.raftLog.applied
1319 alreadyJoint := len(r.trk.Config.Voters[1]) > 0
1320 wantsLeaveJoint := len(cc.AsV2().Changes) == 0
1321
1322 var failedCheck string
1323 if alreadyPending {
1324 failedCheck = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
1325 } else if alreadyJoint && !wantsLeaveJoint {
1326 failedCheck = "must transition out of joint config first"
1327 } else if !alreadyJoint && wantsLeaveJoint {
1328 failedCheck = "not in joint state; refusing empty conf change"
1329 }
1330
1331 if failedCheck != "" && !r.disableConfChangeValidation {
1332 r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.trk.Config, failedCheck)
1333 m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
1334 } else {
1335 r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
1336 traceChangeConfEvent(cc, r)
1337 }
1338 }
1339 }
1340
1341 if !r.appendEntry(m.Entries...) {
1342 return ErrProposalDropped
1343 }
1344 r.bcastAppend()
1345 return nil
1346 case pb.MsgReadIndex:
1347 // only one voting member (the leader) in the cluster
1348 if r.trk.IsSingleton() {
1349 if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1350 r.send(resp)
1351 }
1352 return nil
1353 }
1354
1355 // Postpone read only request when this leader has not committed
1356 // any log entry at its term.
1357 if !r.committedEntryInCurrentTerm() {
1358 r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
1359 return nil
1360 }
1361
1362 sendMsgReadIndexResponse(r, m)
1363
1364 return nil
1365 case pb.MsgForgetLeader:
1366 return nil // noop on leader
1367 }
1368
1369 // All other message types require a progress for m.From (pr).
1370 pr := r.trk.Progress[m.From]
1371 if pr == nil {
1372 r.logger.Debugf("%x no progress available for %x", r.id, m.From)
1373 return nil
1374 }
1375 switch m.Type {
1376 case pb.MsgAppResp:
1377 // NB: this code path is also hit from (*raft).advance, where the leader steps
1378 // an MsgAppResp to acknowledge the appended entries in the last Ready.
1379
1380 pr.RecentActive = true
1381
1382 if m.Reject {
1383 // RejectHint is the suggested next base entry for appending (i.e.
1384 // we try to append entry RejectHint+1 next), and LogTerm is the
1385 // term that the follower has at index RejectHint. Older versions
1386 // of this library did not populate LogTerm for rejections and it
1387 // is zero for followers with an empty log.
1388 //
1389 // Under normal circumstances, the leader's log is longer than the
1390 // follower's and the follower's log is a prefix of the leader's
1391 // (i.e. there is no divergent uncommitted suffix of the log on the
1392 // follower). In that case, the first probe reveals where the
1393 // follower's log ends (RejectHint=follower's last index) and the
1394 // subsequent probe succeeds.
1395 //
1396 // However, when networks are partitioned or systems overloaded,
1397 // large divergent log tails can occur. The naive attempt, probing
1398 // entry by entry in decreasing order, will be the product of the
1399 // length of the diverging tails and the network round-trip latency,
1400 // which can easily result in hours of time spent probing and can
1401 // even cause outright outages. The probes are thus optimized as
1402 // described below.
1403 r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
1404 r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
1405 nextProbeIdx := m.RejectHint
1406 if m.LogTerm > 0 {
1407 // If the follower has an uncommitted log tail, we would end up
1408 // probing one by one until we hit the common prefix.
1409 //
1410 // For example, if the leader has:
1411 //
1412 // idx 1 2 3 4 5 6 7 8 9
1413 // -----------------
1414 // term (L) 1 3 3 3 5 5 5 5 5
1415 // term (F) 1 1 1 1 2 2
1416 //
1417 // Then, after sending an append anchored at (idx=9,term=5) we
1418 // would receive a RejectHint of 6 and LogTerm of 2. Without the
1419 // code below, we would try an append at index 6, which would
1420 // fail again.
1421 //
1422 // However, looking only at what the leader knows about its own
1423 // log and the rejection hint, it is clear that a probe at index
1424 // 6, 5, 4, 3, and 2 must fail as well:
1425 //
1426 // For all of these indexes, the leader's log term is larger than
1427 // the rejection's log term. If a probe at one of these indexes
1428 // succeeded, its log term at that index would match the leader's,
1429 // i.e. 3 or 5 in this example. But the follower already told the
1430 // leader that it is still at term 2 at index 6, and since the
1431 // log term only ever goes up (within a log), this is a contradiction.
1432 //
1433 // At index 1, however, the leader can draw no such conclusion,
1434 // as its term 1 is not larger than the term 2 from the
1435 // follower's rejection. We thus probe at 1, which will succeed
1436 // in this example. In general, with this approach we probe at
1437 // most once per term found in the leader's log.
1438 //
1439 // There is a similar mechanism on the follower (implemented in
1440 // handleAppendEntries via a call to findConflictByTerm) that is
1441 // useful if the follower has a large divergent uncommitted log
1442 // tail[1], as in this example:
1443 //
1444 // idx 1 2 3 4 5 6 7 8 9
1445 // -----------------
1446 // term (L) 1 3 3 3 3 3 3 3 7
1447 // term (F) 1 3 3 4 4 5 5 5 6
1448 //
1449 // Naively, the leader would probe at idx=9, receive a rejection
1450 // revealing the log term of 6 at the follower. Since the leader's
1451 // term at the previous index is already smaller than 6, the leader-
1452 // side optimization discussed above is ineffective. The leader thus
1453 // probes at index 8 and, naively, receives a rejection for the same
1454 // index and log term 5. Again, the leader optimization does not improve
1455 // over linear probing as term 5 is above the leader's term 3 for that
1456 // and many preceding indexes; the leader would have to probe linearly
1457 // until it would finally hit index 3, where the probe would succeed.
1458 //
1459 // Instead, we apply a similar optimization on the follower. When the
1460 // follower receives the probe at index 8 (log term 3), it concludes
1461 // that all of the leader's log preceding that index has log terms of
1462 // 3 or below. The largest index in the follower's log with a log term
1463 // of 3 or below is index 3. The follower will thus return a rejection
1464 // for index=3, log term=3 instead. The leader's next probe will then
1465 // succeed at that index.
1466 //
1467 // [1]: more precisely, if the log terms in the large uncommitted
1468 // tail on the follower are larger than the leader's. At first,
1469 // it may seem unintuitive that a follower could even have such
1470 // a large tail, but it can happen:
1471 //
1472 // 1. Leader appends (but does not commit) entries 2 and 3, crashes.
1473 // idx 1 2 3 4 5 6 7 8 9
1474 // -----------------
1475 // term (L) 1 2 2 [crashes]
1476 // term (F) 1
1477 // term (F) 1
1478 //
1479 // 2. a follower becomes leader and appends entries at term 3.
1480 // -----------------
1481 // term (x) 1 2 2 [down]
1482 // term (F) 1 3 3 3 3
1483 // term (F) 1
1484 //
1485 // 3. term 3 leader goes down, term 2 leader returns as term 4
1486 // leader. It commits the log & entries at term 4.
1487 //
1488 // -----------------
1489 // term (L) 1 2 2 2
1490 // term (x) 1 3 3 3 3 [down]
1491 // term (F) 1
1492 // -----------------
1493 // term (L) 1 2 2 2 4 4 4
1494 // term (F) 1 3 3 3 3 [gets probed]
1495 // term (F) 1 2 2 2 4 4 4
1496 //
1497 // 4. the leader will now probe the returning follower at index
1498 // 7, the rejection points it at the end of the follower's log
1499 // which is at a higher log term than the actually committed
1500 // log.
1501 nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
1502 }
1503 if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
1504 r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
1505 if pr.State == tracker.StateReplicate {
1506 pr.BecomeProbe()
1507 }
1508 r.sendAppend(m.From)
1509 }
1510 } else {
1511 // We want to update our tracking if the response updates our
1512 // matched index or if the response can move a probing peer back
1513 // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
1514 // for an example of the latter case).
1515 // NB: the same does not make sense for StateSnapshot - if `m.Index`
1516 // equals pr.Match we know we don't have m.Index+1 in our log, so moving
1517 // back to replicating state is not useful; besides pr.PendingSnapshot
1518 // would prevent it.
1519 if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
1520 switch {
1521 case pr.State == tracker.StateProbe:
1522 pr.BecomeReplicate()
1523 case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
1524 // Note that we don't take into account PendingSnapshot to
1525 // enter this branch. No matter at which index a snapshot
1526 // was actually applied, as long as this allows catching up
1527 // the follower from the log, we will accept it. This gives
1528 // systems more flexibility in how they implement snapshots;
1529 // see the comments on PendingSnapshot.
1530 r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1531 // Transition back to replicating state via probing state
1532 // (which takes the snapshot into account). If we didn't
1533 // move to replicating state, that would only happen with
1534 // the next round of appends (but there may not be a next
1535 // round for a while, exposing an inconsistent RaftStatus).
1536 pr.BecomeProbe()
1537 pr.BecomeReplicate()
1538 case pr.State == tracker.StateReplicate:
1539 pr.Inflights.FreeLE(m.Index)
1540 }
1541
1542 if r.maybeCommit() {
1543 // committed index has progressed for the term, so it is safe
1544 // to respond to pending read index requests
1545 releasePendingReadIndexMessages(r)
1546 r.bcastAppend()
1547 } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
1548 // The follower may be missing the latest commit index, so send it.
1549 // NB: this is not strictly necessary because the periodic heartbeat
1550 // messages deliver commit indices too. However, a message sent now
1551 // may arrive before the next heartbeat fires.
1552 r.sendAppend(m.From)
1553 }
1554 // We've updated flow control information above, which may
1555 // allow us to send multiple (size-limited) in-flight messages
1556 // at once (such as when transitioning from probe to
1557 // replicate, or when FreeLE() covers multiple messages). If
1558 // we have more entries to send, send as many messages as we
1559 // can (without sending empty messages for the commit index).
1560 if r.id != m.From {
1561 for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
1562 }
1563 }
1564 // A leadership transfer is in progress.
1565 if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
1566 r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
1567 r.sendTimeoutNow(m.From)
1568 }
1569 }
1570 }
1571 case pb.MsgHeartbeatResp:
1572 pr.RecentActive = true
1573 pr.MsgAppFlowPaused = false
1574
1575 // NB: if the follower is paused (full Inflights), this will still send an
1576 // empty append, allowing it to recover from situations in which all the
1577 // messages that filled up Inflights in the first place were dropped. Note
1578 // also that the outgoing heartbeat already communicated the commit index.
1579 //
1580 // If the follower is fully caught up but also in StateProbe (as can happen
1581 // if ReportUnreachable was called), we also want to send an append (it will
1582 // be empty) to allow the follower to transition back to StateReplicate once
1583 // it responds.
1584 //
1585 // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
1586 // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
1587 // no-op.
1588 if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
1589 r.sendAppend(m.From)
1590 }
1591
1592 if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
1593 return nil
1594 }
1595
1596 if r.trk.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
1597 return nil
1598 }
1599
1600 rss := r.readOnly.advance(m)
1601 for _, rs := range rss {
1602 if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
1603 r.send(resp)
1604 }
1605 }
1606 case pb.MsgSnapStatus:
1607 if pr.State != tracker.StateSnapshot {
1608 return nil
1609 }
1610 if !m.Reject {
1611 pr.BecomeProbe()
1612 r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1613 } else {
1614 // NB: the order here matters, or we would erroneously probe from
1615 // the snapshot index even though the snapshot was never applied.
1616 pr.PendingSnapshot = 0
1617 pr.BecomeProbe()
1618 r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1619 }
1620 // If the snapshot succeeded, wait for the MsgAppResp from the remote node
1621 // before sending out the next MsgApp.
1622 // If the snapshot failed, wait for a heartbeat interval before the next try.
1623 pr.MsgAppFlowPaused = true
1624 case pb.MsgUnreachable:
1625 // During optimistic replication, if the remote becomes unreachable,
1626 // there is a high probability that a MsgApp was lost.
1627 if pr.State == tracker.StateReplicate {
1628 pr.BecomeProbe()
1629 }
1630 r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
1631 case pb.MsgTransferLeader:
1632 if pr.IsLearner {
1633 r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
1634 return nil
1635 }
1636 leadTransferee := m.From
1637 lastLeadTransferee := r.leadTransferee
1638 if lastLeadTransferee != None {
1639 if lastLeadTransferee == leadTransferee {
1640 r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
1641 r.id, r.Term, leadTransferee, leadTransferee)
1642 return nil
1643 }
1644 r.abortLeaderTransfer()
1645 r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
1646 }
1647 if leadTransferee == r.id {
1648 r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
1649 return nil
1650 }
1651 // Transfer leadership to a third party.
1652 r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
1653 // Leadership transfer should finish within one electionTimeout, so reset r.electionElapsed.
1654 r.electionElapsed = 0
1655 r.leadTransferee = leadTransferee
1656 if pr.Match == r.raftLog.lastIndex() {
1657 r.sendTimeoutNow(leadTransferee)
1658 r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
1659 } else {
1660 r.sendAppend(leadTransferee)
1661 }
1662 }
1663 return nil
1664}
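
// exampleQuorumCommitIndex is an illustrative sketch and is not used by the
// library: for a simple (non-joint) voter configuration, the commit index that
// maybeCommit can advance to is the largest index that a majority of Match
// values have reached, i.e. the middle element of the sorted Match slice. For
// example, Match values [9, 5, 7] sort to [5, 7, 9]; index 7 is replicated on
// two of the three voters, so it is the quorum-committed index.
func exampleQuorumCommitIndex(match []uint64) uint64 {
	if len(match) == 0 {
		return 0
	}
	sorted := slices.Clone(match)
	slices.Sort(sorted)
	return sorted[(len(sorted)-1)/2]
}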
1665
1666// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
1667// whether they respond to MsgVoteResp or MsgPreVoteResp.
1668func stepCandidate(r *raft, m pb.Message) error {
1669 // Only handle vote responses corresponding to our candidacy (while in
1670 // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
1671 // our pre-candidate state).
1672 var myVoteRespType pb.MessageType
1673 if r.state == StatePreCandidate {
1674 myVoteRespType = pb.MsgPreVoteResp
1675 } else {
1676 myVoteRespType = pb.MsgVoteResp
1677 }
1678 switch m.Type {
1679 case pb.MsgProp:
1680 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1681 return ErrProposalDropped
1682 case pb.MsgApp:
1683 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1684 r.handleAppendEntries(m)
1685 case pb.MsgHeartbeat:
1686 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1687 r.handleHeartbeat(m)
1688 case pb.MsgSnap:
1689 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1690 r.handleSnapshot(m)
1691 case myVoteRespType:
1692 gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1693 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1694 switch res {
1695 case quorum.VoteWon:
1696 if r.state == StatePreCandidate {
1697 r.campaign(campaignElection)
1698 } else {
1699 r.becomeLeader()
1700 r.bcastAppend()
1701 }
1702 case quorum.VoteLost:
1703 // pb.MsgPreVoteResp contains future term of pre-candidate
1704 // m.Term > r.Term; reuse r.Term
1705 r.becomeFollower(r.Term, None)
1706 }
1707 case pb.MsgTimeoutNow:
1708 r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1709 }
1710 return nil
1711}
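
// exampleVoteResult is an illustrative sketch and is not used by the library:
// for a single majority quorum (no joint configuration), the poll in
// stepCandidate resolves to "won" once a majority grants the vote, to "lost"
// once so many rejections have arrived that a majority can no longer be
// reached, and stays "pending" otherwise.
func exampleVoteResult(voters, granted, rejected int) string {
	quorumSize := voters/2 + 1
	switch {
	case granted >= quorumSize:
		return "won"
	case rejected > voters-quorumSize:
		return "lost"
	default:
		return "pending"
	}
}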
1712
1713func stepFollower(r *raft, m pb.Message) error {
1714 switch m.Type {
1715 case pb.MsgProp:
1716 if r.lead == None {
1717 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1718 return ErrProposalDropped
1719 } else if r.disableProposalForwarding {
1720 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
1721 return ErrProposalDropped
1722 }
1723 m.To = r.lead
1724 r.send(m)
1725 case pb.MsgApp:
1726 r.electionElapsed = 0
1727 r.lead = m.From
1728 r.handleAppendEntries(m)
1729 case pb.MsgHeartbeat:
1730 r.electionElapsed = 0
1731 r.lead = m.From
1732 r.handleHeartbeat(m)
1733 case pb.MsgSnap:
1734 r.electionElapsed = 0
1735 r.lead = m.From
1736 r.handleSnapshot(m)
1737 case pb.MsgTransferLeader:
1738 if r.lead == None {
1739 r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
1740 return nil
1741 }
1742 m.To = r.lead
1743 r.send(m)
1744 case pb.MsgForgetLeader:
1745 if r.readOnly.option == ReadOnlyLeaseBased {
1746 r.logger.Error("ignoring MsgForgetLeader due to ReadOnlyLeaseBased")
1747 return nil
1748 }
1749 if r.lead != None {
1750 r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term)
1751 r.lead = None
1752 }
1753 case pb.MsgTimeoutNow:
1754 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
1755 // Leadership transfers never use pre-vote even if r.preVote is true; we
1756 // know we are not recovering from a partition so there is no need for the
1757 // extra round trip.
1758 r.hup(campaignTransfer)
1759 case pb.MsgReadIndex:
1760 if r.lead == None {
1761 r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
1762 return nil
1763 }
1764 m.To = r.lead
1765 r.send(m)
1766 case pb.MsgReadIndexResp:
1767 if len(m.Entries) != 1 {
1768 r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
1769 return nil
1770 }
1771 r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
1772 }
1773 return nil
1774}
1775
1776// logSliceFromMsgApp extracts the appended logSlice from a MsgApp message.
1777func logSliceFromMsgApp(m *pb.Message) logSlice {
1778 // TODO(pav-kv): consider also validating the logSlice here.
1779 return logSlice{
1780 term: m.Term,
1781 prev: entryID{term: m.LogTerm, index: m.Index},
1782 entries: m.Entries,
1783 }
1784}
1785
1786func (r *raft) handleAppendEntries(m pb.Message) {
1787 // TODO(pav-kv): construct logSlice up the stack next to receiving the
1788 // message, and validate it before taking any action (e.g. bumping term).
1789 a := logSliceFromMsgApp(&m)
1790
1791 if a.prev.index < r.raftLog.committed {
1792 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1793 return
1794 }
1795 if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok {
1796 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
1797 return
1798 }
1799 r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
1800 r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
1801
1802 // Our log does not match the leader's at index m.Index. Return a hint to the
1803 // leader - a guess on the maximal (index, term) at which the logs match. Do
1804 // this by searching through the follower's log for the maximum (index, term)
1805 // pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's
1806 // Index. This can help skip all indexes in the follower's uncommitted tail
1807 // with terms greater than the MsgApp's LogTerm.
1808 //
1809 // See the other caller for findConflictByTerm (in stepLeader) for a much more
1810 // detailed explanation of this mechanism.
1811
1812 // NB: m.Index >= raftLog.committed by now (see the early return above), and
1813 // raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is
1814 // also >= raftLog.committed. Hence, the findConflictByTerm argument is within
1815 // the valid interval, which then will return a valid (index, term) pair with
1816 // a non-zero term (unless the log is empty). However, it is safe to send a zero
1817 // LogTerm in this response in any case, so we don't verify it here.
1818 hintIndex := min(m.Index, r.raftLog.lastIndex())
1819 hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
1820 r.send(pb.Message{
1821 To: m.From,
1822 Type: pb.MsgAppResp,
1823 Index: m.Index,
1824 Reject: true,
1825 RejectHint: hintIndex,
1826 LogTerm: hintTerm,
1827 })
1828}
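
// exampleFindConflictByTerm is an illustrative sketch and is not used by the
// library: it mirrors the scan that findConflictByTerm performs on both the
// leader and the follower. With terms[i] holding the term of log entry i+1, it
// walks backwards from the hinted index until it reaches an entry whose term
// is <= the hinted term, and returns that index (or 0 if there is none).
func exampleFindConflictByTerm(terms []uint64, index, term uint64) uint64 {
	if index > uint64(len(terms)) {
		index = uint64(len(terms))
	}
	for ; index > 0; index-- {
		if terms[index-1] <= term {
			return index
		}
	}
	return 0
}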
1829
1830func (r *raft) handleHeartbeat(m pb.Message) {
1831 r.raftLog.commitTo(m.Commit)
1832 r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
1833}
1834
1835func (r *raft) handleSnapshot(m pb.Message) {
1836 // MsgSnap messages should always carry a non-nil Snapshot, but err on the
1837 // side of safety and treat a nil Snapshot as a zero-valued Snapshot.
1838 var s pb.Snapshot
1839 if m.Snapshot != nil {
1840 s = *m.Snapshot
1841 }
1842 sindex, sterm := s.Metadata.Index, s.Metadata.Term
1843 if r.restore(s) {
1844 r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
1845 r.id, r.raftLog.committed, sindex, sterm)
1846 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
1847 } else {
1848 r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
1849 r.id, r.raftLog.committed, sindex, sterm)
1850 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1851 }
1852}
1853
1854// restore recovers the state machine from a snapshot. It restores the log and the
1855// configuration of the state machine. If this method returns false, the snapshot was
1856// ignored, either because it was obsolete or because of an error.
1857func (r *raft) restore(s pb.Snapshot) bool {
1858 if s.Metadata.Index <= r.raftLog.committed {
1859 return false
1860 }
1861 if r.state != StateFollower {
1862 // This is defense-in-depth: if the leader somehow ended up applying a
1863 // snapshot, it could move into a new term without moving into a
1864 // follower state. This should never fire, but if it did, we'd have
1865 // prevented damage by returning early, so log only a loud warning.
1866 //
1867 // At the time of writing, the instance is guaranteed to be in follower
1868 // state when this method is called.
1869 r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
1870 r.becomeFollower(r.Term+1, None)
1871 return false
1872 }
1873
1874 // More defense-in-depth: throw away the snapshot if the recipient is not in
1875 // the config. This shouldn't ever happen (at the time of writing) but lots of
1876 // code here and there assumes that r.id is in the progress tracker.
1877 found := false
1878 cs := s.Metadata.ConfState
1879
1880 for _, set := range [][]uint64{
1881 cs.Voters,
1882 cs.Learners,
1883 cs.VotersOutgoing,
1884 // `LearnersNext` doesn't need to be checked. According to the rules, if a peer in
1885 // `LearnersNext` doesn't need to be checked. According to the rules, if a peer is in
1886 } {
1887 for _, id := range set {
1888 if id == r.id {
1889 found = true
1890 break
1891 }
1892 }
1893 if found {
1894 break
1895 }
1896 }
1897 if !found {
1898 r.logger.Warningf(
1899 "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
1900 r.id, cs,
1901 )
1902 return false
1903 }
1904
1905 // Now go ahead and actually restore.
1906
1907 id := entryID{term: s.Metadata.Term, index: s.Metadata.Index}
1908 if r.raftLog.matchTerm(id) {
1909 // TODO(pav-kv): can print %+v of the id, but it will change the format.
1910 last := r.raftLog.lastEntryID()
1911 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
1912 r.id, r.raftLog.committed, last.index, last.term, id.index, id.term)
1913 r.raftLog.commitTo(s.Metadata.Index)
1914 return false
1915 }
1916
1917 r.raftLog.restore(s)
1918
1919 // Reset the configuration and re-add the (potentially updated) peers.
1920 r.trk = tracker.MakeProgressTracker(r.trk.MaxInflight, r.trk.MaxInflightBytes)
1921 cfg, trk, err := confchange.Restore(confchange.Changer{
1922 Tracker: r.trk,
1923 LastIndex: r.raftLog.lastIndex(),
1924 }, cs)
1925
1926 if err != nil {
1927 // This should never happen. Either there's a bug in our config change
1928 // handling or the client corrupted the conf change.
1929 panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
1930 }
1931
1932 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, trk))
1933
1934 last := r.raftLog.lastEntryID()
1935 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
1936 r.id, r.raftLog.committed, last.index, last.term, id.index, id.term)
1937 return true
1938}
1939
1940// promotable indicates whether the state machine can be promoted to leader,
1941// which is true when its own id is in the progress list.
1942func (r *raft) promotable() bool {
1943 pr := r.trk.Progress[r.id]
1944 return pr != nil && !pr.IsLearner && !r.raftLog.hasNextOrInProgressSnapshot()
1945}
1946
1947func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
1948 cfg, trk, err := func() (tracker.Config, tracker.ProgressMap, error) {
1949 changer := confchange.Changer{
1950 Tracker: r.trk,
1951 LastIndex: r.raftLog.lastIndex(),
1952 }
1953 if cc.LeaveJoint() {
1954 return changer.LeaveJoint()
1955 } else if autoLeave, ok := cc.EnterJoint(); ok {
1956 return changer.EnterJoint(autoLeave, cc.Changes...)
1957 }
1958 return changer.Simple(cc.Changes...)
1959 }()
1960
1961 if err != nil {
1962 // TODO(tbg): return the error to the caller.
1963 panic(err)
1964 }
1965
1966 return r.switchToConfig(cfg, trk)
1967}
1968
1969// switchToConfig reconfigures this node to use the provided configuration. It
1970// updates the in-memory state and, when necessary, carries out additional
1971// actions such as reacting to the removal of nodes or changed quorum
1972// requirements.
1973//
1974// The inputs usually result from restoring a ConfState or applying a ConfChange.
1975func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.ConfState {
1976 traceConfChangeEvent(cfg, r)
1977
1978 r.trk.Config = cfg
1979 r.trk.Progress = trk
1980
1981 r.logger.Infof("%x switched to configuration %s", r.id, r.trk.Config)
1982 cs := r.trk.ConfState()
1983 pr, ok := r.trk.Progress[r.id]
1984
1985 // Update whether the node itself is a learner, resetting to false when the
1986 // node is removed.
1987 r.isLearner = ok && pr.IsLearner
1988
1989 if (!ok || r.isLearner) && r.state == StateLeader {
1990 // This node is leader and was removed or demoted, step down if requested.
1991 //
1992 // We prevent demotions at the time of writing but hypothetically we handle
1993 // them the same way as removing the leader.
1994 //
1995 // TODO(tbg): ask follower with largest Match to TimeoutNow (to avoid
1996 // interruption). This might still drop some proposals but it's better than
1997 // nothing.
1998 if r.stepDownOnRemoval {
1999 r.becomeFollower(r.Term, None)
2000 }
2001 return cs
2002 }
2003
2004 // The remaining steps only make sense if this node is the leader and there
2005 // are other nodes.
2006 if r.state != StateLeader || len(cs.Voters) == 0 {
2007 return cs
2008 }
2009
2010 if r.maybeCommit() {
2011 // If the configuration change means that more entries are committed now,
2012 // broadcast/append to everyone in the updated config.
2013 r.bcastAppend()
2014 } else {
2015 // Otherwise, still probe the newly added replicas; there's no reason to
2016 // let them wait out a heartbeat interval (or the next incoming
2017 // proposal).
2018 r.trk.Visit(func(id uint64, _ *tracker.Progress) {
2019 if id == r.id {
2020 return
2021 }
2022 r.maybeSendAppend(id, false /* sendIfEmpty */)
2023 })
2024 }
2025 // If the leadTransferee was removed or demoted, abort the leadership transfer.
2026 if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
2027 r.abortLeaderTransfer()
2028 }
2029
2030 return cs
2031}
2032
2033func (r *raft) loadState(state pb.HardState) {
2034 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
2035 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
2036 }
2037 r.raftLog.committed = state.Commit
2038 r.Term = state.Term
2039 r.Vote = state.Vote
2040}
2041
2042// pastElectionTimeout returns true if r.electionElapsed is greater
2043// than or equal to the randomized election timeout in
2044// [electiontimeout, 2 * electiontimeout - 1].
2045func (r *raft) pastElectionTimeout() bool {
2046 return r.electionElapsed >= r.randomizedElectionTimeout
2047}
2048
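// resetRandomizedElectionTimeout re-draws the randomized election timeout.
// globalRand.Intn(r.electionTimeout) yields a value in [0, electionTimeout-1],
// so the result falls in [electionTimeout, 2*electionTimeout-1]. For example,
// with electionTimeout = 10 ticks, the randomized timeout is a value in [10, 19].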
2049func (r *raft) resetRandomizedElectionTimeout() {
2050 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
2051}
2052
2053func (r *raft) sendTimeoutNow(to uint64) {
2054 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
2055}
2056
2057func (r *raft) abortLeaderTransfer() {
2058 r.leadTransferee = None
2059}
2060
2061// committedEntryInCurrentTerm returns true if the peer has committed an entry in its current term.
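// A newly elected leader cannot know that entries from previous terms are
// committed until it commits an entry in its own term (Raft never commits
// prior-term entries by counting replicas directly), which is why MsgReadIndex
// processing is deferred until this returns true.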
2062func (r *raft) committedEntryInCurrentTerm() bool {
2063 // NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0,
2064 // we won't see it as a match with r.Term.
2065 return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term
2066}
2067
2068// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
2069// itself, a blank value will be returned.
2070func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
2071 if req.From == None || req.From == r.id {
2072 r.readStates = append(r.readStates, ReadState{
2073 Index: readIndex,
2074 RequestCtx: req.Entries[0].Data,
2075 })
2076 return pb.Message{}
2077 }
2078 return pb.Message{
2079 Type: pb.MsgReadIndexResp,
2080 To: req.From,
2081 Index: readIndex,
2082 Entries: req.Entries,
2083 }
2084}
2085
2086// increaseUncommittedSize computes the size of the proposed entries and
2087// determines whether they would push the leader over its maxUncommittedSize limit.
2088// If the new entries would exceed the limit, the method returns false. If not,
2089// the increase in uncommitted entry size is recorded and the method returns
2090// true.
2091//
2092// Empty payloads are never refused. This is used both for appending an empty
2093// entry at a new leader's term and for leaving a joint configuration.
2094func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
2095 s := payloadsSize(ents)
2096 if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
2097 // If the uncommitted tail of the Raft log is empty, allow any size
2098 // proposal. Otherwise, limit the size of the uncommitted tail of the
2099 // log and drop any proposal that would push the size over the limit.
2100 // Note the added requirement s>0 which is used to make sure that
2101 // appending single empty entries to the log always succeeds, used both
2102 // for replicating a new leader's initial empty entry, and for
2103 // auto-leaving joint configurations.
2104 return false
2105 }
2106 r.uncommittedSize += s
2107 return true
2108}
2109
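// exampleWouldDropProposal is an illustrative sketch and is not used by the
// library: it restates the admission rule in increaseUncommittedSize. A
// proposal is dropped only when the uncommitted tail is non-empty, the payload
// is non-empty, and admitting the payload would push the tail over the limit;
// empty payloads (such as the leader's initial empty entry) always pass.
func exampleWouldDropProposal(uncommitted, payload, limit uint64) bool {
	return uncommitted > 0 && payload > 0 && uncommitted+payload > limit
}
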
2110// reduceUncommittedSize accounts for the newly committed entries by decreasing
2111// the tracked uncommitted entry size.
2112func (r *raft) reduceUncommittedSize(s entryPayloadSize) {
2113 if s > r.uncommittedSize {
2114 // uncommittedSize may underestimate the size of the uncommitted Raft
2115 // log tail but will never overestimate it. Saturate at 0 instead of
2116 // allowing overflow.
2117 r.uncommittedSize = 0
2118 } else {
2119 r.uncommittedSize -= s
2120 }
2121}
2122
2123func releasePendingReadIndexMessages(r *raft) {
2124 if len(r.pendingReadIndexMessages) == 0 {
2125 // Fast path for the common case to avoid a call to storage.LastIndex()
2126 // via committedEntryInCurrentTerm.
2127 return
2128 }
2129 if !r.committedEntryInCurrentTerm() {
2130 r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
2131 return
2132 }
2133
2134 msgs := r.pendingReadIndexMessages
2135 r.pendingReadIndexMessages = nil
2136
2137 for _, m := range msgs {
2138 sendMsgReadIndexResponse(r, m)
2139 }
2140}
2141
2142func sendMsgReadIndexResponse(r *raft, m pb.Message) {
2143 // TODO: consider using an internally defined context instead of the user-given context.
2144 // We can express this in terms of the term and index instead of a user-supplied value.
2145 // This would allow multiple reads to piggyback on the same message.
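	// ReadOnlySafe confirms leadership with a quorum before answering: the
	// request is recorded against the current commit index, the local node
	// acks it, and a heartbeat carrying the request context is broadcast.
	// stepLeader then responds once a quorum has acked (see its
	// MsgHeartbeatResp handling). ReadOnlyLeaseBased answers immediately from
	// the leader's commit index, relying on the leader lease instead of a
	// network round trip.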
2146 switch r.readOnly.option {
2147 // If more than the local vote is needed, go through a full broadcast.
2148 case ReadOnlySafe:
2149 r.readOnly.addRequest(r.raftLog.committed, m)
2150 // The local node automatically acks the request.
2151 r.readOnly.recvAck(r.id, m.Entries[0].Data)
2152 r.bcastHeartbeatWithCtx(m.Entries[0].Data)
2153 case ReadOnlyLeaseBased:
2154 if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
2155 r.send(resp)
2156 }
2157 }
2158}