blob: f616c3150fbdf90f8253baea47097dd75972cbb8 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "errors"
19 "sync"
20
21 pb "go.etcd.io/raft/v3/raftpb"
22)
23
// Sentinel errors returned by Storage implementations such as MemoryStorage.
var (
	// ErrCompacted is returned by Storage.Entries/Compact when the requested
	// index can no longer be served because it predates the last snapshot.
	ErrCompacted = errors.New("requested index is unavailable due to compaction")

	// ErrSnapOutOfDate is returned by Storage.CreateSnapshot (and by
	// MemoryStorage.ApplySnapshot) when the requested index is not newer than
	// the snapshot already held.
	ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")

	// ErrUnavailable is returned through the Storage interface when the
	// requested log entries cannot be found.
	ErrUnavailable = errors.New("requested entry at index is unavailable")

	// ErrSnapshotTemporarilyUnavailable is returned through the Storage
	// interface when the needed snapshot is not ready yet.
	ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")
)
39
40// Storage is an interface that may be implemented by the application
41// to retrieve log entries from storage.
42//
43// If any Storage method returns an error, the raft instance will
44// become inoperable and refuse to participate in elections; the
45// application is responsible for cleanup and recovery in this case.
46type Storage interface {
47 // TODO(tbg): split this into two interfaces, LogStorage and StateStorage.
48
49 // InitialState returns the saved HardState and ConfState information.
50 InitialState() (pb.HardState, pb.ConfState, error)
51
52 // Entries returns a slice of consecutive log entries in the range [lo, hi),
53 // starting from lo. The maxSize limits the total size of the log entries
54 // returned, but Entries returns at least one entry if any.
55 //
56 // The caller of Entries owns the returned slice, and may append to it. The
57 // individual entries in the slice must not be mutated, neither by the Storage
58 // implementation nor the caller. Note that raft may forward these entries
59 // back to the application via Ready struct, so the corresponding handler must
60 // not mutate entries either (see comments in Ready struct).
61 //
62 // Since the caller may append to the returned slice, Storage implementation
63 // must protect its state from corruption that such appends may cause. For
64 // example, common ways to do so are:
65 // - allocate the slice before returning it (safest option),
66 // - return a slice protected by Go full slice expression, which causes
67 // copying on appends (see MemoryStorage).
68 //
69 // Returns ErrCompacted if entry lo has been compacted, or ErrUnavailable if
70 // encountered an unavailable entry in [lo, hi).
71 Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
72
73 // Term returns the term of entry i, which must be in the range
74 // [FirstIndex()-1, LastIndex()]. The term of the entry before
75 // FirstIndex is retained for matching purposes even though the
76 // rest of that entry may not be available.
77 Term(i uint64) (uint64, error)
78 // LastIndex returns the index of the last entry in the log.
79 LastIndex() (uint64, error)
80 // FirstIndex returns the index of the first log entry that is
81 // possibly available via Entries (older entries have been incorporated
82 // into the latest Snapshot; if storage only contains the dummy entry the
83 // first log entry is not available).
84 FirstIndex() (uint64, error)
85 // Snapshot returns the most recent snapshot.
86 // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
87 // so raft state machine could know that Storage needs some time to prepare
88 // snapshot and call Snapshot later.
89 Snapshot() (pb.Snapshot, error)
90}
91
// inMemStorageCallStats counts how many times each Storage method has been
// invoked on a MemoryStorage.
type inMemStorageCallStats struct {
	initialState int
	firstIndex   int
	lastIndex    int
	entries      int
	term         int
	snapshot     int
}
95
96// MemoryStorage implements the Storage interface backed by an
97// in-memory array.
98type MemoryStorage struct {
99 // Protects access to all fields. Most methods of MemoryStorage are
100 // run on the raft goroutine, but Append() is run on an application
101 // goroutine.
102 sync.Mutex
103
104 hardState pb.HardState
105 snapshot pb.Snapshot
106 // ents[i] has raft log position i+snapshot.Metadata.Index
107 ents []pb.Entry
108
109 callStats inMemStorageCallStats
110}
111
112// NewMemoryStorage creates an empty MemoryStorage.
113func NewMemoryStorage() *MemoryStorage {
114 return &MemoryStorage{
115 // When starting from scratch populate the list with a dummy entry at term zero.
116 ents: make([]pb.Entry, 1),
117 }
118}
119
120// InitialState implements the Storage interface.
121func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
122 ms.callStats.initialState++
123 return ms.hardState, ms.snapshot.Metadata.ConfState, nil
124}
125
126// SetHardState saves the current HardState.
127func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
128 ms.Lock()
129 defer ms.Unlock()
130 ms.hardState = st
131 return nil
132}
133
134// Entries implements the Storage interface.
135func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
136 ms.Lock()
137 defer ms.Unlock()
138 ms.callStats.entries++
139 offset := ms.ents[0].Index
140 if lo <= offset {
141 return nil, ErrCompacted
142 }
143 if hi > ms.lastIndex()+1 {
144 getLogger().Panicf("entries' hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
145 }
146 // only contains dummy entries.
147 if len(ms.ents) == 1 {
148 return nil, ErrUnavailable
149 }
150
151 ents := limitSize(ms.ents[lo-offset:hi-offset], entryEncodingSize(maxSize))
152 // NB: use the full slice expression to limit what the caller can do with the
153 // returned slice. For example, an append will reallocate and copy this slice
154 // instead of corrupting the neighbouring ms.ents.
155 return ents[:len(ents):len(ents)], nil
156}
157
158// Term implements the Storage interface.
159func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
160 ms.Lock()
161 defer ms.Unlock()
162 ms.callStats.term++
163 offset := ms.ents[0].Index
164 if i < offset {
165 return 0, ErrCompacted
166 }
167 if int(i-offset) >= len(ms.ents) {
168 return 0, ErrUnavailable
169 }
170 return ms.ents[i-offset].Term, nil
171}
172
173// LastIndex implements the Storage interface.
174func (ms *MemoryStorage) LastIndex() (uint64, error) {
175 ms.Lock()
176 defer ms.Unlock()
177 ms.callStats.lastIndex++
178 return ms.lastIndex(), nil
179}
180
181func (ms *MemoryStorage) lastIndex() uint64 {
182 return ms.ents[0].Index + uint64(len(ms.ents)) - 1
183}
184
185// FirstIndex implements the Storage interface.
186func (ms *MemoryStorage) FirstIndex() (uint64, error) {
187 ms.Lock()
188 defer ms.Unlock()
189 ms.callStats.firstIndex++
190 return ms.firstIndex(), nil
191}
192
193func (ms *MemoryStorage) firstIndex() uint64 {
194 return ms.ents[0].Index + 1
195}
196
197// Snapshot implements the Storage interface.
198func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
199 ms.Lock()
200 defer ms.Unlock()
201 ms.callStats.snapshot++
202 return ms.snapshot, nil
203}
204
205// ApplySnapshot overwrites the contents of this Storage object with
206// those of the given snapshot.
207func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
208 ms.Lock()
209 defer ms.Unlock()
210
211 //handle check for old snapshot being applied
212 msIndex := ms.snapshot.Metadata.Index
213 snapIndex := snap.Metadata.Index
214 if msIndex >= snapIndex {
215 return ErrSnapOutOfDate
216 }
217
218 ms.snapshot = snap
219 ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
220 return nil
221}
222
223// CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and
224// can be used to reconstruct the state at that point.
225// If any configuration changes have been made since the last compaction,
226// the result of the last ApplyConfChange must be passed in.
227func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {
228 ms.Lock()
229 defer ms.Unlock()
230 if i <= ms.snapshot.Metadata.Index {
231 return pb.Snapshot{}, ErrSnapOutOfDate
232 }
233
234 offset := ms.ents[0].Index
235 if i > ms.lastIndex() {
236 getLogger().Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
237 }
238
239 ms.snapshot.Metadata.Index = i
240 ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
241 if cs != nil {
242 ms.snapshot.Metadata.ConfState = *cs
243 }
244 ms.snapshot.Data = data
245 return ms.snapshot, nil
246}
247
248// Compact discards all log entries prior to compactIndex.
249// It is the application's responsibility to not attempt to compact an index
250// greater than raftLog.applied.
251func (ms *MemoryStorage) Compact(compactIndex uint64) error {
252 ms.Lock()
253 defer ms.Unlock()
254 offset := ms.ents[0].Index
255 if compactIndex <= offset {
256 return ErrCompacted
257 }
258 if compactIndex > ms.lastIndex() {
259 getLogger().Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
260 }
261
262 i := compactIndex - offset
263 // NB: allocate a new slice instead of reusing the old ms.ents. Entries in
264 // ms.ents are immutable, and can be referenced from outside MemoryStorage
265 // through slices returned by ms.Entries().
266 ents := make([]pb.Entry, 1, uint64(len(ms.ents))-i)
267 ents[0].Index = ms.ents[i].Index
268 ents[0].Term = ms.ents[i].Term
269 ents = append(ents, ms.ents[i+1:]...)
270 ms.ents = ents
271 return nil
272}
273
274// Append the new entries to storage.
275// TODO (xiangli): ensure the entries are continuous and
276// entries[0].Index > ms.entries[0].Index
277func (ms *MemoryStorage) Append(entries []pb.Entry) error {
278 if len(entries) == 0 {
279 return nil
280 }
281
282 ms.Lock()
283 defer ms.Unlock()
284
285 first := ms.firstIndex()
286 last := entries[0].Index + uint64(len(entries)) - 1
287
288 // shortcut if there is no new entry.
289 if last < first {
290 return nil
291 }
292 // truncate compacted entries
293 if first > entries[0].Index {
294 entries = entries[first-entries[0].Index:]
295 }
296
297 offset := entries[0].Index - ms.ents[0].Index
298 switch {
299 case uint64(len(ms.ents)) > offset:
300 // NB: full slice expression protects ms.ents at index >= offset from
301 // rewrites, as they may still be referenced from outside MemoryStorage.
302 ms.ents = append(ms.ents[:offset:offset], entries...)
303 case uint64(len(ms.ents)) == offset:
304 ms.ents = append(ms.ents, entries...)
305 default:
306 getLogger().Panicf("missing log entry [last: %d, append at: %d]",
307 ms.lastIndex(), entries[0].Index)
308 }
309 return nil
310}