blob: 8f78178829edc1c793a8f2dfbc4eb2b15656f4cd [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "bytes"
19 "fmt"
20 "strings"
21
22 pb "go.etcd.io/raft/v3/raftpb"
23)
24
25func (st StateType) MarshalJSON() ([]byte, error) {
26 return []byte(fmt.Sprintf("%q", st.String())), nil
27}
28
29var isLocalMsg = [...]bool{
30 pb.MsgHup: true,
31 pb.MsgBeat: true,
32 pb.MsgUnreachable: true,
33 pb.MsgSnapStatus: true,
34 pb.MsgCheckQuorum: true,
35 pb.MsgStorageAppend: true,
36 pb.MsgStorageAppendResp: true,
37 pb.MsgStorageApply: true,
38 pb.MsgStorageApplyResp: true,
39}
40
41var isResponseMsg = [...]bool{
42 pb.MsgAppResp: true,
43 pb.MsgVoteResp: true,
44 pb.MsgHeartbeatResp: true,
45 pb.MsgUnreachable: true,
46 pb.MsgReadIndexResp: true,
47 pb.MsgPreVoteResp: true,
48 pb.MsgStorageAppendResp: true,
49 pb.MsgStorageApplyResp: true,
50}
51
52func isMsgInArray(msgt pb.MessageType, arr []bool) bool {
53 i := int(msgt)
54 return i < len(arr) && arr[i]
55}
56
57func IsLocalMsg(msgt pb.MessageType) bool {
58 return isMsgInArray(msgt, isLocalMsg[:])
59}
60
61func IsResponseMsg(msgt pb.MessageType) bool {
62 return isMsgInArray(msgt, isResponseMsg[:])
63}
64
65func IsLocalMsgTarget(id uint64) bool {
66 return id == LocalAppendThread || id == LocalApplyThread
67}
68
69// voteResponseType maps vote and prevote message types to their corresponding responses.
70func voteRespMsgType(msgt pb.MessageType) pb.MessageType {
71 switch msgt {
72 case pb.MsgVote:
73 return pb.MsgVoteResp
74 case pb.MsgPreVote:
75 return pb.MsgPreVoteResp
76 default:
77 panic(fmt.Sprintf("not a vote message: %s", msgt))
78 }
79}
80
81func DescribeHardState(hs pb.HardState) string {
82 var buf strings.Builder
83 fmt.Fprintf(&buf, "Term:%d", hs.Term)
84 if hs.Vote != 0 {
85 fmt.Fprintf(&buf, " Vote:%d", hs.Vote)
86 }
87 fmt.Fprintf(&buf, " Commit:%d", hs.Commit)
88 return buf.String()
89}
90
91func DescribeSoftState(ss SoftState) string {
92 return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState)
93}
94
95func DescribeConfState(state pb.ConfState) string {
96 return fmt.Sprintf(
97 "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v AutoLeave:%v",
98 state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, state.AutoLeave,
99 )
100}
101
102func DescribeSnapshot(snap pb.Snapshot) string {
103 m := snap.Metadata
104 return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState))
105}
106
107func DescribeReady(rd Ready, f EntryFormatter) string {
108 var buf strings.Builder
109 if rd.SoftState != nil {
110 fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState))
111 buf.WriteByte('\n')
112 }
113 if !IsEmptyHardState(rd.HardState) {
114 fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState))
115 buf.WriteByte('\n')
116 }
117 if len(rd.ReadStates) > 0 {
118 fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates)
119 }
120 if len(rd.Entries) > 0 {
121 buf.WriteString("Entries:\n")
122 fmt.Fprint(&buf, DescribeEntries(rd.Entries, f))
123 }
124 if !IsEmptySnap(rd.Snapshot) {
125 fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot))
126 }
127 if len(rd.CommittedEntries) > 0 {
128 buf.WriteString("CommittedEntries:\n")
129 fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f))
130 }
131 if len(rd.Messages) > 0 {
132 buf.WriteString("Messages:\n")
133 for _, msg := range rd.Messages {
134 fmt.Fprint(&buf, DescribeMessage(msg, f))
135 buf.WriteByte('\n')
136 }
137 }
138 if buf.Len() > 0 {
139 return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String())
140 }
141 return "<empty Ready>"
142}
143
144// EntryFormatter can be implemented by the application to provide human-readable formatting
145// of entry data. Nil is a valid EntryFormatter and will use a default format.
146type EntryFormatter func([]byte) string
147
148// DescribeMessage returns a concise human-readable description of a
149// Message for debugging.
150func DescribeMessage(m pb.Message, f EntryFormatter) string {
151 return describeMessageWithIndent("", m, f)
152}
153
154func describeMessageWithIndent(indent string, m pb.Message, f EntryFormatter) string {
155 var buf bytes.Buffer
156 fmt.Fprintf(&buf, "%s%s->%s %v Term:%d Log:%d/%d", indent,
157 describeTarget(m.From), describeTarget(m.To), m.Type, m.Term, m.LogTerm, m.Index)
158 if m.Reject {
159 fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint)
160 }
161 if m.Commit != 0 {
162 fmt.Fprintf(&buf, " Commit:%d", m.Commit)
163 }
164 if m.Vote != 0 {
165 fmt.Fprintf(&buf, " Vote:%d", m.Vote)
166 }
167 if ln := len(m.Entries); ln == 1 {
168 fmt.Fprintf(&buf, " Entries:[%s]", DescribeEntry(m.Entries[0], f))
169 } else if ln > 1 {
170 fmt.Fprint(&buf, " Entries:[")
171 for _, e := range m.Entries {
172 fmt.Fprintf(&buf, "\n%s ", indent)
173 buf.WriteString(DescribeEntry(e, f))
174 }
175 fmt.Fprintf(&buf, "\n%s]", indent)
176 }
177 if s := m.Snapshot; s != nil && !IsEmptySnap(*s) {
178 fmt.Fprintf(&buf, "\n%s Snapshot: %s", indent, DescribeSnapshot(*s))
179 }
180 if len(m.Responses) > 0 {
181 fmt.Fprintf(&buf, " Responses:[")
182 for _, m := range m.Responses {
183 buf.WriteString("\n")
184 buf.WriteString(describeMessageWithIndent(indent+" ", m, f))
185 }
186 fmt.Fprintf(&buf, "\n%s]", indent)
187 }
188 return buf.String()
189}
190
191func describeTarget(id uint64) string {
192 switch id {
193 case None:
194 return "None"
195 case LocalAppendThread:
196 return "AppendThread"
197 case LocalApplyThread:
198 return "ApplyThread"
199 default:
200 return fmt.Sprintf("%x", id)
201 }
202}
203
204// DescribeEntry returns a concise human-readable description of an
205// Entry for debugging.
206func DescribeEntry(e pb.Entry, f EntryFormatter) string {
207 if f == nil {
208 f = func(data []byte) string { return fmt.Sprintf("%q", data) }
209 }
210
211 formatConfChange := func(cc pb.ConfChangeI) string {
212 // TODO(tbg): give the EntryFormatter a type argument so that it gets
213 // a chance to expose the Context.
214 return pb.ConfChangesToString(cc.AsV2().Changes)
215 }
216
217 var formatted string
218 switch e.Type {
219 case pb.EntryNormal:
220 formatted = f(e.Data)
221 case pb.EntryConfChange:
222 var cc pb.ConfChange
223 if err := cc.Unmarshal(e.Data); err != nil {
224 formatted = err.Error()
225 } else {
226 formatted = formatConfChange(cc)
227 }
228 case pb.EntryConfChangeV2:
229 var cc pb.ConfChangeV2
230 if err := cc.Unmarshal(e.Data); err != nil {
231 formatted = err.Error()
232 } else {
233 formatted = formatConfChange(cc)
234 }
235 }
236 if formatted != "" {
237 formatted = " " + formatted
238 }
239 return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted)
240}
241
242// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
243// each.
244func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
245 var buf bytes.Buffer
246 for _, e := range ents {
247 _, _ = buf.WriteString(DescribeEntry(e, f) + "\n")
248 }
249 return buf.String()
250}
251
252// entryEncodingSize represents the protocol buffer encoding size of one or more
253// entries.
254type entryEncodingSize uint64
255
256func entsSize(ents []pb.Entry) entryEncodingSize {
257 var size entryEncodingSize
258 for _, ent := range ents {
259 size += entryEncodingSize(ent.Size())
260 }
261 return size
262}
263
264// limitSize returns the longest prefix of the given entries slice, such that
265// its total byte size does not exceed maxSize. Always returns a non-empty slice
266// if the input is non-empty, so, as an exception, if the size of the first
267// entry exceeds maxSize, a non-empty slice with just this entry is returned.
268func limitSize(ents []pb.Entry, maxSize entryEncodingSize) []pb.Entry {
269 if len(ents) == 0 {
270 return ents
271 }
272 size := ents[0].Size()
273 for limit := 1; limit < len(ents); limit++ {
274 size += ents[limit].Size()
275 if entryEncodingSize(size) > maxSize {
276 return ents[:limit]
277 }
278 }
279 return ents
280}
281
282// entryPayloadSize represents the size of one or more entries' payloads.
283// Notably, it does not depend on its Index or Term. Entries with empty
284// payloads, like those proposed after a leadership change, are considered
285// to be zero size.
286type entryPayloadSize uint64
287
288// payloadSize is the size of the payload of the provided entry.
289func payloadSize(e pb.Entry) entryPayloadSize {
290 return entryPayloadSize(len(e.Data))
291}
292
293// payloadsSize is the size of the payloads of the provided entries.
294func payloadsSize(ents []pb.Entry) entryPayloadSize {
295 var s entryPayloadSize
296 for _, e := range ents {
297 s += payloadSize(e)
298 }
299 return s
300}
301
302func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) {
303 err := cs1.Equivalent(cs2)
304 if err == nil {
305 return
306 }
307 l.Panic(err)
308}
309
310// extend appends vals to the given dst slice. It differs from the standard
311// slice append only in the way it allocates memory. If cap(dst) is not enough
312// for appending the values, precisely size len(dst)+len(vals) is allocated.
313//
314// Use this instead of standard append in situations when this is the last
315// append to dst, so there is no sense in allocating more than needed.
316func extend(dst, vals []pb.Entry) []pb.Entry {
317 need := len(dst) + len(vals)
318 if need <= cap(dst) {
319 return append(dst, vals...) // does not allocate
320 }
321 buf := make([]pb.Entry, need, need) // allocates precisely what's needed
322 copy(buf, dst)
323 copy(buf[len(dst):], vals)
324 return buf
325}