| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | // Copyright 2019 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package tracker |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "slices" |
| 20 | "strings" |
| 21 | ) |
| 22 | |
// Progress represents a follower's progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
	// Match is the index up to which the follower's log is known to match the
	// leader's.
	Match uint64
	// Next is the log index of the next entry to send to this follower. All
	// entries with indices in (Match, Next) interval are already in flight.
	//
	// Invariant: 0 <= Match < Next.
	// NB: it follows that Next >= 1.
	//
	// In StateSnapshot, Next == PendingSnapshot + 1.
	Next uint64

	// sentCommit is the highest commit index in flight to the follower.
	//
	// Generally, it is monotonic, but can regress in some cases, e.g. when
	// converting to `StateProbe` or when receiving a rejection from a follower.
	//
	// In StateSnapshot, sentCommit == PendingSnapshot == Next-1.
	sentCommit uint64

	// State defines how the leader should interact with the follower.
	//
	// When in StateProbe, leader sends at most one replication message
	// per heartbeat interval. It also probes actual progress of the follower.
	//
	// When in StateReplicate, leader optimistically increases next
	// to the latest entry sent after sending replication message. This is
	// an optimized state for fast replicating log entries to the follower.
	//
	// When in StateSnapshot, leader should have sent out snapshot
	// before and stops sending any replication message.
	State StateType

	// PendingSnapshot is used in StateSnapshot and tracks the last index of the
	// leader at the time at which it realized a snapshot was necessary. This
	// matches the index in the MsgSnap message emitted from raft.
	//
	// While there is a pending snapshot, replication to the follower is paused.
	// The follower will transition back to StateReplicate if the leader
	// receives an MsgAppResp from it that reconnects the follower to the
	// leader's log (such an MsgAppResp is emitted when the follower applies a
	// snapshot). It may be surprising that PendingSnapshot is not taken into
	// account here, but consider that complex systems may delegate the sending
	// of snapshots to alternative datasources (i.e. not the leader). In such
	// setups, it is difficult to manufacture a snapshot at a particular index
	// requested by raft and the actual index may be ahead or behind. This
	// should be okay, as long as the snapshot allows replication to resume.
	//
	// The follower will transition to StateProbe if ReportSnapshot is called on
	// the leader; if SnapshotFinish is passed then PendingSnapshot becomes the
	// basis for the next attempt to append. In practice, the first mechanism is
	// the one that is relevant in most cases. However, if this MsgAppResp is
	// lost (fallible network) then the second mechanism ensures that in this
	// case the follower does not erroneously remain in StateSnapshot.
	PendingSnapshot uint64

	// RecentActive is true if the progress is recently active. Receiving any messages
	// from the corresponding follower indicates the progress is active.
	// RecentActive can be reset to false after an election timeout.
	// This is always true on the leader.
	RecentActive bool

	// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
	// happens in StateProbe, or StateReplicate with saturated Inflights. In both
	// cases, we need to continue sending MsgApp once in a while to guarantee
	// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
	// receiving a heartbeat response), to not overflow the receiver. See
	// IsPaused().
	MsgAppFlowPaused bool

	// Inflights is a sliding window for the inflight messages.
	// Each inflight message contains one or more log entries.
	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
	// Thus inflight effectively limits both the number of inflight messages
	// and the bandwidth each Progress can use.
	// When inflights is Full, no more message should be sent.
	// When a leader sends out a message, the index of the last
	// entry should be added to inflights. The index MUST be added
	// into inflights in order.
	// When a leader receives a reply, the previous inflights should
	// be freed by calling inflights.FreeLE with the index of the last
	// received entry.
	Inflights *Inflights

	// IsLearner is true if this progress is tracked for a learner.
	IsLearner bool
}
| 118 | |
| 119 | // ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, |
| 120 | // PendingSnapshot, and Inflights. |
| 121 | func (pr *Progress) ResetState(state StateType) { |
| 122 | pr.MsgAppFlowPaused = false |
| 123 | pr.PendingSnapshot = 0 |
| 124 | pr.State = state |
| 125 | pr.Inflights.reset() |
| 126 | } |
| 127 | |
| 128 | // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, |
| 129 | // optionally and if larger, the index of the pending snapshot. |
| 130 | func (pr *Progress) BecomeProbe() { |
| 131 | // If the original state is StateSnapshot, progress knows that |
| 132 | // the pending snapshot has been sent to this peer successfully, then |
| 133 | // probes from pendingSnapshot + 1. |
| 134 | if pr.State == StateSnapshot { |
| 135 | pendingSnapshot := pr.PendingSnapshot |
| 136 | pr.ResetState(StateProbe) |
| 137 | pr.Next = max(pr.Match+1, pendingSnapshot+1) |
| 138 | } else { |
| 139 | pr.ResetState(StateProbe) |
| 140 | pr.Next = pr.Match + 1 |
| 141 | } |
| 142 | pr.sentCommit = min(pr.sentCommit, pr.Next-1) |
| 143 | } |
| 144 | |
| 145 | // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. |
| 146 | func (pr *Progress) BecomeReplicate() { |
| 147 | pr.ResetState(StateReplicate) |
| 148 | pr.Next = pr.Match + 1 |
| 149 | } |
| 150 | |
| 151 | // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending |
| 152 | // snapshot index. |
| 153 | func (pr *Progress) BecomeSnapshot(snapshoti uint64) { |
| 154 | pr.ResetState(StateSnapshot) |
| 155 | pr.PendingSnapshot = snapshoti |
| 156 | pr.Next = snapshoti + 1 |
| 157 | pr.sentCommit = snapshoti |
| 158 | } |
| 159 | |
| 160 | // SentEntries updates the progress on the given number of consecutive entries |
| 161 | // being sent in a MsgApp, with the given total bytes size, appended at log |
| 162 | // indices >= pr.Next. |
| 163 | // |
| 164 | // Must be used with StateProbe or StateReplicate. |
| 165 | func (pr *Progress) SentEntries(entries int, bytes uint64) { |
| 166 | switch pr.State { |
| 167 | case StateReplicate: |
| 168 | if entries > 0 { |
| 169 | pr.Next += uint64(entries) |
| 170 | pr.Inflights.Add(pr.Next-1, bytes) |
| 171 | } |
| 172 | // If this message overflows the in-flights tracker, or it was already full, |
| 173 | // consider this message being a probe, so that the flow is paused. |
| 174 | pr.MsgAppFlowPaused = pr.Inflights.Full() |
| 175 | case StateProbe: |
| 176 | // TODO(pavelkalinnikov): this condition captures the previous behaviour, |
| 177 | // but we should set MsgAppFlowPaused unconditionally for simplicity, because any |
| 178 | // MsgApp in StateProbe is a probe, not only non-empty ones. |
| 179 | if entries > 0 { |
| 180 | pr.MsgAppFlowPaused = true |
| 181 | } |
| 182 | default: |
| 183 | panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | // CanBumpCommit returns true if sending the given commit index can potentially |
| 188 | // advance the follower's commit index. |
| 189 | func (pr *Progress) CanBumpCommit(index uint64) bool { |
| 190 | // Sending the given commit index may bump the follower's commit index up to |
| 191 | // Next-1 in normal operation, or higher in some rare cases. Allow sending a |
| 192 | // commit index eagerly only if we haven't already sent one that bumps the |
| 193 | // follower's commit all the way to Next-1. |
| 194 | return index > pr.sentCommit && pr.sentCommit < pr.Next-1 |
| 195 | } |
| 196 | |
// SentCommit records commit as the commit index in flight to the follower by
// overwriting sentCommit. The value is stored as given; no comparison with the
// previous value is made.
func (pr *Progress) SentCommit(commit uint64) {
	pr.sentCommit = commit
}
| 201 | |
| 202 | // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the |
| 203 | // index acked by it. The method returns false if the given n index comes from |
| 204 | // an outdated message. Otherwise it updates the progress and returns true. |
| 205 | func (pr *Progress) MaybeUpdate(n uint64) bool { |
| 206 | if n <= pr.Match { |
| 207 | return false |
| 208 | } |
| 209 | pr.Match = n |
| 210 | pr.Next = max(pr.Next, n+1) // invariant: Match < Next |
| 211 | pr.MsgAppFlowPaused = false |
| 212 | return true |
| 213 | } |
| 214 | |
| 215 | // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The |
| 216 | // arguments are the index of the append message rejected by the follower, and |
| 217 | // the hint that we want to decrease to. |
| 218 | // |
| 219 | // Rejections can happen spuriously as messages are sent out of order or |
| 220 | // duplicated. In such cases, the rejection pertains to an index that the |
| 221 | // Progress already knows were previously acknowledged, and false is returned |
| 222 | // without changing the Progress. |
| 223 | // |
| 224 | // If the rejection is genuine, Next is lowered sensibly, and the Progress is |
| 225 | // cleared for sending log entries. |
| 226 | func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { |
| 227 | if pr.State == StateReplicate { |
| 228 | // The rejection must be stale if the progress has matched and "rejected" |
| 229 | // is smaller than "match". |
| 230 | if rejected <= pr.Match { |
| 231 | return false |
| 232 | } |
| 233 | // Directly decrease next to match + 1. |
| 234 | // |
| 235 | // TODO(tbg): why not use matchHint if it's larger? |
| 236 | pr.Next = pr.Match + 1 |
| 237 | // Regress the sentCommit since it unlikely has been applied. |
| 238 | pr.sentCommit = min(pr.sentCommit, pr.Next-1) |
| 239 | return true |
| 240 | } |
| 241 | |
| 242 | // The rejection must be stale if "rejected" does not match next - 1. This |
| 243 | // is because non-replicating followers are probed one entry at a time. |
| 244 | // The check is a best effort assuming message reordering is rare. |
| 245 | if pr.Next-1 != rejected { |
| 246 | return false |
| 247 | } |
| 248 | |
| 249 | pr.Next = max(min(rejected, matchHint+1), pr.Match+1) |
| 250 | // Regress the sentCommit since it unlikely has been applied. |
| 251 | pr.sentCommit = min(pr.sentCommit, pr.Next-1) |
| 252 | pr.MsgAppFlowPaused = false |
| 253 | return true |
| 254 | } |
| 255 | |
| 256 | // IsPaused returns whether sending log entries to this node has been throttled. |
| 257 | // This is done when a node has rejected recent MsgApps, is currently waiting |
| 258 | // for a snapshot, or has reached the MaxInflightMsgs limit. In normal |
| 259 | // operation, this is false. A throttled node will be contacted less frequently |
| 260 | // until it has reached a state in which it's able to accept a steady stream of |
| 261 | // log entries again. |
| 262 | func (pr *Progress) IsPaused() bool { |
| 263 | switch pr.State { |
| 264 | case StateProbe: |
| 265 | return pr.MsgAppFlowPaused |
| 266 | case StateReplicate: |
| 267 | return pr.MsgAppFlowPaused |
| 268 | case StateSnapshot: |
| 269 | return true |
| 270 | default: |
| 271 | panic("unexpected state") |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | func (pr *Progress) String() string { |
| 276 | var buf strings.Builder |
| 277 | fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next) |
| 278 | if pr.IsLearner { |
| 279 | fmt.Fprint(&buf, " learner") |
| 280 | } |
| 281 | if pr.IsPaused() { |
| 282 | fmt.Fprint(&buf, " paused") |
| 283 | } |
| 284 | if pr.PendingSnapshot > 0 { |
| 285 | fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot) |
| 286 | } |
| 287 | if !pr.RecentActive { |
| 288 | fmt.Fprint(&buf, " inactive") |
| 289 | } |
| 290 | if n := pr.Inflights.Count(); n > 0 { |
| 291 | fmt.Fprintf(&buf, " inflight=%d", n) |
| 292 | if pr.Inflights.Full() { |
| 293 | fmt.Fprint(&buf, "[full]") |
| 294 | } |
| 295 | } |
| 296 | return buf.String() |
| 297 | } |
| 298 | |
// ProgressMap is a map from a uint64 ID to the *Progress tracked under that
// ID.
type ProgressMap map[uint64]*Progress
| 301 | |
| 302 | // String prints the ProgressMap in sorted key order, one Progress per line. |
| 303 | func (m ProgressMap) String() string { |
| 304 | ids := make([]uint64, 0, len(m)) |
| 305 | for k := range m { |
| 306 | ids = append(ids, k) |
| 307 | } |
| 308 | slices.Sort(ids) |
| 309 | var buf strings.Builder |
| 310 | for _, id := range ids { |
| 311 | fmt.Fprintf(&buf, "%d: %s\n", id, m[id]) |
| 312 | } |
| 313 | return buf.String() |
| 314 | } |