// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raft sends and receives messages in the Protocol Buffer format
defined in the raftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
(https://raft.github.io/raft.pdf) by Diego Ongaro and John Ousterhout.

A simple example application, _raftexample_, is also available to help illustrate
how to use this package in practice:
https://github.com/etcd-io/etcd/tree/main/contrib/raftexample

# Usage

The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using raft.RestartNode.

To start a node from scratch:

	storage := raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   4096,
		MaxInflightMsgs: 256,
	}
	n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

	storage := raft.NewMemoryStorage()

	// Recover the in-memory storage from persistent
	// snapshot, state and entries.
	storage.ApplySnapshot(snapshot)
	storage.SetHardState(state)
	storage.Append(entries)

	c := &raft.Config{
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   4096,
		MaxInflightMsgs: 256,
	}

	// Restart raft without peer information.
	// Peer information is already included in the storage.
	n := raft.RestartNode(c)

Now that you are holding onto a Node you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in
step 2.

1. Write HardState, Entries, and Snapshot to persistent storage if they are
not empty (see the saveToStorage sketch after this list). Note that when
writing an Entry with Index i, any previously-persisted entries with
Index >= i must be discarded.

2. Send all Messages to the nodes named in the To field. It is important
that no messages be sent until the latest HardState has been persisted to
disk, and until all Entries written by any previous Ready batch have been
persisted as well (Messages may be sent while entries from the same batch
are being persisted). To reduce I/O latency, an optimization can be applied
to make the leader write to disk in parallel with its followers (as
explained in section 10.2.1 of the Raft thesis). If any Message has type
MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages
may be large).

Note: Marshalling messages is not thread-safe; it is important that you
make sure that no new entries are persisted while marshalling.
The easiest way to achieve this is to serialize the messages directly inside
your main raft loop.

3. Apply Snapshot (if any) and CommittedEntries to the state machine.
If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this point
by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision to cancel
must be based solely on the state machine and not external information such as
the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be processed
in the order they were returned by Ready.

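For illustration, here is one minimal sketch of the saveToStorage helper
referenced in step 1 and used in the handling loop below, assuming the
MemoryStorage created at startup is still in scope; a production system
would persist to disk instead, and error handling is elided. Note that
MemoryStorage.Append itself discards previously appended entries that
conflict with the new ones, which covers the truncation rule from step 1.

	// storage is the *raft.MemoryStorage the node was started with.
	func saveToStorage(hardState raftpb.HardState, entries []raftpb.Entry, snapshot raftpb.Snapshot) {
		if !raft.IsEmptySnap(snapshot) {
			// Restore the log from the snapshot before appending new entries.
			storage.ApplySnapshot(snapshot)
		}
		if !raft.IsEmptyHardState(hardState) {
			storage.SetHardState(hardState)
		}
		storage.Append(entries)
	}
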
Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage
type can be used for this (if you repopulate its state upon a
restart), or you can supply your own disk-backed implementation.

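A custom implementation must satisfy the Storage interface. Its shape is
sketched below, abridged for orientation; see storage.go in this package
for the authoritative definition and full method comments:

	type Storage interface {
		// InitialState returns the saved HardState and ConfState.
		InitialState() (raftpb.HardState, raftpb.ConfState, error)
		// Entries returns a slice of log entries in the range [lo, hi).
		Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
		// Term returns the term of entry i.
		Term(i uint64) (uint64, error)
		// LastIndex returns the index of the last entry in the log.
		LastIndex() (uint64, error)
		// FirstIndex returns the index of the first available entry.
		FirstIndex() (uint64, error)
		// Snapshot returns the most recent snapshot.
		Snapshot() (raftpb.Snapshot, error)
	}
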
Third, when you receive a message from another node, pass it to Node.Step:

	func recvRaftRPC(ctx context.Context, m raftpb.Message) {
		n.Step(ctx, m)
	}

Finally, you need to call Node.Tick() at regular intervals (probably
via a time.Ticker). Raft has two important timeouts: heartbeat and the
election timeout. However, internally to the raft package time is
represented by an abstract "tick".

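For example, the s.Ticker channel in the loop below might be the C channel
of a time.Ticker owned by the application's own server struct s (a
hypothetical name). A 100ms tick is a common choice, making ElectionTick: 10
correspond to a one-second election timeout:

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	s.Ticker = ticker.C
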
The total state machine handling loop will look something like this:

	for {
		select {
		case <-s.Ticker:
			n.Tick()
		case rd := <-s.Node.Ready():
			saveToStorage(rd.State, rd.Entries, rd.Snapshot)
			send(rd.Messages)
			if !raft.IsEmptySnap(rd.Snapshot) {
				processSnapshot(rd.Snapshot)
			}
			for _, entry := range rd.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			s.Node.Advance()
		case <-s.done:
			return
		}
	}

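The send, processSnapshot, and process helpers in this loop are
application-defined. As a minimal sketch of process, assuming a
hypothetical key-value state machine in which each proposal encodes a
key=value pair:

	func process(entry raftpb.Entry) {
		if entry.Type == raftpb.EntryNormal && len(entry.Data) > 0 {
			// kvStore is the application's state machine (hypothetical).
			key, value, _ := strings.Cut(string(entry.Data), "=")
			kvStore[key] = value
		}
	}
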
To propose changes to the state machine from your node, take your application
data, serialize it into a byte slice, and call:

	n.Propose(ctx, data)

If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal. There is no guarantee that a proposed command will be
committed; you may have to re-propose after a timeout.

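A hedged sketch of the re-propose pattern, assuming a hypothetical
application-level applied channel that is signaled once the data is observed
in CommittedEntries (the raft package itself does not report per-proposal
outcomes):

	func proposeWithRetry(n raft.Node, data []byte, applied <-chan struct{}) error {
		for {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			err := n.Propose(ctx, data)
			cancel()
			if err != nil {
				return err
			}
			select {
			case <-applied:
				return nil // data appeared in CommittedEntries
			case <-time.After(time.Second):
				// Not committed within the timeout; propose again.
			}
		}
	}
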
To add or remove a node in a cluster, build a ConfChange struct 'cc' and call:

	n.ProposeConfChange(ctx, cc)

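For example, a ConfChange that adds a node with ID 4 might be built as
follows (a sketch; the Context field may carry arbitrary application data,
and the address shown here is hypothetical):

	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  4,
		Context: []byte("http://10.0.0.4:2380"),
	}
	n.ProposeConfChange(ctx, cc)
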
After the config change is committed, some committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to the node through:

	var cc raftpb.ConfChange
	cc.Unmarshal(data)
	n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A
given ID MUST be used only once even if the old node has been removed.
This means that, for example, IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.

# Usage with Asynchronous Storage Writes

The library can be configured with an alternate interface for local storage
writes that can provide better performance in the presence of high proposal
concurrency by minimizing interference between proposals. This feature is called
AsynchronousStorageWrites, and can be enabled using the flag on the Config
struct with the same name.

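Enabling the feature is a one-line change to the Config from the earlier
examples. The exact field name should be confirmed against this package's
Config definition; the sketch below assumes it is AsyncStorageWrites:

	c := &raft.Config{
		ID:                 0x01,
		ElectionTick:       10,
		HeartbeatTick:      1,
		Storage:            storage,
		MaxSizePerMsg:      4096,
		MaxInflightMsgs:    256,
		AsyncStorageWrites: true, // assumed field name; see Config
	}
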
When Asynchronous Storage Writes is enabled, the responsibility of code using
the library is different from what was presented above. Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 above). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 above).
Instead, all local storage operations are also communicated through messages
present in the Ready.Messages slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order.

Each local storage message carries a slice of response messages that must be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

	for {
		select {
		case <-s.Ticker:
			n.Tick()
		case rd := <-s.Node.Ready():
			for _, m := range rd.Messages {
				switch m.To {
				case raft.LocalAppendThread:
					toAppend <- m
				case raft.LocalApplyThread:
					toApply <- m
				default:
					sendOverNetwork(m)
				}
			}
		case <-s.done:
			return
		}
	}

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

	// append thread
	go func() {
		for {
			select {
			case m := <-toAppend:
				saveToStorage(m.State, m.Entries, m.Snapshot)
				send(m.Responses)
			case <-s.done:
				return
			}
		}
	}()

	// apply thread
	go func() {
		for {
			select {
			case m := <-toApply:
				for _, entry := range m.CommittedEntries {
					process(entry)
					if entry.Type == raftpb.EntryConfChange {
						var cc raftpb.ConfChange
						cc.Unmarshal(entry.Data)
						s.Node.ApplyConfChange(cc)
					}
				}
				send(m.Responses)
			case <-s.done:
				return
			}
		}
	}()

# Implementation notes

This implementation is up to date with the final Raft thesis
(https://github.com/ongardie/dissertation/blob/master/stanford.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
membership change takes effect when its entry is applied, not when it
is added to the log (so the entry is committed under the old
membership instead of the new). This is equivalent in terms of safety,
since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at
once by matching log positions (which would be unsafe since they
should have different quorum requirements), we simply disallow any
proposed membership change while any uncommitted change appears in
the leader's log.

This approach introduces a problem when you try to remove a member
from a two-member cluster: If one of the members dies before the
other one receives the commit of the confchange entry, then the member
cannot be removed any more since the cluster cannot make progress.
For this reason it is highly recommended to use three or more nodes in
every cluster.

# MessageType

Package raft sends and receives messages in Protocol Buffer format (defined
in the raftpb package). Each state (follower, candidate, leader) implements its
own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when
advancing with the given raftpb.Message. Each step is determined by its
raftpb.MessageType. Note that every step is checked by one common method,
'Step', that safety-checks the terms of the node and the incoming message
to prevent stale log entries:

	'MsgHup' is used for election. If a node is a follower or candidate, the
	'tick' function in the 'raft' struct is set as 'tickElection'. If a follower
	or candidate has not received any heartbeat before the election timeout, it
	passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
	start a new election.

	'MsgBeat' is an internal type that signals the leader to send a heartbeat of
	the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
	the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
	send periodic 'MsgHeartbeat' messages to its followers.

	'MsgProp' proposes to append data to its log entries. This is a special
	type to redirect proposals to the leader. Therefore, the send method
	overwrites raftpb.Message's term with its HardState's term to avoid
	attaching its local term to 'MsgProp'. When 'MsgProp' is passed to the
	leader's 'Step' method, the leader first calls the 'appendEntry' method to
	append entries to its log, and then calls the 'bcastAppend' method to send
	those entries to its peers. When passed to a candidate, 'MsgProp' is
	dropped. When passed to a follower, 'MsgProp' is stored in the follower's
	mailbox (msgs) by the send method. It is stored with the sender's ID and
	later forwarded to the leader by the rafthttp package.

	'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
	which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
	type. When 'MsgApp' is passed to a candidate's Step method, the candidate
	reverts back to follower, because it indicates that there is a valid leader
	sending 'MsgApp' messages. Candidate and follower respond to this message
	in 'MsgAppResp' type.

	'MsgAppResp' is a response to a log replication request ('MsgApp'). When
	'MsgApp' is passed to a candidate's or follower's Step method, it responds
	by calling the 'handleAppendEntries' method, which sends 'MsgAppResp' to
	the raft mailbox.

	'MsgVote' requests votes for election. When a node is a follower or
	candidate and 'MsgHup' is passed to its Step method, the node calls the
	'campaign' method to campaign to become the leader. Once 'campaign' is
	called, the node becomes a candidate and sends 'MsgVote' to peers in the
	cluster to request votes. When passed to the leader's or a candidate's Step
	method and the message's Term is lower than the leader's or candidate's,
	'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true).
	If the leader or candidate receives 'MsgVote' with a higher term, it will
	revert back to follower. When 'MsgVote' is passed to a follower, it votes
	for the sender only when the sender's log is at least as up-to-date as its
	own: the sender's last log term is greater than the follower's, or the last
	log terms are equal and the sender's last log index is greater than or
	equal to the follower's.

	'MsgVoteResp' contains responses from a voting request. When 'MsgVoteResp'
	is passed to a candidate, the candidate calculates how many votes it has
	won. If it's more than the majority (quorum), it becomes the leader and
	calls 'bcastAppend'. If the candidate receives a majority of rejection
	votes, it reverts back to follower.

	'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
	protocol. When Config.PreVote is true, a pre-election is carried out first
	(using the same rules as a regular election), and no node increases its term
	number unless the pre-election indicates that the campaigning node would win.
	This minimizes disruption when a partitioned node rejoins the cluster.

	'MsgSnap' requests to install a snapshot message. When a node has just
	become a leader or the leader receives a 'MsgProp' message, it calls the
	'bcastAppend' method, which then calls the 'sendAppend' method to each
	follower. In 'sendAppend', if a leader fails to get the term or entries,
	the leader requests a snapshot by sending a message of 'MsgSnap' type.

	'MsgSnapStatus' tells the result of a snapshot install message. When a
	follower rejected 'MsgSnap', it indicates that the snapshot request with
	'MsgSnap' failed due to network issues, which caused the network layer to
	fail to send the snapshot to its followers. The leader then considers the
	follower's progress as probe. When 'MsgSnap' was not rejected, it indicates
	that the snapshot succeeded, and the leader sets the follower's progress to
	probe and resumes its log replication.

	'MsgHeartbeat' sends a heartbeat from the leader. When 'MsgHeartbeat' is
	passed to a candidate and the message's term is higher than the candidate's,
	the candidate reverts back to follower, updates its committed index from
	the one in this heartbeat, and sends the message to its mailbox. When
	'MsgHeartbeat' is passed to a follower's Step method and the message's term
	is higher than the follower's, the follower updates its leaderID with the
	ID from the message.

	'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
	is passed to the leader's Step method, the leader knows which follower
	responded. Only when the leader's last committed index is greater than the
	follower's Match index does the leader run the 'sendAppend' method.

	'MsgUnreachable' reports that a request (message) was not delivered. When
	'MsgUnreachable' is passed to the leader's Step method, the leader discovers
	that the follower that sent this 'MsgUnreachable' is not reachable, often
	indicating that 'MsgApp' is lost. When the follower's progress state is
	replicate, the leader sets it back to probe.

	'MsgStorageAppend' is a message from a node to its local append storage
	thread to write entries, hard state, and/or a snapshot to stable storage.
	The message will carry one or more responses, one of which will be a
	'MsgStorageAppendResp' back to itself. The responses can also contain
	'MsgAppResp', 'MsgVoteResp', and 'MsgPreVoteResp' messages. Used with
	AsynchronousStorageWrites.

	'MsgStorageApply' is a message from a node to its local apply storage
	thread to apply committed entries. The message will carry one response,
	which will be a 'MsgStorageApplyResp' back to itself. Used with
	AsynchronousStorageWrites.
*/
package raft