blob: bd7c2febb61c56f7af2d703635cdbc55e30e2c25 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"fmt"
pb "go.etcd.io/raft/v3/raftpb"
)
// raftLog combines the entries persisted in Storage with the not-yet-persisted
// entries (and snapshot) held in unstable, and tracks how far the log has been
// committed, handed out for application, and applied.
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applying is the highest log position that the application has
	// been instructed to apply to its state machine. Some of these
	// entries may be in the process of applying and have not yet
	// reached applied.
	// Use: The field is incremented when accepting a Ready struct.
	// Invariant: applied <= applying && applying <= committed
	applying uint64
	// applied is the highest log position that the application has
	// successfully applied to its state machine.
	// Use: The field is incremented when advancing after the committed
	// entries in a Ready struct have been applied (either synchronously
	// or asynchronously).
	// Invariant: applied <= committed
	applied uint64

	// logger receives informational messages and is used to Panicf when an
	// invariant of the log is violated.
	logger Logger

	// maxApplyingEntsSize limits the outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo.
	maxApplyingEntsSize entryEncodingSize
	// applyingEntsSize is the current outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo.
	applyingEntsSize entryEncodingSize
	// applyingEntsPaused is true when entry application has been paused until
	// enough progress is acknowledged.
	applyingEntsPaused bool
}
// newLog builds a raftLog on top of storage without any cap on the byte size of
// committed entries outstanding for application. Like newLogWithSize, the
// committed, applying and applied positions start at storage's FirstIndex()-1,
// i.e. the state right after the latest snapshot was committed and applied.
func newLog(storage Storage, logger Logger) *raftLog {
	l := newLogWithSize(storage, logger, noLimit)
	return l
}
// newLogWithSize builds a raftLog on top of storage. maxApplyingEntsSize bounds
// the total byte size of committed entries that may be handed out for
// application before being acknowledged.
//
// The unstable log starts right after storage's last index, and the committed,
// applying and applied positions all start at the last compacted index
// (storage.FirstIndex()-1). Errors from storage cause a panic.
func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog {
	first, err := storage.FirstIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	last, err := storage.LastIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}

	compacted, next := first-1, last+1
	l := &raftLog{
		storage:             storage,
		logger:              logger,
		maxApplyingEntsSize: maxApplyingEntsSize,
		// Start from the point of the last compaction.
		committed: compacted,
		applying:  compacted,
		applied:   compacted,
	}
	l.unstable = unstable{
		offset:           next,
		offsetInProgress: next,
		logger:           logger,
	}
	return l
}
// String renders the log's commit/apply positions and the shape of its unstable
// part, for use in log messages.
func (l *raftLog) String() string {
	u := &l.unstable
	return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
		l.committed, l.applied, l.applying, u.offset, u.offsetInProgress, len(u.entries))
}
// maybeAppend tries to append the log slice a.
//
// If this log's term at a.prev.index does not equal a.prev.term, the log is
// left untouched and (0, false) is returned. Otherwise every entry of a from
// the first one that conflicts with (or extends past) the existing log is
// appended, the commit index is advanced to min(committed, last index of a),
// and (last index of a, true) is returned. It panics if the first conflict is
// at or below the current commit index.
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
	if !l.matchTerm(a.prev) {
		return 0, false
	}
	// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
	// way down in unstable, for safety checks, and for useful bookkeeping.
	lastnewi = a.prev.index + uint64(len(a.entries))

	// A zero result means every entry of a is already present with a matching
	// term, so there is nothing to write.
	if ci := l.findConflict(a.entries); ci != 0 {
		if ci <= l.committed {
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		} else {
			// a.entries[0] sits at index a.prev.index+1, so the conflicting
			// entry is at this position within a.entries.
			pos := ci - (a.prev.index + 1)
			if pos > uint64(len(a.entries)) {
				l.logger.Panicf("index, %d, is out of range [%d]", pos, len(a.entries))
			}
			l.append(a.entries[pos:]...)
		}
	}
	l.commitTo(min(committed, lastnewi))
	return lastnewi, true
}
// append hands ents to unstable.truncateAndAppend and returns the resulting
// lastIndex(). An empty ents leaves the log alone. It panics if ents[0] would
// land at or below the commit index, i.e. if ents[0].Index-1 < committed.
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) > 0 {
		after := ents[0].Index - 1
		if after < l.committed {
			l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
		}
		l.unstable.truncateAndAppend(ents)
	}
	return l.lastIndex()
}
// findConflict returns the index of the first entry in ents that does not match
// this log. An entry does not match when this log has a different term at that
// index, or has no entry at that index at all.
//
//   - If some entry has the same index but a different term than the existing
//     entry, that (conflicting) index is returned and the conflict is logged.
//   - If there is no conflict but ents extends past the end of the log, the
//     index of the first new entry is returned.
//   - If the log already contains all of ents, zero is returned.
//
// The indices of ents MUST be contiguous and increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
	for i := range ents {
		id := pbEntryID(&ents[i])
		if l.matchTerm(id) {
			continue
		}
		// Only an index inside the log is a true conflict worth logging; past
		// the end it is simply a new entry.
		if id.index <= l.lastIndex() {
			// TODO(pav-kv): can simply print %+v of the id. This will change the
			// log format though.
			existing := l.zeroTermOnOutOfBounds(l.term(id.index))
			l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
				id.index, existing, id.term)
		}
		return id.index
	}
	return 0
}
// findConflictByTerm guesses where this log stops matching another log, of
// which only a single (index, term) pair is known.
//
// It walks down from index and returns the first guessIndex <= index for which
// either term(guessIndex) <= term, or term(guessIndex) cannot be determined
// (compacted or not yet stored). The second result is term(guessIndex), or 0
// when that term is unknown. If no such index exists above 0, (0, 0) is
// returned.
//
// Leaders and followers use this to resolve log conflicts after a rejected
// append, so the steady flow of appends can resume.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
	for i := index; i > 0; i-- {
		ourTerm, err := l.term(i)
		switch {
		case err != nil:
			// Likely ErrCompacted or ErrUnavailable: a match cannot be ruled
			// out, so report a possible match with an unknown (0) term.
			return i, 0
		case ourTerm <= term:
			return i, ourTerm
		}
	}
	return 0, 0
}
// nextUnstableEnts yields the unstable entries that still need to be written to
// the local stable log and that are not already being written. It is a thin
// wrapper over unstable.nextEntries.
func (l *raftLog) nextUnstableEnts() []pb.Entry {
	pending := l.unstable.nextEntries()
	return pending
}
// hasNextUnstableEnts reports whether nextUnstableEnts has anything to offer,
// i.e. whether some entries await being written to the local stable log and are
// not already in progress.
func (l *raftLog) hasNextUnstableEnts() bool {
	pending := l.nextUnstableEnts()
	return len(pending) != 0
}
// hasNextOrInProgressUnstableEnts reports whether the unstable log holds any
// entries at all, whether they are still waiting to be written to the local
// stable log or are already being written.
func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
	n := len(l.unstable.entries)
	return n != 0
}
// nextCommittedEnts returns all the available entries for execution.
// Entries can be committed even when the local raft instance has not durably
// appended them to the local raft log yet. If allowUnstable is true, committed
// entries from the unstable log may be returned; otherwise, only entries known
// to reside locally on stable storage will be returned.
//
// The returned entries start at applying+1. Their total size is limited by the
// remaining budget maxApplyingEntsSize-applyingEntsSize. It returns nil in
// three cases: application is paused, a snapshot is pending, or there is
// nothing to apply. It panics if the entries cannot be read.
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
	if l.applyingEntsPaused {
		// Entry application outstanding size limit reached.
		return nil
	}
	if l.hasNextOrInProgressSnapshot() {
		// If we have a snapshot to apply, don't also return any committed
		// entries. Doing so raises questions about what should be applied
		// first.
		return nil
	}
	lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
	if lo >= hi {
		// Nothing to apply.
		return nil
	}
	// entryEncodingSize is unsigned (see the underflow guard in appliedTo), so
	// check for an exhausted budget BEFORE subtracting. Otherwise an outstanding
	// size above the limit would wrap around to a huge budget and slip past a
	// "<= 0" test. Reaching this panic means the applyingEntsPaused bookkeeping
	// is broken: that flag is set whenever applyingEntsSize >= maxApplyingEntsSize.
	if l.applyingEntsSize >= l.maxApplyingEntsSize {
		l.logger.Panicf("applying entry size (%d-%d)=%d not positive",
			l.maxApplyingEntsSize, l.applyingEntsSize, int64(l.maxApplyingEntsSize-l.applyingEntsSize))
	}
	maxSize := l.maxApplyingEntsSize - l.applyingEntsSize
	ents, err := l.slice(lo, hi, maxSize)
	if err != nil {
		l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
	}
	return ents
}
// hasNextCommittedEnts reports whether there are committed entries available
// for application. It is a cheap check that skips the raftLog.slice() call made
// by nextCommittedEnts.
func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
	// Nothing is offered while the outstanding-size limit is reached, or while a
	// snapshot is pending. Returning committed entries alongside a snapshot would
	// raise questions about what should be applied first.
	if l.applyingEntsPaused || l.hasNextOrInProgressSnapshot() {
		return false
	}
	next := l.applying + 1
	end := l.maxAppliableIndex(allowUnstable) + 1 // exclusive bound
	return next < end
}
// maxAppliableIndex returns the highest committed index that may be applied.
// With allowUnstable, that is the commit index itself. Without it, the result
// is additionally capped at the last index held in stable storage, which is the
// index just below the unstable log's offset.
func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
	if allowUnstable {
		return l.committed
	}
	return min(l.committed, l.unstable.offset-1)
}
// nextUnstableSnapshot yields the unstable snapshot waiting to be applied to
// local storage, or nil if there is none or it is already in progress. It is a
// thin wrapper over unstable.nextSnapshot.
func (l *raftLog) nextUnstableSnapshot() *pb.Snapshot {
	snap := l.unstable.nextSnapshot()
	return snap
}
// hasNextUnstableSnapshot reports whether a snapshot is waiting to be applied to
// local storage and is not already in progress.
func (l *raftLog) hasNextUnstableSnapshot() bool {
	snap := l.unstable.nextSnapshot()
	return snap != nil
}
// hasNextOrInProgressSnapshot reports whether the unstable log holds a snapshot,
// whether it is still waiting to be applied or is already being applied.
func (l *raftLog) hasNextOrInProgressSnapshot() bool {
	pending := l.unstable.snapshot
	return pending != nil
}
// snapshot returns the most recent snapshot known to the log. If the unstable
// log holds one, a copy of it is returned; otherwise storage is asked.
func (l *raftLog) snapshot() (pb.Snapshot, error) {
	if s := l.unstable.snapshot; s != nil {
		return *s, nil
	}
	return l.storage.Snapshot()
}
// firstIndex returns the first index of the log. It uses the unstable log's
// answer when unstable.maybeFirstIndex reports one, and otherwise falls back to
// storage.FirstIndex. It panics if storage returns an error.
func (l *raftLog) firstIndex() uint64 {
	first, ok := l.unstable.maybeFirstIndex()
	if ok {
		return first
	}
	first, err := l.storage.FirstIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	return first
}
// lastIndex returns the last index of the log. It uses the unstable log's
// answer when unstable.maybeLastIndex reports one, and otherwise falls back to
// storage.LastIndex. It panics if storage returns an error.
func (l *raftLog) lastIndex() uint64 {
	last, ok := l.unstable.maybeLastIndex()
	if ok {
		return last
	}
	last, err := l.storage.LastIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	return last
}
// commitTo raises the commit index to tocommit. The commit index never moves
// backwards, so a tocommit at or below the current value is a no-op. It panics
// if tocommit lies beyond the last index of the log.
func (l *raftLog) commitTo(tocommit uint64) {
	if tocommit <= l.committed {
		return // never decrease commit
	}
	if l.lastIndex() < tocommit {
		l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
	}
	l.committed = tocommit
}
// appliedTo records that the application has applied the log up to index i,
// releasing size bytes of the outstanding applying budget.
//
// i must lie in [applied, committed]; otherwise it panics. applying is raised
// to i if it lags behind. Entry application is paused again only while the
// remaining outstanding size still reaches maxApplyingEntsSize.
func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) {
	if i > l.committed || i < l.applied {
		l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
	}
	l.applied = i
	if i > l.applying {
		l.applying = i
	}
	// Release the acknowledged bytes, clamping at zero so the unsigned counter
	// cannot underflow.
	if size >= l.applyingEntsSize {
		l.applyingEntsSize = 0
	} else {
		l.applyingEntsSize -= size
	}
	l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize
}
// acceptApplying records that entries up to index i, of total size bytes, have
// been handed to the application. It panics if i is beyond the commit index.
//
// Afterwards it decides whether further entry application must pause until
// some progress is acknowledged via appliedTo.
func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) {
	if i > l.committed {
		l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed)
	}
	l.applying = i
	l.applyingEntsSize += size

	switch {
	case l.applyingEntsSize >= l.maxApplyingEntsSize:
		// The outstanding entry size already meets or exceeds the maximum.
		l.applyingEntsPaused = true
	case i < l.maxAppliableIndex(allowUnstable):
		// Below the limit, but the accepted batch stopped short of everything
		// nextCommittedEnts could have returned with no size limit. So the
		// batch was truncated to honor the limit, and the next entry would push
		// the outstanding size over it.
		l.applyingEntsPaused = true
	default:
		l.applyingEntsPaused = false
	}
}
// stableTo forwards to unstable.stableTo. Presumably it records that entries up
// to id have been persisted to storage. NOTE(review): confirm against unstable.
func (l *raftLog) stableTo(id entryID) {
	l.unstable.stableTo(id)
}
// stableSnapTo forwards to unstable.stableSnapTo. Presumably it records that the
// snapshot at index i has been persisted. NOTE(review): confirm against
// unstable.
func (l *raftLog) stableSnapTo(i uint64) {
	l.unstable.stableSnapTo(i)
}
// acceptUnstable signals that the application has begun persisting the current
// unstable entries. Those entries are marked as in-progress (via
// unstable.acceptInProgress) so that later calls to Ready() do not return them
// again.
func (l *raftLog) acceptUnstable() {
	l.unstable.acceptInProgress()
}
// lastEntryID returns the (term, index) ID of the last entry in the log. It
// panics if the term of the last index cannot be determined.
func (l *raftLog) lastEntryID() entryID {
	last := l.lastIndex()
	t, err := l.term(last)
	if err != nil {
		l.logger.Panicf("unexpected error when getting the last term at %d: %v", last, err)
	}
	return entryID{index: last, term: t}
}
// term returns the term of the entry at index i.
//
// The valid range is [firstIndex()-1, lastIndex()]. The entry at firstIndex()-1
// has been compacted away, but its term remains available so that appends can
// be matched against it. Below that range ErrCompacted is returned; above it,
// ErrUnavailable. ErrCompacted/ErrUnavailable from storage are passed through,
// and any other storage error panics.
func (l *raftLog) term(i uint64) (uint64, error) {
	// The unstable log is consulted first, before computing the valid range
	// (which may hit storage). A term found there is necessarily in range.
	if t, ok := l.unstable.maybeTerm(i); ok {
		return t, nil
	}

	switch {
	case i+1 < l.firstIndex():
		return 0, ErrCompacted
	case i > l.lastIndex():
		return 0, ErrUnavailable
	}

	t, err := l.storage.Term(i)
	switch err {
	case nil:
		return t, nil
	case ErrCompacted, ErrUnavailable:
		return 0, err
	}
	panic(err) // TODO(bdarnell)
}
// entries returns the log entries from index i through lastIndex(), limited in
// total size by maxSize as implemented by slice. If i is past the end of the
// log, it returns (nil, nil).
func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
	if i <= l.lastIndex() {
		return l.slice(i, l.lastIndex()+1, maxSize)
	}
	return nil, nil
}
// allEntries returns every entry in the log, from firstIndex() to lastIndex().
// If a compaction races with the read (ErrCompacted), the read is retried from
// the new first index. Any other error panics.
func (l *raftLog) allEntries() []pb.Entry {
	for {
		ents, err := l.entries(l.firstIndex(), noLimit)
		switch err {
		case nil:
			return ents
		case ErrCompacted:
			// try again if there was a racing compaction
			continue
		default:
			// TODO (xiangli): handle error?
			panic(err)
		}
	}
}
// isUpToDate reports whether a log ending at the entry their is at least as
// up-to-date as this log.
//
// Logs are compared by the IDs of their last entries. A later last term wins.
// With equal last terms, the longer log (larger last index) wins. Equal logs
// count as up-to-date.
func (l *raftLog) isUpToDate(their entryID) bool {
	our := l.lastEntryID()
	if their.term != our.term {
		return their.term > our.term
	}
	return their.index >= our.index
}
// matchTerm reports whether this log knows the term at id.index and that term
// equals id.term. An unknown term (any error from term) is never a match.
func (l *raftLog) matchTerm(id entryID) bool {
	t, err := l.term(id.index)
	return err == nil && t == id.term
}
// maybeCommit advances the commit index to at.index and reports true when at
// lies beyond the current commit index and matches this log's term at that
// index. Otherwise it leaves the log unchanged and reports false.
func (l *raftLog) maybeCommit(at entryID) bool {
	// NB: term should never be 0 on a commit because the leader campaigned at
	// least at term 1. But if it is 0 for some reason, we don't consider this a
	// term match.
	if at.term == 0 || at.index <= l.committed || !l.matchTerm(at) {
		return false
	}
	l.commitTo(at.index)
	return true
}
// restore resets the log to snapshot s. It logs the restore, sets the commit
// index to the snapshot's index, and hands s to unstable.restore.
func (l *raftLog) restore(s pb.Snapshot) {
	meta := s.Metadata
	l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, meta.Index, meta.Term)
	l.committed = meta.Index
	l.unstable.restore(s)
}
// scan feeds every log entry in [lo, hi) to the callback v, in order. The range
// is delivered in consecutive pages of up to pageSize bytes each. A page may be
// larger when a single entry exceeds the limit.
//
// Every entry in [lo, hi) must exist. Otherwise scan eventually returns an
// error, possibly after some entries have already been passed to v. An error
// returned by v stops the scan immediately and is returned as-is, which lets
// callers "break" out of the scan early.
func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error {
	for next := lo; next < hi; {
		page, err := l.slice(next, hi, pageSize)
		if err != nil {
			return err
		}
		if len(page) == 0 {
			return fmt.Errorf("got 0 entries in [%d, %d)", next, hi)
		}
		if err := v(page); err != nil {
			return err
		}
		next += uint64(len(page))
	}
	return nil
}
// slice returns a slice of log entries from lo through hi-1, inclusive.
//
// Entries below l.unstable.offset are read from storage and the rest from the
// unstable log. The total size is limited by maxSize (via limitSize and
// storage.Entries). Per the comments below, a single entry larger than the
// limit may still be returned. NOTE(review): confirm against limitSize /
// Storage.Entries.
//
// Errors and panics:
//   - Returns ErrCompacted if lo is below firstIndex().
//   - Panics (via mustCheckOutOfBounds) if lo > hi or hi > lastIndex()+1.
//   - Panics if storage reports ErrUnavailable or any unexpected error.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
	if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
		return nil, err
	}
	if lo == hi {
		return nil, nil
	}
	// Case 1: the whole range lives in the unstable log.
	if lo >= l.unstable.offset {
		ents := limitSize(l.unstable.slice(lo, hi), maxSize)
		// NB: use the full slice expression to protect the unstable slice from
		// appends to the returned ents slice.
		return ents[:len(ents):len(ents)], nil
	}
	// Read the stable prefix [lo, cut) from storage.
	cut := min(hi, l.unstable.offset)
	ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
	if err == ErrCompacted {
		return nil, err
	} else if err == ErrUnavailable {
		l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, cut)
	} else if err != nil {
		panic(err) // TODO(pavelkalinnikov): handle errors uniformly
	}
	// Case 2: the whole range lives in storage.
	if hi <= l.unstable.offset {
		return ents, nil
	}
	// Case 3: the range spans storage and the unstable log; decide how many
	// unstable entries may be appended to the storage prefix.

	// Fast path to check if ents has reached the size limitation. Either the
	// returned slice is shorter than requested (which means the next entry would
	// bring it over the limit), or a single entry reaches the limit.
	if uint64(len(ents)) < cut-lo {
		return ents, nil
	}
	// Slow path computes the actual total size, so that unstable entries are cut
	// optimally before being copied to ents slice.
	size := entsSize(ents)
	if size >= maxSize {
		return ents, nil
	}
	unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size)
	// Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
	// If this happens, ignore this extra entry.
	if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
		return ents, nil
	}
	// Otherwise, total size of unstable does not exceed maxSize-size, so total
	// size of ents+unstable does not exceed maxSize. Simply concatenate them.
	return extend(ents, unstable), nil
}
// mustCheckOutOfBounds validates the half-open range [lo, hi) against the log.
//
// It panics if lo > hi, or if hi extends past lastIndex()+1. If lo is below
// firstIndex(), the entries have been compacted and ErrCompacted is returned.
// Otherwise it returns nil, and the range satisfies
//
//	firstIndex() <= lo <= hi <= lastIndex()+1
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
	if lo > hi {
		l.logger.Panicf("invalid slice %d > %d", lo, hi)
	}
	fi := l.firstIndex()
	if lo < fi {
		return ErrCompacted
	}
	// The exclusive upper bound of the log is lastIndex()+1. Read lastIndex
	// once so the check and the panic message agree.
	if last := l.lastIndex(); hi > last+1 {
		l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, last)
	}
	return nil
}
// zeroTermOnOutOfBounds unpacks a (term, error) pair such as the one returned
// by term(). It returns t when err is nil, and 0 when err is ErrCompacted or
// ErrUnavailable (the index is outside the known range). Any other error is
// reported through the logger's Panicf.
func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
	switch err {
	case nil:
		return t
	case ErrCompacted, ErrUnavailable:
		return 0
	}
	l.logger.Panicf("unexpected error (%v)", err)
	return 0
}