// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package tracker
16
17import (
18 "fmt"
19 "slices"
20 "strings"
21)
22
23// Progress represents a follower’s progress in the view of the leader. Leader
24// maintains progresses of all followers, and sends entries to the follower
25// based on its progress.
26//
27// NB(tbg): Progress is basically a state machine whose transitions are mostly
28// strewn around `*raft.raft`. Additionally, some fields are only used when in a
29// certain State. All of this isn't ideal.
30type Progress struct {
31 // Match is the index up to which the follower's log is known to match the
32 // leader's.
33 Match uint64
34 // Next is the log index of the next entry to send to this follower. All
35 // entries with indices in (Match, Next) interval are already in flight.
36 //
37 // Invariant: 0 <= Match < Next.
38 // NB: it follows that Next >= 1.
39 //
40 // In StateSnapshot, Next == PendingSnapshot + 1.
41 Next uint64
42
43 // sentCommit is the highest commit index in flight to the follower.
44 //
45 // Generally, it is monotonic, but con regress in some cases, e.g. when
46 // converting to `StateProbe` or when receiving a rejection from a follower.
47 //
48 // In StateSnapshot, sentCommit == PendingSnapshot == Next-1.
49 sentCommit uint64
50
51 // State defines how the leader should interact with the follower.
52 //
53 // When in StateProbe, leader sends at most one replication message
54 // per heartbeat interval. It also probes actual progress of the follower.
55 //
56 // When in StateReplicate, leader optimistically increases next
57 // to the latest entry sent after sending replication message. This is
58 // an optimized state for fast replicating log entries to the follower.
59 //
60 // When in StateSnapshot, leader should have sent out snapshot
61 // before and stops sending any replication message.
62 State StateType
63
64 // PendingSnapshot is used in StateSnapshot and tracks the last index of the
65 // leader at the time at which it realized a snapshot was necessary. This
66 // matches the index in the MsgSnap message emitted from raft.
67 //
68 // While there is a pending snapshot, replication to the follower is paused.
69 // The follower will transition back to StateReplicate if the leader
70 // receives an MsgAppResp from it that reconnects the follower to the
71 // leader's log (such an MsgAppResp is emitted when the follower applies a
72 // snapshot). It may be surprising that PendingSnapshot is not taken into
73 // account here, but consider that complex systems may delegate the sending
74 // of snapshots to alternative datasources (i.e. not the leader). In such
75 // setups, it is difficult to manufacture a snapshot at a particular index
76 // requested by raft and the actual index may be ahead or behind. This
77 // should be okay, as long as the snapshot allows replication to resume.
78 //
79 // The follower will transition to StateProbe if ReportSnapshot is called on
80 // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the
81 // basis for the next attempt to append. In practice, the first mechanism is
82 // the one that is relevant in most cases. However, if this MsgAppResp is
83 // lost (fallible network) then the second mechanism ensures that in this
84 // case the follower does not erroneously remain in StateSnapshot.
85 PendingSnapshot uint64
86
87 // RecentActive is true if the progress is recently active. Receiving any messages
88 // from the corresponding follower indicates the progress is active.
89 // RecentActive can be reset to false after an election timeout.
90 // This is always true on the leader.
91 RecentActive bool
92
93 // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
94 // happens in StateProbe, or StateReplicate with saturated Inflights. In both
95 // cases, we need to continue sending MsgApp once in a while to guarantee
96 // progress, but we only do so when MsgAppFlowPaused is false (it is reset on
97 // receiving a heartbeat response), to not overflow the receiver. See
98 // IsPaused().
99 MsgAppFlowPaused bool
100
101 // Inflights is a sliding window for the inflight messages.
102 // Each inflight message contains one or more log entries.
103 // The max number of entries per message is defined in raft config as MaxSizePerMsg.
104 // Thus inflight effectively limits both the number of inflight messages
105 // and the bandwidth each Progress can use.
106 // When inflights is Full, no more message should be sent.
107 // When a leader sends out a message, the index of the last
108 // entry should be added to inflights. The index MUST be added
109 // into inflights in order.
110 // When a leader receives a reply, the previous inflights should
111 // be freed by calling inflights.FreeLE with the index of the last
112 // received entry.
113 Inflights *Inflights
114
115 // IsLearner is true if this progress is tracked for a learner.
116 IsLearner bool
117}
118
119// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
120// PendingSnapshot, and Inflights.
121func (pr *Progress) ResetState(state StateType) {
122 pr.MsgAppFlowPaused = false
123 pr.PendingSnapshot = 0
124 pr.State = state
125 pr.Inflights.reset()
126}
127
128// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
129// optionally and if larger, the index of the pending snapshot.
130func (pr *Progress) BecomeProbe() {
131 // If the original state is StateSnapshot, progress knows that
132 // the pending snapshot has been sent to this peer successfully, then
133 // probes from pendingSnapshot + 1.
134 if pr.State == StateSnapshot {
135 pendingSnapshot := pr.PendingSnapshot
136 pr.ResetState(StateProbe)
137 pr.Next = max(pr.Match+1, pendingSnapshot+1)
138 } else {
139 pr.ResetState(StateProbe)
140 pr.Next = pr.Match + 1
141 }
142 pr.sentCommit = min(pr.sentCommit, pr.Next-1)
143}
144
145// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
146func (pr *Progress) BecomeReplicate() {
147 pr.ResetState(StateReplicate)
148 pr.Next = pr.Match + 1
149}
150
151// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
152// snapshot index.
153func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
154 pr.ResetState(StateSnapshot)
155 pr.PendingSnapshot = snapshoti
156 pr.Next = snapshoti + 1
157 pr.sentCommit = snapshoti
158}
159
160// SentEntries updates the progress on the given number of consecutive entries
161// being sent in a MsgApp, with the given total bytes size, appended at log
162// indices >= pr.Next.
163//
164// Must be used with StateProbe or StateReplicate.
165func (pr *Progress) SentEntries(entries int, bytes uint64) {
166 switch pr.State {
167 case StateReplicate:
168 if entries > 0 {
169 pr.Next += uint64(entries)
170 pr.Inflights.Add(pr.Next-1, bytes)
171 }
172 // If this message overflows the in-flights tracker, or it was already full,
173 // consider this message being a probe, so that the flow is paused.
174 pr.MsgAppFlowPaused = pr.Inflights.Full()
175 case StateProbe:
176 // TODO(pavelkalinnikov): this condition captures the previous behaviour,
177 // but we should set MsgAppFlowPaused unconditionally for simplicity, because any
178 // MsgApp in StateProbe is a probe, not only non-empty ones.
179 if entries > 0 {
180 pr.MsgAppFlowPaused = true
181 }
182 default:
183 panic(fmt.Sprintf("sending append in unhandled state %s", pr.State))
184 }
185}
186
187// CanBumpCommit returns true if sending the given commit index can potentially
188// advance the follower's commit index.
189func (pr *Progress) CanBumpCommit(index uint64) bool {
190 // Sending the given commit index may bump the follower's commit index up to
191 // Next-1 in normal operation, or higher in some rare cases. Allow sending a
192 // commit index eagerly only if we haven't already sent one that bumps the
193 // follower's commit all the way to Next-1.
194 return index > pr.sentCommit && pr.sentCommit < pr.Next-1
195}
196
197// SentCommit updates the sentCommit.
198func (pr *Progress) SentCommit(commit uint64) {
199 pr.sentCommit = commit
200}
201
202// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
203// index acked by it. The method returns false if the given n index comes from
204// an outdated message. Otherwise it updates the progress and returns true.
205func (pr *Progress) MaybeUpdate(n uint64) bool {
206 if n <= pr.Match {
207 return false
208 }
209 pr.Match = n
210 pr.Next = max(pr.Next, n+1) // invariant: Match < Next
211 pr.MsgAppFlowPaused = false
212 return true
213}
214
215// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
216// arguments are the index of the append message rejected by the follower, and
217// the hint that we want to decrease to.
218//
219// Rejections can happen spuriously as messages are sent out of order or
220// duplicated. In such cases, the rejection pertains to an index that the
221// Progress already knows were previously acknowledged, and false is returned
222// without changing the Progress.
223//
224// If the rejection is genuine, Next is lowered sensibly, and the Progress is
225// cleared for sending log entries.
226func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
227 if pr.State == StateReplicate {
228 // The rejection must be stale if the progress has matched and "rejected"
229 // is smaller than "match".
230 if rejected <= pr.Match {
231 return false
232 }
233 // Directly decrease next to match + 1.
234 //
235 // TODO(tbg): why not use matchHint if it's larger?
236 pr.Next = pr.Match + 1
237 // Regress the sentCommit since it unlikely has been applied.
238 pr.sentCommit = min(pr.sentCommit, pr.Next-1)
239 return true
240 }
241
242 // The rejection must be stale if "rejected" does not match next - 1. This
243 // is because non-replicating followers are probed one entry at a time.
244 // The check is a best effort assuming message reordering is rare.
245 if pr.Next-1 != rejected {
246 return false
247 }
248
249 pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
250 // Regress the sentCommit since it unlikely has been applied.
251 pr.sentCommit = min(pr.sentCommit, pr.Next-1)
252 pr.MsgAppFlowPaused = false
253 return true
254}
255
256// IsPaused returns whether sending log entries to this node has been throttled.
257// This is done when a node has rejected recent MsgApps, is currently waiting
258// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
259// operation, this is false. A throttled node will be contacted less frequently
260// until it has reached a state in which it's able to accept a steady stream of
261// log entries again.
262func (pr *Progress) IsPaused() bool {
263 switch pr.State {
264 case StateProbe:
265 return pr.MsgAppFlowPaused
266 case StateReplicate:
267 return pr.MsgAppFlowPaused
268 case StateSnapshot:
269 return true
270 default:
271 panic("unexpected state")
272 }
273}
274
275func (pr *Progress) String() string {
276 var buf strings.Builder
277 fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
278 if pr.IsLearner {
279 fmt.Fprint(&buf, " learner")
280 }
281 if pr.IsPaused() {
282 fmt.Fprint(&buf, " paused")
283 }
284 if pr.PendingSnapshot > 0 {
285 fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
286 }
287 if !pr.RecentActive {
288 fmt.Fprint(&buf, " inactive")
289 }
290 if n := pr.Inflights.Count(); n > 0 {
291 fmt.Fprintf(&buf, " inflight=%d", n)
292 if pr.Inflights.Full() {
293 fmt.Fprint(&buf, "[full]")
294 }
295 }
296 return buf.String()
297}
298
299// ProgressMap is a map of *Progress.
300type ProgressMap map[uint64]*Progress
301
302// String prints the ProgressMap in sorted key order, one Progress per line.
303func (m ProgressMap) String() string {
304 ids := make([]uint64, 0, len(m))
305 for k := range m {
306 ids = append(ids, k)
307 }
308 slices.Sort(ids)
309 var buf strings.Builder
310 for _, id := range ids {
311 fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
312 }
313 return buf.String()
314}