// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package raft
16
17import pb "go.etcd.io/raft/v3/raftpb"
18
// unstable contains "unstable" log entries and snapshot state that has
// not yet been written to Storage. The type serves two roles. First, it
// holds on to new log entries and an optional snapshot until they are
// handed to a Ready struct for persistence. Second, it continues to
// hold on to this state after it has been handed off to provide raftLog
// with a view of the in-progress log entries and snapshot until their
// writes have been stabilized and are guaranteed to be reflected in
// queries of Storage. After this point, the corresponding log entries
// and/or snapshot can be cleared from unstable.
//
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
	// the incoming unstable snapshot, if any.
	snapshot *pb.Snapshot
	// all entries that have not yet been written to storage.
	entries []pb.Entry
	// entries[i] has raft log position i+offset.
	offset uint64

	// if true, snapshot is being written to storage.
	snapshotInProgress bool
	// entries[:offsetInProgress-offset] are being written to storage.
	// Like offset, offsetInProgress is exclusive, meaning that it
	// contains the index following the largest in-progress entry.
	// Invariant: offset <= offsetInProgress
	offsetInProgress uint64

	// logger emits the informational/panic messages for this log.
	logger Logger
}
51
52// maybeFirstIndex returns the index of the first possible entry in entries
53// if it has a snapshot.
54func (u *unstable) maybeFirstIndex() (uint64, bool) {
55 if u.snapshot != nil {
56 return u.snapshot.Metadata.Index + 1, true
57 }
58 return 0, false
59}
60
61// maybeLastIndex returns the last index if it has at least one
62// unstable entry or snapshot.
63func (u *unstable) maybeLastIndex() (uint64, bool) {
64 if l := len(u.entries); l != 0 {
65 return u.offset + uint64(l) - 1, true
66 }
67 if u.snapshot != nil {
68 return u.snapshot.Metadata.Index, true
69 }
70 return 0, false
71}
72
73// maybeTerm returns the term of the entry at index i, if there
74// is any.
75func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
76 if i < u.offset {
77 if u.snapshot != nil && u.snapshot.Metadata.Index == i {
78 return u.snapshot.Metadata.Term, true
79 }
80 return 0, false
81 }
82
83 last, ok := u.maybeLastIndex()
84 if !ok {
85 return 0, false
86 }
87 if i > last {
88 return 0, false
89 }
90
91 return u.entries[i-u.offset].Term, true
92}
93
94// nextEntries returns the unstable entries that are not already in the process
95// of being written to storage.
96func (u *unstable) nextEntries() []pb.Entry {
97 inProgress := int(u.offsetInProgress - u.offset)
98 if len(u.entries) == inProgress {
99 return nil
100 }
101 return u.entries[inProgress:]
102}
103
104// nextSnapshot returns the unstable snapshot, if one exists that is not already
105// in the process of being written to storage.
106func (u *unstable) nextSnapshot() *pb.Snapshot {
107 if u.snapshot == nil || u.snapshotInProgress {
108 return nil
109 }
110 return u.snapshot
111}
112
113// acceptInProgress marks all entries and the snapshot, if any, in the unstable
114// as having begun the process of being written to storage. The entries/snapshot
115// will no longer be returned from nextEntries/nextSnapshot. However, new
116// entries/snapshots added after a call to acceptInProgress will be returned
117// from those methods, until the next call to acceptInProgress.
118func (u *unstable) acceptInProgress() {
119 if len(u.entries) > 0 {
120 // NOTE: +1 because offsetInProgress is exclusive, like offset.
121 u.offsetInProgress = u.entries[len(u.entries)-1].Index + 1
122 }
123 if u.snapshot != nil {
124 u.snapshotInProgress = true
125 }
126}
127
128// stableTo marks entries up to the entry with the specified (index, term) as
129// being successfully written to stable storage.
130//
131// The method should only be called when the caller can attest that the entries
132// can not be overwritten by an in-progress log append. See the related comment
133// in newStorageAppendRespMsg.
134func (u *unstable) stableTo(id entryID) {
135 gt, ok := u.maybeTerm(id.index)
136 if !ok {
137 // Unstable entry missing. Ignore.
138 u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index)
139 return
140 }
141 if id.index < u.offset {
142 // Index matched unstable snapshot, not unstable entry. Ignore.
143 u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index)
144 return
145 }
146 if gt != id.term {
147 // Term mismatch between unstable entry and specified entry. Ignore.
148 // This is possible if part or all of the unstable log was replaced
149 // between that time that a set of entries started to be written to
150 // stable storage and when they finished.
151 u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
152 "entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt)
153 return
154 }
155 num := int(id.index + 1 - u.offset)
156 u.entries = u.entries[num:]
157 u.offset = id.index + 1
158 u.offsetInProgress = max(u.offsetInProgress, u.offset)
159 u.shrinkEntriesArray()
160}
161
162// shrinkEntriesArray discards the underlying array used by the entries slice
163// if most of it isn't being used. This avoids holding references to a bunch of
164// potentially large entries that aren't needed anymore. Simply clearing the
165// entries wouldn't be safe because clients might still be using them.
166func (u *unstable) shrinkEntriesArray() {
167 // We replace the array if we're using less than half of the space in
168 // it. This number is fairly arbitrary, chosen as an attempt to balance
169 // memory usage vs number of allocations. It could probably be improved
170 // with some focused tuning.
171 const lenMultiple = 2
172 if len(u.entries) == 0 {
173 u.entries = nil
174 } else if len(u.entries)*lenMultiple < cap(u.entries) {
175 newEntries := make([]pb.Entry, len(u.entries))
176 copy(newEntries, u.entries)
177 u.entries = newEntries
178 }
179}
180
181func (u *unstable) stableSnapTo(i uint64) {
182 if u.snapshot != nil && u.snapshot.Metadata.Index == i {
183 u.snapshot = nil
184 u.snapshotInProgress = false
185 }
186}
187
188func (u *unstable) restore(s pb.Snapshot) {
189 u.offset = s.Metadata.Index + 1
190 u.offsetInProgress = u.offset
191 u.entries = nil
192 u.snapshot = &s
193 u.snapshotInProgress = false
194}
195
196func (u *unstable) truncateAndAppend(ents []pb.Entry) {
197 fromIndex := ents[0].Index
198 switch {
199 case fromIndex == u.offset+uint64(len(u.entries)):
200 // fromIndex is the next index in the u.entries, so append directly.
201 u.entries = append(u.entries, ents...)
202 case fromIndex <= u.offset:
203 u.logger.Infof("replace the unstable entries from index %d", fromIndex)
204 // The log is being truncated to before our current offset
205 // portion, so set the offset and replace the entries.
206 u.entries = ents
207 u.offset = fromIndex
208 u.offsetInProgress = u.offset
209 default:
210 // Truncate to fromIndex (exclusive), and append the new entries.
211 u.logger.Infof("truncate the unstable entries before index %d", fromIndex)
212 keep := u.slice(u.offset, fromIndex) // NB: appending to this slice is safe,
213 u.entries = append(keep, ents...) // and will reallocate/copy it
214 // Only in-progress entries before fromIndex are still considered to be
215 // in-progress.
216 u.offsetInProgress = min(u.offsetInProgress, fromIndex)
217 }
218}
219
220// slice returns the entries from the unstable log with indexes in the range
221// [lo, hi). The entire range must be stored in the unstable log or the method
222// will panic. The returned slice can be appended to, but the entries in it must
223// not be changed because they are still shared with unstable.
224//
225// TODO(pavelkalinnikov): this, and similar []pb.Entry slices, may bubble up all
226// the way to the application code through Ready struct. Protect other slices
227// similarly, and document how the client can use them.
228func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
229 u.mustCheckOutOfBounds(lo, hi)
230 // NB: use the full slice expression to limit what the caller can do with the
231 // returned slice. For example, an append will reallocate and copy this slice
232 // instead of corrupting the neighbouring u.entries.
233 return u.entries[lo-u.offset : hi-u.offset : hi-u.offset]
234}
235
236// u.offset <= lo <= hi <= u.offset+len(u.entries)
237func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
238 if lo > hi {
239 u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
240 }
241 upper := u.offset + uint64(len(u.entries))
242 if lo < u.offset || hi > upper {
243 u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
244 }
245}