[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/go.etcd.io/raft/v3/log.go b/vendor/go.etcd.io/raft/v3/log.go
new file mode 100644
index 0000000..bd7c2fe
--- /dev/null
+++ b/vendor/go.etcd.io/raft/v3/log.go
@@ -0,0 +1,574 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "fmt"
+
+ pb "go.etcd.io/raft/v3/raftpb"
+)
+
+type raftLog struct {
+ // storage contains all stable entries since the last snapshot.
+ storage Storage
+
+ // unstable contains all unstable entries and snapshot.
+ // they will be saved into storage.
+ unstable unstable
+
+ // committed is the highest log position that is known to be in
+ // stable storage on a quorum of nodes.
+ committed uint64
+ // applying is the highest log position that the application has
+ // been instructed to apply to its state machine. Some of these
+ // entries may be in the process of applying and have not yet
+ // reached applied.
+ // Use: The field is incremented when accepting a Ready struct.
+ // Invariant: applied <= applying && applying <= committed
+ applying uint64
+ // applied is the highest log position that the application has
+ // successfully applied to its state machine.
+ // Use: The field is incremented when advancing after the committed
+ // entries in a Ready struct have been applied (either synchronously
+ // or asynchronously).
+ // Invariant: applied <= committed
+ applied uint64
+
+ logger Logger
+
+ // maxApplyingEntsSize limits the outstanding byte size of the messages
+ // returned from calls to nextCommittedEnts that have not been acknowledged
+ // by a call to appliedTo.
+ maxApplyingEntsSize entryEncodingSize
+ // applyingEntsSize is the current outstanding byte size of the messages
+ // returned from calls to nextCommittedEnts that have not been acknowledged
+ // by a call to appliedTo.
+ applyingEntsSize entryEncodingSize
+ // applyingEntsPaused is true when entry application has been paused until
+ // enough progress is acknowledged.
+ applyingEntsPaused bool
+}
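+
+// invariantHolds is an illustrative sketch, not part of upstream etcd: it
+// spells out the ordering invariant that the fields above maintain. The helper
+// name is hypothetical and nothing in the library calls it.
+func (l *raftLog) invariantHolds() bool {
+	// applied trails applying, which in turn trails committed.
+	return l.applied <= l.applying && l.applying <= l.committed
+}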
+
+// newLog returns a log using the given storage and default options. It
+// recovers the log to the state where it has just committed and applied the
+// latest snapshot.
+func newLog(storage Storage, logger Logger) *raftLog {
+ return newLogWithSize(storage, logger, noLimit)
+}
+
+// newLogWithSize returns a log using the given storage and a limit on the
+// outstanding byte size of entries being applied (maxApplyingEntsSize).
+func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog {
+ firstIndex, err := storage.FirstIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ lastIndex, err := storage.LastIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ return &raftLog{
+ storage: storage,
+ unstable: unstable{
+ offset: lastIndex + 1,
+ offsetInProgress: lastIndex + 1,
+ logger: logger,
+ },
+ maxApplyingEntsSize: maxApplyingEntsSize,
+
+ // Initialize our committed and applied pointers to the time of the last compaction.
+ committed: firstIndex - 1,
+ applying: firstIndex - 1,
+ applied: firstIndex - 1,
+
+ logger: logger,
+ }
+}
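+
+// exampleNewRaftLog is an illustrative sketch, not part of upstream etcd: it
+// shows how a raftLog is typically built on top of a Storage implementation.
+// It assumes the NewMemoryStorage constructor defined elsewhere in this
+// package; the function name itself is hypothetical.
+func exampleNewRaftLog(logger Logger) *raftLog {
+	storage := NewMemoryStorage() // empty in-memory stable storage
+	l := newLog(storage, logger)
+	// On an empty storage, firstIndex-1 == lastIndex == 0, so committed,
+	// applying and applied all start at 0.
+	return l
+}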
+
+func (l *raftLog) String() string {
+ return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
+ l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
+}
+
+// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
+// it returns (last index of new entries, true).
+func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
+ if !l.matchTerm(a.prev) {
+ return 0, false
+ }
+ // TODO(pav-kv): propagate logSlice down the stack. It will be used all the
+ // way down in unstable, for safety checks, and for useful bookkeeping.
+
+ lastnewi = a.prev.index + uint64(len(a.entries))
+ ci := l.findConflict(a.entries)
+ switch {
+ case ci == 0:
+ case ci <= l.committed:
+ l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
+ default:
+ offset := a.prev.index + 1
+ if ci-offset > uint64(len(a.entries)) {
+ l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
+ }
+ l.append(a.entries[ci-offset:]...)
+ }
+ l.commitTo(min(committed, lastnewi))
+ return lastnewi, true
+}
+
+func (l *raftLog) append(ents ...pb.Entry) uint64 {
+ if len(ents) == 0 {
+ return l.lastIndex()
+ }
+ if after := ents[0].Index - 1; after < l.committed {
+ l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
+ }
+ l.unstable.truncateAndAppend(ents)
+ return l.lastIndex()
+}
+
+// findConflict finds the index of the first conflict between the existing
+// entries and the given entries, if there is any.
+// If there are no conflicting entries, and the existing entries contain
+// all the given entries, zero will be returned.
+// If there are no conflicting entries, but the given entries contain new
+// entries, the index of the first new entry will be returned.
+// An entry is considered to be conflicting if it has the same index but
+// a different term.
+// The indexes of the given entries MUST be continuously increasing.
+func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
+ for i := range ents {
+ if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
+ if id.index <= l.lastIndex() {
+ // TODO(pav-kv): can simply print %+v of the id. This will change the
+ // log format though.
+ l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
+ id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
+ }
+ return id.index
+ }
+ }
+ return 0
+}
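+
+// Illustrative example (not part of upstream etcd): for a hypothetical log
+// whose entries at indexes 1..3 carry terms 1, 2, 2, findConflict behaves as
+// follows:
+//
+//	incoming (index,term) pairs    result
+//	(1,1) (2,2)                    0  (all entries already present)
+//	(2,2) (3,2) (4,2)              4  (no conflict; 4 is the first new entry)
+//	(3,3)                          3  (same index, different term)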
+
+// findConflictByTerm returns a best guess on where this log ends matching
+// another log, given that the only information known about the other log is the
+// (index, term) of its single entry.
+//
+// Specifically, the first returned value is the max guessIndex <= index, such
+// that term(guessIndex) <= term or term(guessIndex) is not known (because this
+// index is compacted or not yet stored).
+//
+// The second returned value is the term(guessIndex), or 0 if it is unknown.
+//
+// This function is used by a follower and leader to resolve log conflicts after
+// an unsuccessful append to a follower, and ultimately restore the steady flow
+// of appends.
+func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
+ for ; index > 0; index-- {
+ // If there is an error (likely ErrCompacted or ErrUnavailable), we don't
+ // know whether it's a match or not, so assume a possible match and return
+ // the index, with 0 term indicating an unknown term.
+ if ourTerm, err := l.term(index); err != nil {
+ return index, 0
+ } else if ourTerm <= term {
+ return index, ourTerm
+ }
+ }
+ return 0, 0
+}
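+
+// exampleRejectionHint is an illustrative sketch, not part of upstream etcd: it
+// shows how a follower could use findConflictByTerm to compute a (hint index,
+// hint term) pair after rejecting an append, so the leader can skip a whole
+// range of conflicting entries in one round trip. The function name and
+// parameters are hypothetical.
+func exampleRejectionHint(l *raftLog, rejectedIndex, leaderPrevTerm uint64) (uint64, uint64) {
+	// Never probe beyond our own last index; the leader cannot match past it.
+	return l.findConflictByTerm(min(rejectedIndex, l.lastIndex()), leaderPrevTerm)
+}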
+
+// nextUnstableEnts returns all entries that are available to be written to the
+// local stable log and are not already in-progress.
+func (l *raftLog) nextUnstableEnts() []pb.Entry {
+ return l.unstable.nextEntries()
+}
+
+// hasNextUnstableEnts returns whether there are any entries that are available
+// to be written to the local stable log and are not already in-progress.
+func (l *raftLog) hasNextUnstableEnts() bool {
+ return len(l.nextUnstableEnts()) > 0
+}
+
+// hasNextOrInProgressUnstableEnts returns whether there are any entries that
+// are available to be written to the local stable log or are in the process of
+// being written to the local stable log.
+func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
+ return len(l.unstable.entries) > 0
+}
+
+// nextCommittedEnts returns all the available entries for execution.
+// Entries can be committed even when the local raft instance has not durably
+// appended them to the local raft log yet. If allowUnstable is true, committed
+// entries from the unstable log may be returned; otherwise, only entries known
+// to reside locally on stable storage will be returned.
+func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
+ if l.applyingEntsPaused {
+ // Entry application outstanding size limit reached.
+ return nil
+ }
+ if l.hasNextOrInProgressSnapshot() {
+ // See comment in hasNextCommittedEnts.
+ return nil
+ }
+ lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
+ if lo >= hi {
+ // Nothing to apply.
+ return nil
+ }
+ maxSize := l.maxApplyingEntsSize - l.applyingEntsSize
+ if maxSize <= 0 {
+ l.logger.Panicf("applying entry size (%d-%d)=%d not positive",
+ l.maxApplyingEntsSize, l.applyingEntsSize, maxSize)
+ }
+ ents, err := l.slice(lo, hi, maxSize)
+ if err != nil {
+ l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
+ }
+ return ents
+}
+
+// hasNextCommittedEnts returns whether there are any available entries for
+// execution. This is a fast check that avoids the heavier raftLog.slice() call
+// in nextCommittedEnts().
+func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
+ if l.applyingEntsPaused {
+ // Entry application outstanding size limit reached.
+ return false
+ }
+ if l.hasNextOrInProgressSnapshot() {
+ // If we have a snapshot to apply, don't also return any committed
+ // entries. Doing so raises questions about what should be applied
+ // first.
+ return false
+ }
+ lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
+ return lo < hi
+}
+
+// maxAppliableIndex returns the maximum committed index that can be applied.
+// If allowUnstable is true, committed entries from the unstable log can be
+// applied; otherwise, only entries known to reside locally on stable storage
+// can be applied.
+func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
+ hi := l.committed
+ if !allowUnstable {
+ hi = min(hi, l.unstable.offset-1)
+ }
+ return hi
+}
+
+// nextUnstableSnapshot returns the snapshot, if present, that is available to
+// be applied to the local storage and is not already in-progress.
+func (l *raftLog) nextUnstableSnapshot() *pb.Snapshot {
+ return l.unstable.nextSnapshot()
+}
+
+// hasNextUnstableSnapshot returns whether there is a snapshot that is available
+// to be applied to the local storage and is not already in-progress.
+func (l *raftLog) hasNextUnstableSnapshot() bool {
+ return l.unstable.nextSnapshot() != nil
+}
+
+// hasNextOrInProgressSnapshot returns whether there is a pending snapshot
+// waiting to be applied or in the process of being applied.
+func (l *raftLog) hasNextOrInProgressSnapshot() bool {
+ return l.unstable.snapshot != nil
+}
+
+func (l *raftLog) snapshot() (pb.Snapshot, error) {
+ if l.unstable.snapshot != nil {
+ return *l.unstable.snapshot, nil
+ }
+ return l.storage.Snapshot()
+}
+
+func (l *raftLog) firstIndex() uint64 {
+ if i, ok := l.unstable.maybeFirstIndex(); ok {
+ return i
+ }
+ index, err := l.storage.FirstIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ return index
+}
+
+func (l *raftLog) lastIndex() uint64 {
+ if i, ok := l.unstable.maybeLastIndex(); ok {
+ return i
+ }
+ i, err := l.storage.LastIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ return i
+}
+
+func (l *raftLog) commitTo(tocommit uint64) {
+ // never decrease commit
+ if l.committed < tocommit {
+ if l.lastIndex() < tocommit {
+ l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
+ }
+ l.committed = tocommit
+ }
+}
+
+func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) {
+ if l.committed < i || i < l.applied {
+ l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
+ }
+ l.applied = i
+ l.applying = max(l.applying, i)
+ if l.applyingEntsSize > size {
+ l.applyingEntsSize -= size
+ } else {
+ // Defense against underflow.
+ l.applyingEntsSize = 0
+ }
+ l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize
+}
+
+func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) {
+ if l.committed < i {
+ l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed)
+ }
+ l.applying = i
+ l.applyingEntsSize += size
+ // Determine whether to pause entry application until some progress is
+ // acknowledged. We pause in two cases:
+ // 1. the outstanding entry size equals or exceeds the maximum size.
+ // 2. the outstanding entry size does not equal or exceed the maximum size,
+ // but we determine that the next entry in the log will push us over the
+ // limit. We determine this by comparing the last entry returned from
+ // raftLog.nextCommittedEnts to the maximum entry that the method was
+ // allowed to return had there been no size limit. If these indexes are
+ // not equal, then the returned entries slice must have been truncated to
+ // adhere to the memory limit.
+ l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize ||
+ i < l.maxAppliableIndex(allowUnstable)
+}
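+
+// exampleApplyCycle is an illustrative sketch, not part of upstream etcd: it
+// shows the intended accept/acknowledge cycle around nextCommittedEnts,
+// acceptApplying and appliedTo. In the real library this is driven by the
+// Ready/Advance machinery; the function name is hypothetical.
+func exampleApplyCycle(l *raftLog) {
+	ents := l.nextCommittedEnts(true /* allowUnstable */)
+	if len(ents) == 0 {
+		return
+	}
+	size := entsSize(ents)
+	// Hand the entries to the application and account for their size; this may
+	// pause further application until progress is acknowledged.
+	l.acceptApplying(ents[len(ents)-1].Index, size, true /* allowUnstable */)
+	// ... once the application has applied the entries, it acknowledges them:
+	l.appliedTo(ents[len(ents)-1].Index, size)
+}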
+
+func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }
+
+func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
+
+// acceptUnstable indicates that the application has started persisting the
+// unstable entries in storage, and that the current unstable entries are thus
+// to be marked as being in-progress, to avoid returning them with future calls
+// to Ready().
+func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }
+
+// lastEntryID returns the ID of the last entry in the log.
+func (l *raftLog) lastEntryID() entryID {
+ index := l.lastIndex()
+ t, err := l.term(index)
+ if err != nil {
+ l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err)
+ }
+ return entryID{term: t, index: index}
+}
+
+func (l *raftLog) term(i uint64) (uint64, error) {
+ // Check the unstable log first, even before computing the valid term range,
+ // which may need to access stable Storage. If we find the entry's term in
+ // the unstable log, we know it was in the valid range.
+ if t, ok := l.unstable.maybeTerm(i); ok {
+ return t, nil
+ }
+
+ // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at
+ // firstIndex-1 is compacted away, its term is available for matching purposes
+ // when doing log appends.
+ if i+1 < l.firstIndex() {
+ return 0, ErrCompacted
+ }
+ if i > l.lastIndex() {
+ return 0, ErrUnavailable
+ }
+
+ t, err := l.storage.Term(i)
+ if err == nil {
+ return t, nil
+ }
+ if err == ErrCompacted || err == ErrUnavailable {
+ return 0, err
+ }
+ panic(err) // TODO(bdarnell)
+}
+
+func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
+ if i > l.lastIndex() {
+ return nil, nil
+ }
+ return l.slice(i, l.lastIndex()+1, maxSize)
+}
+
+// allEntries returns all entries in the log.
+func (l *raftLog) allEntries() []pb.Entry {
+ ents, err := l.entries(l.firstIndex(), noLimit)
+ if err == nil {
+ return ents
+ }
+ if err == ErrCompacted { // try again if there was a racing compaction
+ return l.allEntries()
+ }
+ // TODO (xiangli): handle error?
+ panic(err)
+}
+
+// isUpToDate determines if a log with the given last entry is more up-to-date
+// by comparing the index and term of the last entries in the existing logs.
+//
+// If the logs have last entries with different terms, then the log with the
+// later term is more up-to-date. If the logs end with the same term, then
+// whichever log has the larger lastIndex is more up-to-date. If the logs are
+// the same, the given log is up-to-date.
+func (l *raftLog) isUpToDate(their entryID) bool {
+ our := l.lastEntryID()
+ return their.term > our.term || their.term == our.term && their.index >= our.index
+}
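+
+// exampleCanGrantVote is an illustrative sketch, not part of upstream etcd: a
+// voter grants its vote only if the candidate's last entry makes the
+// candidate's log at least as up-to-date as the local one. The function name
+// and parameter are hypothetical.
+func exampleCanGrantVote(l *raftLog, candidateLast entryID) bool {
+	return l.isUpToDate(candidateLast)
+}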
+
+func (l *raftLog) matchTerm(id entryID) bool {
+ t, err := l.term(id.index)
+ if err != nil {
+ return false
+ }
+ return t == id.term
+}
+
+func (l *raftLog) maybeCommit(at entryID) bool {
+ // NB: term should never be 0 on a commit because the leader campaigned at
+ // least at term 1. But if it is 0 for some reason, we don't consider this a
+ // term match.
+ if at.term != 0 && at.index > l.committed && l.matchTerm(at) {
+ l.commitTo(at.index)
+ return true
+ }
+ return false
+}
+
+func (l *raftLog) restore(s pb.Snapshot) {
+ l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
+ l.committed = s.Metadata.Index
+ l.unstable.restore(s)
+}
+
+// scan visits all log entries in the [lo, hi) range, returning them via the
+// given callback. The callback can be invoked multiple times, with consecutive
+// sub-ranges of the requested range. Returns up to pageSize bytes worth of
+// entries at a time. May return more if a single entry size exceeds the limit.
+//
+// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
+// error (possibly after passing some entries through the callback).
+//
+// If the callback returns an error, scan terminates and returns this error
+// immediately. This can be used to stop the scan early ("break" the loop).
+func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error {
+ for lo < hi {
+ ents, err := l.slice(lo, hi, pageSize)
+ if err != nil {
+ return err
+ } else if len(ents) == 0 {
+ return fmt.Errorf("got 0 entries in [%d, %d)", lo, hi)
+ }
+ if err := v(ents); err != nil {
+ return err
+ }
+ lo += uint64(len(ents))
+ }
+ return nil
+}
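+
+// exampleScanCommitted is an illustrative sketch, not part of upstream etcd: it
+// pages through all committed entries in 1 MB chunks and counts them. The
+// function name and page size are arbitrary example values.
+func exampleScanCommitted(l *raftLog) (count int, err error) {
+	err = l.scan(l.firstIndex(), l.committed+1, 1<<20, func(ents []pb.Entry) error {
+		count += len(ents)
+		return nil // a non-nil error here would stop the scan early
+	})
+	return count, err
+}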
+
+// slice returns a slice of log entries from lo through hi-1, inclusive.
+func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
+ if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
+ return nil, err
+ }
+ if lo == hi {
+ return nil, nil
+ }
+ if lo >= l.unstable.offset {
+ ents := limitSize(l.unstable.slice(lo, hi), maxSize)
+ // NB: use the full slice expression to protect the unstable slice from
+ // appends to the returned ents slice.
+ return ents[:len(ents):len(ents)], nil
+ }
+
+ cut := min(hi, l.unstable.offset)
+ ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
+ if err == ErrCompacted {
+ return nil, err
+ } else if err == ErrUnavailable {
+ l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, cut)
+ } else if err != nil {
+ panic(err) // TODO(pavelkalinnikov): handle errors uniformly
+ }
+ if hi <= l.unstable.offset {
+ return ents, nil
+ }
+
+ // Fast path to check if ents has reached the size limitation. Either the
+ // returned slice is shorter than requested (which means the next entry would
+ // bring it over the limit), or a single entry reaches the limit.
+ if uint64(len(ents)) < cut-lo {
+ return ents, nil
+ }
+ // Slow path computes the actual total size, so that unstable entries are cut
+ // optimally before being copied to ents slice.
+ size := entsSize(ents)
+ if size >= maxSize {
+ return ents, nil
+ }
+
+ unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size)
+ // Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
+ // If this happens, ignore this extra entry.
+ if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
+ return ents, nil
+ }
+ // Otherwise, total size of unstable does not exceed maxSize-size, so total
+ // size of ents+unstable does not exceed maxSize. Simply concatenate them.
+ return extend(ents, unstable), nil
+}
+
+// mustCheckOutOfBounds checks that l.firstIndex() <= lo <= hi <= l.lastIndex()+1.
+// It returns ErrCompacted if lo precedes the first index, and panics on other
+// violations.
+func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
+ if lo > hi {
+ l.logger.Panicf("invalid slice %d > %d", lo, hi)
+ }
+ fi := l.firstIndex()
+ if lo < fi {
+ return ErrCompacted
+ }
+
+ length := l.lastIndex() + 1 - fi
+ if hi > fi+length {
+ l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
+ }
+ return nil
+}
+
+func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
+ if err == nil {
+ return t
+ }
+ if err == ErrCompacted || err == ErrUnavailable {
+ return 0
+ }
+ l.logger.Panicf("unexpected error (%v)", err)
+ return 0
+}