blob: bd7c2febb61c56f7af2d703635cdbc55e30e2c25 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package raft
16
17import (
18 "fmt"
19
20 pb "go.etcd.io/raft/v3/raftpb"
21)
22
// raftLog combines a stable Storage with an in-memory unstable tail and
// tracks the committed, applying and applied positions within the log.
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and the unstable snapshot, if
	// any. They will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applying is the highest log position that the application has
	// been instructed to apply to its state machine. Some of these
	// entries may be in the process of applying and have not yet
	// reached applied.
	// Use: The field is incremented when accepting a Ready struct.
	// Invariant: applied <= applying && applying <= committed
	applying uint64
	// applied is the highest log position that the application has
	// successfully applied to its state machine.
	// Use: The field is incremented when advancing after the committed
	// entries in a Ready struct have been applied (either synchronously
	// or asynchronously).
	// Invariant: applied <= committed
	applied uint64

	// logger receives informational messages and invariant-violation panics
	// raised by the log.
	logger Logger

	// maxApplyingEntsSize limits the outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo.
	maxApplyingEntsSize entryEncodingSize
	// applyingEntsSize is the current outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo. It grows in acceptApplying and shrinks in
	// appliedTo.
	applyingEntsSize entryEncodingSize
	// applyingEntsPaused is true when entry application has been paused until
	// enough progress is acknowledged. While it is set, nextCommittedEnts and
	// hasNextCommittedEnts report nothing to apply.
	applyingEntsPaused bool
}
63
64// newLog returns log using the given storage and default options. It
65// recovers the log to the state that it just commits and applies the
66// latest snapshot.
67func newLog(storage Storage, logger Logger) *raftLog {
68 return newLogWithSize(storage, logger, noLimit)
69}
70
71// newLogWithSize returns a log using the given storage and max
72// message size.
73func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog {
74 firstIndex, err := storage.FirstIndex()
75 if err != nil {
76 panic(err) // TODO(bdarnell)
77 }
78 lastIndex, err := storage.LastIndex()
79 if err != nil {
80 panic(err) // TODO(bdarnell)
81 }
82 return &raftLog{
83 storage: storage,
84 unstable: unstable{
85 offset: lastIndex + 1,
86 offsetInProgress: lastIndex + 1,
87 logger: logger,
88 },
89 maxApplyingEntsSize: maxApplyingEntsSize,
90
91 // Initialize our committed and applied pointers to the time of the last compaction.
92 committed: firstIndex - 1,
93 applying: firstIndex - 1,
94 applied: firstIndex - 1,
95
96 logger: logger,
97 }
98}
99
100func (l *raftLog) String() string {
101 return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
102 l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
103}
104
105// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
106// it returns (last index of new entries, true).
107func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
108 if !l.matchTerm(a.prev) {
109 return 0, false
110 }
111 // TODO(pav-kv): propagate logSlice down the stack. It will be used all the
112 // way down in unstable, for safety checks, and for useful bookkeeping.
113
114 lastnewi = a.prev.index + uint64(len(a.entries))
115 ci := l.findConflict(a.entries)
116 switch {
117 case ci == 0:
118 case ci <= l.committed:
119 l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
120 default:
121 offset := a.prev.index + 1
122 if ci-offset > uint64(len(a.entries)) {
123 l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
124 }
125 l.append(a.entries[ci-offset:]...)
126 }
127 l.commitTo(min(committed, lastnewi))
128 return lastnewi, true
129}
130
131func (l *raftLog) append(ents ...pb.Entry) uint64 {
132 if len(ents) == 0 {
133 return l.lastIndex()
134 }
135 if after := ents[0].Index - 1; after < l.committed {
136 l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
137 }
138 l.unstable.truncateAndAppend(ents)
139 return l.lastIndex()
140}
141
142// findConflict finds the index of the conflict.
143// It returns the first pair of conflicting entries between the existing
144// entries and the given entries, if there are any.
145// If there is no conflicting entries, and the existing entries contains
146// all the given entries, zero will be returned.
147// If there is no conflicting entries, but the given entries contains new
148// entries, the index of the first new entry will be returned.
149// An entry is considered to be conflicting if it has the same index but
150// a different term.
151// The index of the given entries MUST be continuously increasing.
152func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
153 for i := range ents {
154 if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
155 if id.index <= l.lastIndex() {
156 // TODO(pav-kv): can simply print %+v of the id. This will change the
157 // log format though.
158 l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
159 id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
160 }
161 return id.index
162 }
163 }
164 return 0
165}
166
167// findConflictByTerm returns a best guess on where this log ends matching
168// another log, given that the only information known about the other log is the
169// (index, term) of its single entry.
170//
171// Specifically, the first returned value is the max guessIndex <= index, such
172// that term(guessIndex) <= term or term(guessIndex) is not known (because this
173// index is compacted or not yet stored).
174//
175// The second returned value is the term(guessIndex), or 0 if it is unknown.
176//
177// This function is used by a follower and leader to resolve log conflicts after
178// an unsuccessful append to a follower, and ultimately restore the steady flow
179// of appends.
180func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
181 for ; index > 0; index-- {
182 // If there is an error (likely ErrCompacted or ErrUnavailable), we don't
183 // know whether it's a match or not, so assume a possible match and return
184 // the index, with 0 term indicating an unknown term.
185 if ourTerm, err := l.term(index); err != nil {
186 return index, 0
187 } else if ourTerm <= term {
188 return index, ourTerm
189 }
190 }
191 return 0, 0
192}
193
194// nextUnstableEnts returns all entries that are available to be written to the
195// local stable log and are not already in-progress.
196func (l *raftLog) nextUnstableEnts() []pb.Entry {
197 return l.unstable.nextEntries()
198}
199
200// hasNextUnstableEnts returns if there are any entries that are available to be
201// written to the local stable log and are not already in-progress.
202func (l *raftLog) hasNextUnstableEnts() bool {
203 return len(l.nextUnstableEnts()) > 0
204}
205
206// hasNextOrInProgressUnstableEnts returns if there are any entries that are
207// available to be written to the local stable log or in the process of being
208// written to the local stable log.
209func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
210 return len(l.unstable.entries) > 0
211}
212
213// nextCommittedEnts returns all the available entries for execution.
214// Entries can be committed even when the local raft instance has not durably
215// appended them to the local raft log yet. If allowUnstable is true, committed
216// entries from the unstable log may be returned; otherwise, only entries known
217// to reside locally on stable storage will be returned.
218func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
219 if l.applyingEntsPaused {
220 // Entry application outstanding size limit reached.
221 return nil
222 }
223 if l.hasNextOrInProgressSnapshot() {
224 // See comment in hasNextCommittedEnts.
225 return nil
226 }
227 lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
228 if lo >= hi {
229 // Nothing to apply.
230 return nil
231 }
232 maxSize := l.maxApplyingEntsSize - l.applyingEntsSize
233 if maxSize <= 0 {
234 l.logger.Panicf("applying entry size (%d-%d)=%d not positive",
235 l.maxApplyingEntsSize, l.applyingEntsSize, maxSize)
236 }
237 ents, err := l.slice(lo, hi, maxSize)
238 if err != nil {
239 l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
240 }
241 return ents
242}
243
244// hasNextCommittedEnts returns if there is any available entries for execution.
245// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
246func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
247 if l.applyingEntsPaused {
248 // Entry application outstanding size limit reached.
249 return false
250 }
251 if l.hasNextOrInProgressSnapshot() {
252 // If we have a snapshot to apply, don't also return any committed
253 // entries. Doing so raises questions about what should be applied
254 // first.
255 return false
256 }
257 lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
258 return lo < hi
259}
260
261// maxAppliableIndex returns the maximum committed index that can be applied.
262// If allowUnstable is true, committed entries from the unstable log can be
263// applied; otherwise, only entries known to reside locally on stable storage
264// can be applied.
265func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
266 hi := l.committed
267 if !allowUnstable {
268 hi = min(hi, l.unstable.offset-1)
269 }
270 return hi
271}
272
273// nextUnstableSnapshot returns the snapshot, if present, that is available to
274// be applied to the local storage and is not already in-progress.
275func (l *raftLog) nextUnstableSnapshot() *pb.Snapshot {
276 return l.unstable.nextSnapshot()
277}
278
279// hasNextUnstableSnapshot returns if there is a snapshot that is available to
280// be applied to the local storage and is not already in-progress.
281func (l *raftLog) hasNextUnstableSnapshot() bool {
282 return l.unstable.nextSnapshot() != nil
283}
284
285// hasNextOrInProgressSnapshot returns if there is pending snapshot waiting for
286// applying or in the process of being applied.
287func (l *raftLog) hasNextOrInProgressSnapshot() bool {
288 return l.unstable.snapshot != nil
289}
290
291func (l *raftLog) snapshot() (pb.Snapshot, error) {
292 if l.unstable.snapshot != nil {
293 return *l.unstable.snapshot, nil
294 }
295 return l.storage.Snapshot()
296}
297
298func (l *raftLog) firstIndex() uint64 {
299 if i, ok := l.unstable.maybeFirstIndex(); ok {
300 return i
301 }
302 index, err := l.storage.FirstIndex()
303 if err != nil {
304 panic(err) // TODO(bdarnell)
305 }
306 return index
307}
308
309func (l *raftLog) lastIndex() uint64 {
310 if i, ok := l.unstable.maybeLastIndex(); ok {
311 return i
312 }
313 i, err := l.storage.LastIndex()
314 if err != nil {
315 panic(err) // TODO(bdarnell)
316 }
317 return i
318}
319
320func (l *raftLog) commitTo(tocommit uint64) {
321 // never decrease commit
322 if l.committed < tocommit {
323 if l.lastIndex() < tocommit {
324 l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
325 }
326 l.committed = tocommit
327 }
328}
329
330func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) {
331 if l.committed < i || i < l.applied {
332 l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
333 }
334 l.applied = i
335 l.applying = max(l.applying, i)
336 if l.applyingEntsSize > size {
337 l.applyingEntsSize -= size
338 } else {
339 // Defense against underflow.
340 l.applyingEntsSize = 0
341 }
342 l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize
343}
344
345func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) {
346 if l.committed < i {
347 l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed)
348 }
349 l.applying = i
350 l.applyingEntsSize += size
351 // Determine whether to pause entry application until some progress is
352 // acknowledged. We pause in two cases:
353 // 1. the outstanding entry size equals or exceeds the maximum size.
354 // 2. the outstanding entry size does not equal or exceed the maximum size,
355 // but we determine that the next entry in the log will push us over the
356 // limit. We determine this by comparing the last entry returned from
357 // raftLog.nextCommittedEnts to the maximum entry that the method was
358 // allowed to return had there been no size limit. If these indexes are
359 // not equal, then the returned entries slice must have been truncated to
360 // adhere to the memory limit.
361 l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize ||
362 i < l.maxAppliableIndex(allowUnstable)
363}
364
365func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }
366
367func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
368
369// acceptUnstable indicates that the application has started persisting the
370// unstable entries in storage, and that the current unstable entries are thus
371// to be marked as being in-progress, to avoid returning them with future calls
372// to Ready().
373func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }
374
375// lastEntryID returns the ID of the last entry in the log.
376func (l *raftLog) lastEntryID() entryID {
377 index := l.lastIndex()
378 t, err := l.term(index)
379 if err != nil {
380 l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err)
381 }
382 return entryID{term: t, index: index}
383}
384
385func (l *raftLog) term(i uint64) (uint64, error) {
386 // Check the unstable log first, even before computing the valid term range,
387 // which may need to access stable Storage. If we find the entry's term in
388 // the unstable log, we know it was in the valid range.
389 if t, ok := l.unstable.maybeTerm(i); ok {
390 return t, nil
391 }
392
393 // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at
394 // firstIndex-1 is compacted away, its term is available for matching purposes
395 // when doing log appends.
396 if i+1 < l.firstIndex() {
397 return 0, ErrCompacted
398 }
399 if i > l.lastIndex() {
400 return 0, ErrUnavailable
401 }
402
403 t, err := l.storage.Term(i)
404 if err == nil {
405 return t, nil
406 }
407 if err == ErrCompacted || err == ErrUnavailable {
408 return 0, err
409 }
410 panic(err) // TODO(bdarnell)
411}
412
413func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
414 if i > l.lastIndex() {
415 return nil, nil
416 }
417 return l.slice(i, l.lastIndex()+1, maxSize)
418}
419
420// allEntries returns all entries in the log.
421func (l *raftLog) allEntries() []pb.Entry {
422 ents, err := l.entries(l.firstIndex(), noLimit)
423 if err == nil {
424 return ents
425 }
426 if err == ErrCompacted { // try again if there was a racing compaction
427 return l.allEntries()
428 }
429 // TODO (xiangli): handle error?
430 panic(err)
431}
432
433// isUpToDate determines if a log with the given last entry is more up-to-date
434// by comparing the index and term of the last entries in the existing logs.
435//
436// If the logs have last entries with different terms, then the log with the
437// later term is more up-to-date. If the logs end with the same term, then
438// whichever log has the larger lastIndex is more up-to-date. If the logs are
439// the same, the given log is up-to-date.
440func (l *raftLog) isUpToDate(their entryID) bool {
441 our := l.lastEntryID()
442 return their.term > our.term || their.term == our.term && their.index >= our.index
443}
444
445func (l *raftLog) matchTerm(id entryID) bool {
446 t, err := l.term(id.index)
447 if err != nil {
448 return false
449 }
450 return t == id.term
451}
452
453func (l *raftLog) maybeCommit(at entryID) bool {
454 // NB: term should never be 0 on a commit because the leader campaigned at
455 // least at term 1. But if it is 0 for some reason, we don't consider this a
456 // term match.
457 if at.term != 0 && at.index > l.committed && l.matchTerm(at) {
458 l.commitTo(at.index)
459 return true
460 }
461 return false
462}
463
464func (l *raftLog) restore(s pb.Snapshot) {
465 l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
466 l.committed = s.Metadata.Index
467 l.unstable.restore(s)
468}
469
470// scan visits all log entries in the [lo, hi) range, returning them via the
471// given callback. The callback can be invoked multiple times, with consecutive
472// sub-ranges of the requested range. Returns up to pageSize bytes worth of
473// entries at a time. May return more if a single entry size exceeds the limit.
474//
475// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
476// error (possibly after passing some entries through the callback).
477//
478// If the callback returns an error, scan terminates and returns this error
479// immediately. This can be used to stop the scan early ("break" the loop).
480func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error {
481 for lo < hi {
482 ents, err := l.slice(lo, hi, pageSize)
483 if err != nil {
484 return err
485 } else if len(ents) == 0 {
486 return fmt.Errorf("got 0 entries in [%d, %d)", lo, hi)
487 }
488 if err := v(ents); err != nil {
489 return err
490 }
491 lo += uint64(len(ents))
492 }
493 return nil
494}
495
496// slice returns a slice of log entries from lo through hi-1, inclusive.
497func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
498 if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
499 return nil, err
500 }
501 if lo == hi {
502 return nil, nil
503 }
504 if lo >= l.unstable.offset {
505 ents := limitSize(l.unstable.slice(lo, hi), maxSize)
506 // NB: use the full slice expression to protect the unstable slice from
507 // appends to the returned ents slice.
508 return ents[:len(ents):len(ents)], nil
509 }
510
511 cut := min(hi, l.unstable.offset)
512 ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
513 if err == ErrCompacted {
514 return nil, err
515 } else if err == ErrUnavailable {
516 l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, cut)
517 } else if err != nil {
518 panic(err) // TODO(pavelkalinnikov): handle errors uniformly
519 }
520 if hi <= l.unstable.offset {
521 return ents, nil
522 }
523
524 // Fast path to check if ents has reached the size limitation. Either the
525 // returned slice is shorter than requested (which means the next entry would
526 // bring it over the limit), or a single entry reaches the limit.
527 if uint64(len(ents)) < cut-lo {
528 return ents, nil
529 }
530 // Slow path computes the actual total size, so that unstable entries are cut
531 // optimally before being copied to ents slice.
532 size := entsSize(ents)
533 if size >= maxSize {
534 return ents, nil
535 }
536
537 unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size)
538 // Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
539 // If this happens, ignore this extra entry.
540 if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
541 return ents, nil
542 }
543 // Otherwise, total size of unstable does not exceed maxSize-size, so total
544 // size of ents+unstable does not exceed maxSize. Simply concatenate them.
545 return extend(ents, unstable), nil
546}
547
548// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
549func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
550 if lo > hi {
551 l.logger.Panicf("invalid slice %d > %d", lo, hi)
552 }
553 fi := l.firstIndex()
554 if lo < fi {
555 return ErrCompacted
556 }
557
558 length := l.lastIndex() + 1 - fi
559 if hi > fi+length {
560 l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
561 }
562 return nil
563}
564
565func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
566 if err == nil {
567 return t
568 }
569 if err == ErrCompacted || err == ErrUnavailable {
570 return 0
571 }
572 l.logger.Panicf("unexpected error (%v)", err)
573 return 0
574}