// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"expvar"
	"fmt"
	"log"
	"sync"
	"time"

	"go.uber.org/zap"

	"go.etcd.io/etcd/client/pkg/v3/logutil"
	"go.etcd.io/etcd/pkg/v3/contention"
	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
	serverstorage "go.etcd.io/etcd/server/v3/storage"
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

const (
	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, 1MB max size is large enough.
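	// (100 MB/s * 10 ms comes to roughly 1 MB of data outstanding per round
	// trip, which is the reasoning behind the 1 MB cap below.)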
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
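	// With the current 4096 buffer this works out to 4096 / 8 = 512 in-flight messages.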
	maxInflightMsgs = 4096 / 8
)

var (
	// protects raftStatus
	raftStatusMu sync.Mutex
	// indirection for the expvar func interface:
	// expvar panics when publishing a duplicate name
	// and does not support removing a registered name,
	// so we only register a func that calls raftStatus
	// and swap out raftStatus as needed.
	raftStatus func() raft.Status
)

func init() {
	expvar.Publish("raft.status", expvar.Func(func() any {
		raftStatusMu.Lock()
		defer raftStatusMu.Unlock()
		if raftStatus == nil {
			return nil
		}
		return raftStatus()
	}))
}

// toApply contains entries and a snapshot to be applied. Once
// a toApply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type toApply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
	// raftAdvancedC notifies EtcdServer.apply that
	// 'raftLog.applied' has advanced via r.Advance;
	// it should be used only when entries contain raftpb.EntryConfChange
	raftAdvancedC <-chan struct{}
}
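
// A loose sketch of the consuming side (illustrative only; the real EtcdServer
// apply path is more involved):
//
//	ap := <-r.apply()
//	// ... hand ap.entries / ap.snapshot to the apply machinery ...
//	<-ap.notifyc // raftNode has persisted this batch, e.g. safe to trigger snapshots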

type raftNode struct {
	lg *zap.Logger

	tickMu *sync.RWMutex
	// timestamp of the latest tick
	latestTickTs time.Time
	raftNodeConfig

	// a chan to send/receive snapshot messages
	msgSnapC chan raftpb.Message

	// a chan to send out toApply
	applyc chan toApply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detector for raft heartbeat messages
	td *contention.TimeoutDetector

	stopped chan struct{}
	done    chan struct{}
}

type raftNodeConfig struct {
	lg *zap.Logger

	// isIDRemoved reports whether the message receiver has been removed from the cluster.
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     serverstorage.Storage
	heartbeat   time.Duration // for logging
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should time out and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}
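
// An illustrative wiring of the config (names and values here are placeholders,
// not etcd's actual defaults; in the real server some fields, such as the
// transport, are populated separately after construction):
//
//	rn := newRaftNode(raftNodeConfig{
//		lg:          lg,
//		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
//		Node:        n,                       // a started raft.Node
//		raftStorage: raft.NewMemoryStorage(),
//		storage:     st,                      // WAL + snapshotter backed storage
//		heartbeat:   100 * time.Millisecond,
//	})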

func newRaftNode(cfg raftNodeConfig) *raftNode {
	var lg raft.Logger
	if cfg.lg != nil {
		lg = NewRaftLoggerZap(cfg.lg)
	} else {
		lcfg := logutil.DefaultZapLoggerConfig
		var err error
		lg, err = NewRaftLogger(&lcfg)
		if err != nil {
			log.Fatalf("cannot create raft logger %v", err)
		}
	}
	raft.SetLogger(lg)
	r := &raftNode{
		lg:             cfg.lg,
		tickMu:         new(sync.RWMutex),
		raftNodeConfig: cfg,
		latestTickTs:   time.Now(),
		// set up contention detector for raft heartbeat messages.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan toApply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
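	// A zero heartbeat leaves ticker as a zero-value time.Ticker whose channel
	// is nil, so <-r.ticker.C blocks forever and the node never ticks on its
	// own (useful when ticks are driven manually, e.g. in tests).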
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

// raft.Node does not have locks in the raft package; tickMu serializes Tick
// calls and also guards latestTickTs.
func (r *raftNode) tick() {
	r.tickMu.Lock()
	r.Tick()
	r.latestTickTs = time.Now()
	r.tickMu.Unlock()
}

func (r *raftNode) getLatestTickTs() time.Time {
	r.tickMu.RLock()
	defer r.tickMu.RUnlock()
	return r.latestTickTs
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					rh.updateLead(rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
					case <-r.stopped:
						return
					}
				}

				notifyc := make(chan struct{}, 1)
				raftAdvancedC := make(chan struct{}, 1)
				ap := toApply{
					entries:       rd.CommittedEntries,
					snapshot:      rd.Snapshot,
					notifyc:       notifyc,
					raftAdvancedC: raftAdvancedC,
				}

				updateCommittedIndex(&ap, rh)

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the followers and then
				// writing to their disks.
				// For more details, check raft thesis section 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
				// ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
					}
					// gofail: var raftAfterSaveSnap struct{}
				}

				// gofail: var raftBeforeSave struct{}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// Force WAL to fsync its hard state before Release() releases
					// old data from the WAL. Otherwise we could get an error like:
					// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
					// See https://github.com/etcd-io/etcd/issues/10219 for more details.
					if err := r.storage.Sync(); err != nil {
						r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
					}

					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftBeforeApplySnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
					// gofail: var raftAfterApplySnap struct{}

					if err := r.storage.Release(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to release Raft wal", zap.Error(err))
					}
					// gofail: var raftAfterWALRelease struct{}
				}

				r.raftStorage.Append(rd.Entries)

				confChanged := false
				for _, ent := range rd.CommittedEntries {
					if ent.Type == raftpb.EntryConfChange {
						confChanged = true
						break
					}
				}

				if !islead {
					// finish processing incoming messages before we signal notifyc chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// A candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also, a slow machine's follower raft layer could proceed to become the leader
					// of its own single-node cluster before the apply layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessarily long blocking.

					if confChanged {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with the scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				// gofail: var raftBeforeAdvance struct{}
				r.Advance()

				if confChanged {
					// notify etcdserver that raft has already been notified or advanced.
					raftAdvancedC <- struct{}{}
				}
			case <-r.stopped:
				return
			}
		}
	}()
}

func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

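// processMessages filters the outgoing messages before handing them to the
// transport. Setting a message's To field to 0 marks it as intentionally
// dropped: the rafthttp transport ignores messages addressed to member 0.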
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
			continue
		}

		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data stores: the store for v2, and the KV for v3.
			// The msgSnap only contains the most recent snapshot of store without KV.
			// So we need to redirect the msgSnap to etcd server main loop for merging in the
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				r.lg.Warn(
					"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
					zap.String("to", fmt.Sprintf("%x", ms[i].To)),
					zap.Duration("heartbeat-interval", r.heartbeat),
					zap.Duration("expected-duration", 2*r.heartbeat),
					zap.Duration("exceeded-duration", exceed),
				)
				heartbeatSendFailures.Inc()
			}
		}
	}
	return ms
}

func (r *raftNode) apply() chan toApply {
	return r.applyc
}

func (r *raftNode) stop() {
	select {
	case r.stopped <- struct{}{}:
		// Not already stopped, so trigger it
	case <-r.done:
		// Has already been stopped - no need to do anything
		return
	}
	// Block until the stop has been acknowledged by start()
	<-r.done
}

func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		r.lg.Panic("failed to close Raft storage", zap.Error(err))
	}
	close(r.done)
}

// for testing
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicks advances the ticks of the Raft node.
// This can be used to fast-forward election
// ticks in multi-data-center deployments, thus
// speeding up the election process.
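// For example, a member that starts up already knowing about active peers may
// advance its election ticks to just short of the election timeout, so it only
// waits one more tick before it can campaign (roughly how etcd's tick
// adjustment uses this; the call site lives outside this file).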
func (r *raftNode) advanceTicks(ticks int) {
	for i := 0; i < ticks; i++ {
		r.tick()
	}
}