package sarama

import (
	"bufio"
	"fmt"
	"math/rand"
	"net"
	"regexp"
	"time"
)

const (
	defaultRetryBackoff    = 100 * time.Millisecond
	defaultRetryMaxBackoff = 1000 * time.Millisecond
)

type none struct{}

// make []int32 sortable so we can sort partition numbers
type int32Slice []int32

func (slice int32Slice) Len() int {
	return len(slice)
}

func (slice int32Slice) Less(i, j int) bool {
	return slice[i] < slice[j]
}

func (slice int32Slice) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}

func dupInt32Slice(input []int32) []int32 {
	ret := make([]int32, 0, len(input))
	ret = append(ret, input...)
	return ret
}

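// withRecover runs fn, recovering from any panic and passing the recovered
// value to PanicHandler when one is set.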
func withRecover(fn func()) {
	defer func() {
		handler := PanicHandler
		if handler != nil {
			if err := recover(); err != nil {
				handler(err)
			}
		}
	}()

	fn()
}

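// safeAsyncClose closes the given broker in a background goroutine, logging any
// error that occurs, provided the broker is currently connected.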
func safeAsyncClose(b *Broker) {
	go withRecover(func() {
		if connected, _ := b.Connected(); connected {
			if err := b.Close(); err != nil {
				Logger.Println("Error closing broker", b.ID(), ":", err)
			}
		}
	})
}

// Encoder is a simple interface for any type that can be encoded as an array of bytes
// in order to be sent as the key or value of a Kafka message. Length() is provided as an
// optimization, and must return the same as len() on the result of Encode().
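//
// As a purely illustrative sketch (not part of this package), a custom encoder
// could pre-marshal its payload once so that Length() and Encode() agree; the
// type and constructor below are hypothetical and assume "encoding/json" is
// imported:
//
//	type jsonEncoder struct{ encoded []byte }
//
//	func newJSONEncoder(v interface{}) (Encoder, error) {
//		b, err := json.Marshal(v)
//		if err != nil {
//			return nil, err
//		}
//		return jsonEncoder{encoded: b}, nil
//	}
//
//	func (j jsonEncoder) Encode() ([]byte, error) { return j.encoded, nil }
//	func (j jsonEncoder) Length() int             { return len(j.encoded) }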
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// make strings and byte slices encodable for convenience so they can be used as keys
// and/or values in kafka messages

// StringEncoder implements the Encoder interface for Go strings so that they can be used
// as the Key or Value in a ProducerMessage.
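//
// An illustrative use with the producer (the topic and values are placeholders):
//
//	msg := &ProducerMessage{
//		Topic: "example-topic",
//		Key:   StringEncoder("user-42"),
//		Value: StringEncoder("hello, world"),
//	}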
type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

func (s StringEncoder) Length() int {
	return len(s)
}

// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
// as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

func (b ByteEncoder) Encode() ([]byte, error) {
	return b, nil
}

func (b ByteEncoder) Length() int {
	return len(b)
}

// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
// reads that trigger syscalls.
type bufConn struct {
	net.Conn
	buf *bufio.Reader
}

func newBufConn(conn net.Conn) *bufConn {
	return &bufConn{
		Conn: conn,
		buf:  bufio.NewReader(conn),
	}
}

func (bc *bufConn) Read(b []byte) (n int, err error) {
	return bc.buf.Read(b)
}

// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// it's a struct rather than just typing the array directly to make it opaque and stop people
	// generating their own arbitrary versions
	version [4]uint
}

func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	return KafkaVersion{
		version: [4]uint{major, minor, veryMinor, patch},
	}
}

// IsAtLeast returns true if and only if the version it is called on is
// greater than or equal to the version passed in:
//
//	V1.IsAtLeast(V2) // false
//	V2.IsAtLeast(V1) // true
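//
// A common, purely illustrative use is gating protocol features on the
// configured broker version (cfg is assumed to be a *Config):
//
//	if cfg.Version.IsAtLeast(V0_11_0_0) {
//		// rely on features introduced in Kafka 0.11.0.0, e.g. record headers
//	}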
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	for i := range v.version {
		if v.version[i] > other.version[i] {
			return true
		} else if v.version[i] < other.version[i] {
			return false
		}
	}
	return true
}

// Effective constants defining the supported Kafka versions.
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_5_2_0  = newKafkaVersion(3, 5, 2, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)
	V3_6_1_0  = newKafkaVersion(3, 6, 1, 0)
	V3_6_2_0  = newKafkaVersion(3, 6, 2, 0)
	V3_7_0_0  = newKafkaVersion(3, 7, 0, 0)
	V3_7_1_0  = newKafkaVersion(3, 7, 1, 0)
	V3_7_2_0  = newKafkaVersion(3, 7, 2, 0)
	V3_8_0_0  = newKafkaVersion(3, 8, 0, 0)
	V3_8_1_0  = newKafkaVersion(3, 8, 1, 0)
	V3_9_0_0  = newKafkaVersion(3, 9, 0, 0)
	V3_9_1_0  = newKafkaVersion(3, 9, 1, 0)
	V4_0_0_0  = newKafkaVersion(4, 0, 0, 0)
	V4_1_0_0  = newKafkaVersion(4, 1, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_6_3_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_7_2_0,
		V2_8_0_0,
		V2_8_1_0,
		V2_8_2_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_0_2_0,
		V3_1_0_0,
		V3_1_1_0,
		V3_1_2_0,
		V3_2_0_0,
		V3_2_1_0,
		V3_2_2_0,
		V3_2_3_0,
		V3_3_0_0,
		V3_3_1_0,
		V3_3_2_0,
		V3_4_0_0,
		V3_4_1_0,
		V3_5_0_0,
		V3_5_1_0,
		V3_5_2_0,
		V3_6_0_0,
		V3_6_1_0,
		V3_6_2_0,
		V3_7_0_0,
		V3_7_1_0,
		V3_7_2_0,
		V3_8_0_0,
		V3_8_1_0,
		V3_9_0_0,
		V3_9_1_0,
		V4_0_0_0,
		V4_1_0_0,
	}
	MinVersion     = V0_8_2_0
	MaxVersion     = V4_1_0_0
	DefaultVersion = V2_1_0_0

	// reduced set of protocol versions to matrix test
	fvtRangeVersions = []KafkaVersion{
		V0_8_2_2,
		V0_10_2_2,
		V1_0_2_0,
		V1_1_1_0,
		V2_0_1_0,
		V2_2_2_0,
		V2_4_1_0,
		V2_6_3_0,
		V2_8_2_0,
		V3_1_2_0,
		V3_3_2_0,
		V3_6_2_0,
	}
)

var (
	// This regex validates that a string complies with the pre-Kafka 1.0.0 format for version strings, for example 0.11.0.3
	validPreKafka1Version = regexp.MustCompile(`^0\.\d+\.\d+\.\d+$`)

	// This regex validates that a string complies with the post-Kafka 1.0.0 format, for example 1.0.0
	validPostKafka1Version = regexp.MustCompile(`^\d+\.\d+\.\d+$`)
)

// ParseKafkaVersion parses a Kafka version string and returns the corresponding
// KafkaVersion, or an error if the string is not a valid version.
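//
// Illustrative usage (the version string and config wiring are examples only):
//
//	v, err := ParseKafkaVersion("2.8.1")
//	if err != nil {
//		// handle the invalid version string
//	}
//	cfg := NewConfig()
//	cfg.Version = v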
func ParseKafkaVersion(s string) (KafkaVersion, error) {
	if len(s) < 5 {
		return DefaultVersion, fmt.Errorf("invalid version `%s`", s)
	}
	var major, minor, veryMinor, patch uint
	var err error
	if s[0] == '0' {
		err = scanKafkaVersion(s, validPreKafka1Version, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
	} else {
		err = scanKafkaVersion(s, validPostKafka1Version, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
	}
	if err != nil {
		return DefaultVersion, err
	}
	return newKafkaVersion(major, minor, veryMinor, patch), nil
}

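// scanKafkaVersion checks that s matches the supplied pattern and scans its
// numeric components into v using the given format.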
func scanKafkaVersion(s string, pattern *regexp.Regexp, format string, v [3]*uint) error {
	if !pattern.MatchString(s) {
		return fmt.Errorf("invalid version `%s`", s)
	}
	_, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
	return err
}

func (v KafkaVersion) String() string {
	if v.version[0] == 0 {
		return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
	}

	return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
}

// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
//
//	MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2))
//
// This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter.
// The behavior when `failures = 0` is not explicitly defined in KIP-580 and is left to implementation discretion.
//
// Example usage:
//
//	backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
//	config.Producer.Retry.BackoffFunc = backoffFunc
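//
// As a rough illustration of the formula above (ignoring the 0.8-1.2 jitter
// factor), assuming backoff = 100ms and maxBackoff = 2s, successive retries
// would wait approximately:
//
//	retries: 1      2      3      4      5      6
//	wait:    100ms  200ms  400ms  800ms  1.6s   2s (capped)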
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
	if backoff <= 0 {
		backoff = defaultRetryBackoff
	}
	if maxBackoff <= 0 {
		maxBackoff = defaultRetryMaxBackoff
	}

	if backoff > maxBackoff {
		Logger.Println("Warning: backoff is greater than maxBackoff, using maxBackoff instead.")
		backoff = maxBackoff
	}

	return func(retries, maxRetries int) time.Duration {
		if retries <= 0 {
			return backoff
		}

		calculatedBackoff := backoff * time.Duration(1<<(retries-1))
		jitter := 0.8 + 0.4*rand.Float64()
		calculatedBackoff = time.Duration(float64(calculatedBackoff) * jitter)

		return min(calculatedBackoff, maxBackoff)
	}
}