| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package raft |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | |
| 20 | pb "go.etcd.io/raft/v3/raftpb" |
| 21 | ) |
| 22 | |
// raftLog is a node's view of the raft log. Entries that have already been
// persisted are read through storage; entries (and possibly a snapshot) that
// have not been persisted yet are held in unstable. The log also tracks how
// far it is committed, how far committed entries have been handed to the
// application (applying), and how far the application has acknowledged
// applying them (applied).
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// They will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applying is the highest log position that the application has
	// been instructed to apply to its state machine. Some of these
	// entries may be in the process of applying and have not yet
	// reached applied.
	// Use: The field is incremented when accepting a Ready struct.
	// Invariant: applied <= applying && applying <= committed
	applying uint64
	// applied is the highest log position that the application has
	// successfully applied to its state machine.
	// Use: The field is incremented when advancing after the committed
	// entries in a Ready struct have been applied (either synchronously
	// or asynchronously).
	// Invariant: applied <= committed
	applied uint64

	// logger receives the informational messages (Infof) and the
	// invariant-violation panics (Panicf) emitted by the log.
	logger Logger

	// maxApplyingEntsSize limits the outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo.
	maxApplyingEntsSize entryEncodingSize
	// applyingEntsSize is the current outstanding byte size of the entries
	// returned from calls to nextCommittedEnts that have not been acknowledged
	// by a call to appliedTo.
	applyingEntsSize entryEncodingSize
	// applyingEntsPaused is true when entry application has been paused until
	// enough progress is acknowledged (see acceptApplying and appliedTo).
	applyingEntsPaused bool
}
| 63 | |
| 64 | // newLog returns log using the given storage and default options. It |
| 65 | // recovers the log to the state that it just commits and applies the |
| 66 | // latest snapshot. |
| 67 | func newLog(storage Storage, logger Logger) *raftLog { |
| 68 | return newLogWithSize(storage, logger, noLimit) |
| 69 | } |
| 70 | |
| 71 | // newLogWithSize returns a log using the given storage and max |
| 72 | // message size. |
| 73 | func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog { |
| 74 | firstIndex, err := storage.FirstIndex() |
| 75 | if err != nil { |
| 76 | panic(err) // TODO(bdarnell) |
| 77 | } |
| 78 | lastIndex, err := storage.LastIndex() |
| 79 | if err != nil { |
| 80 | panic(err) // TODO(bdarnell) |
| 81 | } |
| 82 | return &raftLog{ |
| 83 | storage: storage, |
| 84 | unstable: unstable{ |
| 85 | offset: lastIndex + 1, |
| 86 | offsetInProgress: lastIndex + 1, |
| 87 | logger: logger, |
| 88 | }, |
| 89 | maxApplyingEntsSize: maxApplyingEntsSize, |
| 90 | |
| 91 | // Initialize our committed and applied pointers to the time of the last compaction. |
| 92 | committed: firstIndex - 1, |
| 93 | applying: firstIndex - 1, |
| 94 | applied: firstIndex - 1, |
| 95 | |
| 96 | logger: logger, |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | func (l *raftLog) String() string { |
| 101 | return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d", |
| 102 | l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries)) |
| 103 | } |
| 104 | |
| 105 | // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, |
| 106 | // it returns (last index of new entries, true). |
| 107 | func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) { |
| 108 | if !l.matchTerm(a.prev) { |
| 109 | return 0, false |
| 110 | } |
| 111 | // TODO(pav-kv): propagate logSlice down the stack. It will be used all the |
| 112 | // way down in unstable, for safety checks, and for useful bookkeeping. |
| 113 | |
| 114 | lastnewi = a.prev.index + uint64(len(a.entries)) |
| 115 | ci := l.findConflict(a.entries) |
| 116 | switch { |
| 117 | case ci == 0: |
| 118 | case ci <= l.committed: |
| 119 | l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) |
| 120 | default: |
| 121 | offset := a.prev.index + 1 |
| 122 | if ci-offset > uint64(len(a.entries)) { |
| 123 | l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries)) |
| 124 | } |
| 125 | l.append(a.entries[ci-offset:]...) |
| 126 | } |
| 127 | l.commitTo(min(committed, lastnewi)) |
| 128 | return lastnewi, true |
| 129 | } |
| 130 | |
| 131 | func (l *raftLog) append(ents ...pb.Entry) uint64 { |
| 132 | if len(ents) == 0 { |
| 133 | return l.lastIndex() |
| 134 | } |
| 135 | if after := ents[0].Index - 1; after < l.committed { |
| 136 | l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) |
| 137 | } |
| 138 | l.unstable.truncateAndAppend(ents) |
| 139 | return l.lastIndex() |
| 140 | } |
| 141 | |
| 142 | // findConflict finds the index of the conflict. |
| 143 | // It returns the first pair of conflicting entries between the existing |
| 144 | // entries and the given entries, if there are any. |
| 145 | // If there is no conflicting entries, and the existing entries contains |
| 146 | // all the given entries, zero will be returned. |
| 147 | // If there is no conflicting entries, but the given entries contains new |
| 148 | // entries, the index of the first new entry will be returned. |
| 149 | // An entry is considered to be conflicting if it has the same index but |
| 150 | // a different term. |
| 151 | // The index of the given entries MUST be continuously increasing. |
| 152 | func (l *raftLog) findConflict(ents []pb.Entry) uint64 { |
| 153 | for i := range ents { |
| 154 | if id := pbEntryID(&ents[i]); !l.matchTerm(id) { |
| 155 | if id.index <= l.lastIndex() { |
| 156 | // TODO(pav-kv): can simply print %+v of the id. This will change the |
| 157 | // log format though. |
| 158 | l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", |
| 159 | id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) |
| 160 | } |
| 161 | return id.index |
| 162 | } |
| 163 | } |
| 164 | return 0 |
| 165 | } |
| 166 | |
| 167 | // findConflictByTerm returns a best guess on where this log ends matching |
| 168 | // another log, given that the only information known about the other log is the |
| 169 | // (index, term) of its single entry. |
| 170 | // |
| 171 | // Specifically, the first returned value is the max guessIndex <= index, such |
| 172 | // that term(guessIndex) <= term or term(guessIndex) is not known (because this |
| 173 | // index is compacted or not yet stored). |
| 174 | // |
| 175 | // The second returned value is the term(guessIndex), or 0 if it is unknown. |
| 176 | // |
| 177 | // This function is used by a follower and leader to resolve log conflicts after |
| 178 | // an unsuccessful append to a follower, and ultimately restore the steady flow |
| 179 | // of appends. |
| 180 | func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) { |
| 181 | for ; index > 0; index-- { |
| 182 | // If there is an error (likely ErrCompacted or ErrUnavailable), we don't |
| 183 | // know whether it's a match or not, so assume a possible match and return |
| 184 | // the index, with 0 term indicating an unknown term. |
| 185 | if ourTerm, err := l.term(index); err != nil { |
| 186 | return index, 0 |
| 187 | } else if ourTerm <= term { |
| 188 | return index, ourTerm |
| 189 | } |
| 190 | } |
| 191 | return 0, 0 |
| 192 | } |
| 193 | |
| 194 | // nextUnstableEnts returns all entries that are available to be written to the |
| 195 | // local stable log and are not already in-progress. |
| 196 | func (l *raftLog) nextUnstableEnts() []pb.Entry { |
| 197 | return l.unstable.nextEntries() |
| 198 | } |
| 199 | |
| 200 | // hasNextUnstableEnts returns if there are any entries that are available to be |
| 201 | // written to the local stable log and are not already in-progress. |
| 202 | func (l *raftLog) hasNextUnstableEnts() bool { |
| 203 | return len(l.nextUnstableEnts()) > 0 |
| 204 | } |
| 205 | |
| 206 | // hasNextOrInProgressUnstableEnts returns if there are any entries that are |
| 207 | // available to be written to the local stable log or in the process of being |
| 208 | // written to the local stable log. |
| 209 | func (l *raftLog) hasNextOrInProgressUnstableEnts() bool { |
| 210 | return len(l.unstable.entries) > 0 |
| 211 | } |
| 212 | |
| 213 | // nextCommittedEnts returns all the available entries for execution. |
| 214 | // Entries can be committed even when the local raft instance has not durably |
| 215 | // appended them to the local raft log yet. If allowUnstable is true, committed |
| 216 | // entries from the unstable log may be returned; otherwise, only entries known |
| 217 | // to reside locally on stable storage will be returned. |
| 218 | func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) { |
| 219 | if l.applyingEntsPaused { |
| 220 | // Entry application outstanding size limit reached. |
| 221 | return nil |
| 222 | } |
| 223 | if l.hasNextOrInProgressSnapshot() { |
| 224 | // See comment in hasNextCommittedEnts. |
| 225 | return nil |
| 226 | } |
| 227 | lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi) |
| 228 | if lo >= hi { |
| 229 | // Nothing to apply. |
| 230 | return nil |
| 231 | } |
| 232 | maxSize := l.maxApplyingEntsSize - l.applyingEntsSize |
| 233 | if maxSize <= 0 { |
| 234 | l.logger.Panicf("applying entry size (%d-%d)=%d not positive", |
| 235 | l.maxApplyingEntsSize, l.applyingEntsSize, maxSize) |
| 236 | } |
| 237 | ents, err := l.slice(lo, hi, maxSize) |
| 238 | if err != nil { |
| 239 | l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) |
| 240 | } |
| 241 | return ents |
| 242 | } |
| 243 | |
| 244 | // hasNextCommittedEnts returns if there is any available entries for execution. |
| 245 | // This is a fast check without heavy raftLog.slice() in nextCommittedEnts(). |
| 246 | func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool { |
| 247 | if l.applyingEntsPaused { |
| 248 | // Entry application outstanding size limit reached. |
| 249 | return false |
| 250 | } |
| 251 | if l.hasNextOrInProgressSnapshot() { |
| 252 | // If we have a snapshot to apply, don't also return any committed |
| 253 | // entries. Doing so raises questions about what should be applied |
| 254 | // first. |
| 255 | return false |
| 256 | } |
| 257 | lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi) |
| 258 | return lo < hi |
| 259 | } |
| 260 | |
| 261 | // maxAppliableIndex returns the maximum committed index that can be applied. |
| 262 | // If allowUnstable is true, committed entries from the unstable log can be |
| 263 | // applied; otherwise, only entries known to reside locally on stable storage |
| 264 | // can be applied. |
| 265 | func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 { |
| 266 | hi := l.committed |
| 267 | if !allowUnstable { |
| 268 | hi = min(hi, l.unstable.offset-1) |
| 269 | } |
| 270 | return hi |
| 271 | } |
| 272 | |
| 273 | // nextUnstableSnapshot returns the snapshot, if present, that is available to |
| 274 | // be applied to the local storage and is not already in-progress. |
| 275 | func (l *raftLog) nextUnstableSnapshot() *pb.Snapshot { |
| 276 | return l.unstable.nextSnapshot() |
| 277 | } |
| 278 | |
| 279 | // hasNextUnstableSnapshot returns if there is a snapshot that is available to |
| 280 | // be applied to the local storage and is not already in-progress. |
| 281 | func (l *raftLog) hasNextUnstableSnapshot() bool { |
| 282 | return l.unstable.nextSnapshot() != nil |
| 283 | } |
| 284 | |
| 285 | // hasNextOrInProgressSnapshot returns if there is pending snapshot waiting for |
| 286 | // applying or in the process of being applied. |
| 287 | func (l *raftLog) hasNextOrInProgressSnapshot() bool { |
| 288 | return l.unstable.snapshot != nil |
| 289 | } |
| 290 | |
| 291 | func (l *raftLog) snapshot() (pb.Snapshot, error) { |
| 292 | if l.unstable.snapshot != nil { |
| 293 | return *l.unstable.snapshot, nil |
| 294 | } |
| 295 | return l.storage.Snapshot() |
| 296 | } |
| 297 | |
| 298 | func (l *raftLog) firstIndex() uint64 { |
| 299 | if i, ok := l.unstable.maybeFirstIndex(); ok { |
| 300 | return i |
| 301 | } |
| 302 | index, err := l.storage.FirstIndex() |
| 303 | if err != nil { |
| 304 | panic(err) // TODO(bdarnell) |
| 305 | } |
| 306 | return index |
| 307 | } |
| 308 | |
| 309 | func (l *raftLog) lastIndex() uint64 { |
| 310 | if i, ok := l.unstable.maybeLastIndex(); ok { |
| 311 | return i |
| 312 | } |
| 313 | i, err := l.storage.LastIndex() |
| 314 | if err != nil { |
| 315 | panic(err) // TODO(bdarnell) |
| 316 | } |
| 317 | return i |
| 318 | } |
| 319 | |
| 320 | func (l *raftLog) commitTo(tocommit uint64) { |
| 321 | // never decrease commit |
| 322 | if l.committed < tocommit { |
| 323 | if l.lastIndex() < tocommit { |
| 324 | l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) |
| 325 | } |
| 326 | l.committed = tocommit |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) { |
| 331 | if l.committed < i || i < l.applied { |
| 332 | l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) |
| 333 | } |
| 334 | l.applied = i |
| 335 | l.applying = max(l.applying, i) |
| 336 | if l.applyingEntsSize > size { |
| 337 | l.applyingEntsSize -= size |
| 338 | } else { |
| 339 | // Defense against underflow. |
| 340 | l.applyingEntsSize = 0 |
| 341 | } |
| 342 | l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize |
| 343 | } |
| 344 | |
| 345 | func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) { |
| 346 | if l.committed < i { |
| 347 | l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed) |
| 348 | } |
| 349 | l.applying = i |
| 350 | l.applyingEntsSize += size |
| 351 | // Determine whether to pause entry application until some progress is |
| 352 | // acknowledged. We pause in two cases: |
| 353 | // 1. the outstanding entry size equals or exceeds the maximum size. |
| 354 | // 2. the outstanding entry size does not equal or exceed the maximum size, |
| 355 | // but we determine that the next entry in the log will push us over the |
| 356 | // limit. We determine this by comparing the last entry returned from |
| 357 | // raftLog.nextCommittedEnts to the maximum entry that the method was |
| 358 | // allowed to return had there been no size limit. If these indexes are |
| 359 | // not equal, then the returned entries slice must have been truncated to |
| 360 | // adhere to the memory limit. |
| 361 | l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize || |
| 362 | i < l.maxAppliableIndex(allowUnstable) |
| 363 | } |
| 364 | |
| 365 | func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) } |
| 366 | |
| 367 | func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } |
| 368 | |
| 369 | // acceptUnstable indicates that the application has started persisting the |
| 370 | // unstable entries in storage, and that the current unstable entries are thus |
| 371 | // to be marked as being in-progress, to avoid returning them with future calls |
| 372 | // to Ready(). |
| 373 | func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() } |
| 374 | |
| 375 | // lastEntryID returns the ID of the last entry in the log. |
| 376 | func (l *raftLog) lastEntryID() entryID { |
| 377 | index := l.lastIndex() |
| 378 | t, err := l.term(index) |
| 379 | if err != nil { |
| 380 | l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err) |
| 381 | } |
| 382 | return entryID{term: t, index: index} |
| 383 | } |
| 384 | |
| 385 | func (l *raftLog) term(i uint64) (uint64, error) { |
| 386 | // Check the unstable log first, even before computing the valid term range, |
| 387 | // which may need to access stable Storage. If we find the entry's term in |
| 388 | // the unstable log, we know it was in the valid range. |
| 389 | if t, ok := l.unstable.maybeTerm(i); ok { |
| 390 | return t, nil |
| 391 | } |
| 392 | |
| 393 | // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at |
| 394 | // firstIndex-1 is compacted away, its term is available for matching purposes |
| 395 | // when doing log appends. |
| 396 | if i+1 < l.firstIndex() { |
| 397 | return 0, ErrCompacted |
| 398 | } |
| 399 | if i > l.lastIndex() { |
| 400 | return 0, ErrUnavailable |
| 401 | } |
| 402 | |
| 403 | t, err := l.storage.Term(i) |
| 404 | if err == nil { |
| 405 | return t, nil |
| 406 | } |
| 407 | if err == ErrCompacted || err == ErrUnavailable { |
| 408 | return 0, err |
| 409 | } |
| 410 | panic(err) // TODO(bdarnell) |
| 411 | } |
| 412 | |
| 413 | func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { |
| 414 | if i > l.lastIndex() { |
| 415 | return nil, nil |
| 416 | } |
| 417 | return l.slice(i, l.lastIndex()+1, maxSize) |
| 418 | } |
| 419 | |
| 420 | // allEntries returns all entries in the log. |
| 421 | func (l *raftLog) allEntries() []pb.Entry { |
| 422 | ents, err := l.entries(l.firstIndex(), noLimit) |
| 423 | if err == nil { |
| 424 | return ents |
| 425 | } |
| 426 | if err == ErrCompacted { // try again if there was a racing compaction |
| 427 | return l.allEntries() |
| 428 | } |
| 429 | // TODO (xiangli): handle error? |
| 430 | panic(err) |
| 431 | } |
| 432 | |
| 433 | // isUpToDate determines if a log with the given last entry is more up-to-date |
| 434 | // by comparing the index and term of the last entries in the existing logs. |
| 435 | // |
| 436 | // If the logs have last entries with different terms, then the log with the |
| 437 | // later term is more up-to-date. If the logs end with the same term, then |
| 438 | // whichever log has the larger lastIndex is more up-to-date. If the logs are |
| 439 | // the same, the given log is up-to-date. |
| 440 | func (l *raftLog) isUpToDate(their entryID) bool { |
| 441 | our := l.lastEntryID() |
| 442 | return their.term > our.term || their.term == our.term && their.index >= our.index |
| 443 | } |
| 444 | |
| 445 | func (l *raftLog) matchTerm(id entryID) bool { |
| 446 | t, err := l.term(id.index) |
| 447 | if err != nil { |
| 448 | return false |
| 449 | } |
| 450 | return t == id.term |
| 451 | } |
| 452 | |
| 453 | func (l *raftLog) maybeCommit(at entryID) bool { |
| 454 | // NB: term should never be 0 on a commit because the leader campaigned at |
| 455 | // least at term 1. But if it is 0 for some reason, we don't consider this a |
| 456 | // term match. |
| 457 | if at.term != 0 && at.index > l.committed && l.matchTerm(at) { |
| 458 | l.commitTo(at.index) |
| 459 | return true |
| 460 | } |
| 461 | return false |
| 462 | } |
| 463 | |
| 464 | func (l *raftLog) restore(s pb.Snapshot) { |
| 465 | l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) |
| 466 | l.committed = s.Metadata.Index |
| 467 | l.unstable.restore(s) |
| 468 | } |
| 469 | |
| 470 | // scan visits all log entries in the [lo, hi) range, returning them via the |
| 471 | // given callback. The callback can be invoked multiple times, with consecutive |
| 472 | // sub-ranges of the requested range. Returns up to pageSize bytes worth of |
| 473 | // entries at a time. May return more if a single entry size exceeds the limit. |
| 474 | // |
| 475 | // The entries in [lo, hi) must exist, otherwise scan() eventually returns an |
| 476 | // error (possibly after passing some entries through the callback). |
| 477 | // |
| 478 | // If the callback returns an error, scan terminates and returns this error |
| 479 | // immediately. This can be used to stop the scan early ("break" the loop). |
| 480 | func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error { |
| 481 | for lo < hi { |
| 482 | ents, err := l.slice(lo, hi, pageSize) |
| 483 | if err != nil { |
| 484 | return err |
| 485 | } else if len(ents) == 0 { |
| 486 | return fmt.Errorf("got 0 entries in [%d, %d)", lo, hi) |
| 487 | } |
| 488 | if err := v(ents); err != nil { |
| 489 | return err |
| 490 | } |
| 491 | lo += uint64(len(ents)) |
| 492 | } |
| 493 | return nil |
| 494 | } |
| 495 | |
// slice returns a slice of log entries from lo through hi-1, inclusive.
//
// The result may be a prefix of [lo, hi), truncated to stay within maxSize.
// Entries below the unstable offset are read from storage, and the rest from
// the unstable log. When lo < hi, at least one entry is expected to be
// returned even if it alone exceeds maxSize. NOTE(review): that relies on
// limitSize and storage.Entries keeping the first entry — confirm.
//
// It returns:
//   - (nil, nil) if lo == hi;
//   - ErrCompacted if lo is below the first index, or if storage reports
//     compaction.
//
// It panics in these cases:
//   - lo > hi, or hi is beyond lastIndex()+1;
//   - storage reports ErrUnavailable;
//   - storage returns any other error.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
	if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
		return nil, err
	}
	if lo == hi {
		return nil, nil
	}
	// The whole range lives in the unstable log.
	if lo >= l.unstable.offset {
		ents := limitSize(l.unstable.slice(lo, hi), maxSize)
		// NB: use the full slice expression to protect the unstable slice from
		// appends to the returned ents slice.
		return ents[:len(ents):len(ents)], nil
	}

	// Read the stable part of the range, [lo, cut), from storage.
	cut := min(hi, l.unstable.offset)
	ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
	if err == ErrCompacted {
		return nil, err
	} else if err == ErrUnavailable {
		l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, cut)
	} else if err != nil {
		panic(err) // TODO(pavelkalinnikov): handle errors uniformly
	}
	// The whole range lives in storage; nothing to add from unstable.
	if hi <= l.unstable.offset {
		return ents, nil
	}

	// Fast path to check if ents has reached the size limitation. Either the
	// returned slice is shorter than requested (which means the next entry would
	// bring it over the limit), or a single entry reaches the limit.
	if uint64(len(ents)) < cut-lo {
		return ents, nil
	}
	// Slow path computes the actual total size, so that unstable entries are cut
	// optimally before being copied to ents slice.
	size := entsSize(ents)
	if size >= maxSize {
		return ents, nil
	}

	unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size)
	// Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
	// If this happens, ignore this extra entry.
	if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
		return ents, nil
	}
	// Otherwise, total size of unstable does not exceed maxSize-size, so total
	// size of ents+unstable does not exceed maxSize. Simply concatenate them.
	return extend(ents, unstable), nil
}
| 547 | |
| 548 | // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) |
| 549 | func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { |
| 550 | if lo > hi { |
| 551 | l.logger.Panicf("invalid slice %d > %d", lo, hi) |
| 552 | } |
| 553 | fi := l.firstIndex() |
| 554 | if lo < fi { |
| 555 | return ErrCompacted |
| 556 | } |
| 557 | |
| 558 | length := l.lastIndex() + 1 - fi |
| 559 | if hi > fi+length { |
| 560 | l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) |
| 561 | } |
| 562 | return nil |
| 563 | } |
| 564 | |
| 565 | func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { |
| 566 | if err == nil { |
| 567 | return t |
| 568 | } |
| 569 | if err == ErrCompacted || err == ErrUnavailable { |
| 570 | return 0 |
| 571 | } |
| 572 | l.logger.Panicf("unexpected error (%v)", err) |
| 573 | return 0 |
| 574 | } |