blob: ddeb5435d0213a75dcc41b8017f0371e51fa6860 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "encoding/binary"
5 "math"
6 "time"
7
8 "github.com/rcrowley/go-metrics"
9)
10
11var (
12 errInvalidArrayLength = PacketDecodingError{"invalid array length"}
13 errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"}
14 errInvalidStringLength = PacketDecodingError{"invalid string length"}
15 errVarintOverflow = PacketDecodingError{"varint overflow"}
16 errUVarintOverflow = PacketDecodingError{"uvarint overflow"}
17 errInvalidBool = PacketDecodingError{"invalid bool"}
18)
19
20type realDecoder struct {
21 raw []byte
22 off int
23 stack []pushDecoder
24 registry metrics.Registry
25}
26
27type realFlexibleDecoder struct {
28 *realDecoder
29}
30
31// primitives
32
33func (rd *realDecoder) getInt8() (int8, error) {
34 if rd.remaining() < 1 {
35 rd.off = len(rd.raw)
36 return -1, ErrInsufficientData
37 }
38 tmp := int8(rd.raw[rd.off])
39 rd.off++
40 return tmp, nil
41}
42
43func (rd *realDecoder) getInt16() (int16, error) {
44 if rd.remaining() < 2 {
45 rd.off = len(rd.raw)
46 return -1, ErrInsufficientData
47 }
48 tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
49 rd.off += 2
50 return tmp, nil
51}
52
53func (rd *realDecoder) getInt32() (int32, error) {
54 if rd.remaining() < 4 {
55 rd.off = len(rd.raw)
56 return -1, ErrInsufficientData
57 }
58 tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
59 rd.off += 4
60 return tmp, nil
61}
62
63func (rd *realDecoder) getInt64() (int64, error) {
64 if rd.remaining() < 8 {
65 rd.off = len(rd.raw)
66 return -1, ErrInsufficientData
67 }
68 tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
69 rd.off += 8
70 return tmp, nil
71}
72
73func (rd *realDecoder) getVarint() (int64, error) {
74 tmp, n := binary.Varint(rd.raw[rd.off:])
75 if n == 0 {
76 rd.off = len(rd.raw)
77 return -1, ErrInsufficientData
78 }
79 if n < 0 {
80 rd.off -= n
81 return -1, errVarintOverflow
82 }
83 rd.off += n
84 return tmp, nil
85}
86
87func (rd *realDecoder) getUVarint() (uint64, error) {
88 tmp, n := binary.Uvarint(rd.raw[rd.off:])
89 if n == 0 {
90 rd.off = len(rd.raw)
91 return 0, ErrInsufficientData
92 }
93
94 if n < 0 {
95 rd.off -= n
96 return 0, errUVarintOverflow
97 }
98
99 rd.off += n
100 return tmp, nil
101}
102
103func (rd *realDecoder) getFloat64() (float64, error) {
104 if rd.remaining() < 8 {
105 rd.off = len(rd.raw)
106 return -1, ErrInsufficientData
107 }
108 tmp := math.Float64frombits(binary.BigEndian.Uint64(rd.raw[rd.off:]))
109 rd.off += 8
110 return tmp, nil
111}
112
113func (rd *realDecoder) getArrayLength() (int, error) {
114 if rd.remaining() < 4 {
115 rd.off = len(rd.raw)
116 return -1, ErrInsufficientData
117 }
118 // cast to int32 first to get correct signedness for length before then
119 // casting to int for ease of interop
120 tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
121 rd.off += 4
122 if tmp > rd.remaining() {
123 rd.off = len(rd.raw)
124 return -1, ErrInsufficientData
125 } else if tmp > int(MaxResponseSize) {
126 return -1, errInvalidArrayLength
127 }
128 return tmp, nil
129}
130
131func (rd *realDecoder) getBool() (bool, error) {
132 b, err := rd.getInt8()
133 if err != nil || b == 0 {
134 return false, err
135 }
136 if b != 1 {
137 return false, errInvalidBool
138 }
139 return true, nil
140}
141
142func (rd *realDecoder) getKError() (KError, error) {
143 i, err := rd.getInt16()
144 return KError(i), err
145}
146
147func (rd *realDecoder) getDurationMs() (time.Duration, error) {
148 t, err := rd.getInt32()
149 if err != nil {
150 return time.Duration(0), err
151 }
152 return time.Duration(t) * time.Millisecond, nil
153}
154
155func (rd *realDecoder) getTaggedFieldArray(decoders taggedFieldDecoders) error {
156 return PacketDecodingError{"tagged fields used in non-flexible context"}
157}
158
159func (rd *realDecoder) getEmptyTaggedFieldArray() (int, error) {
160 return 0, nil
161}
162
163// collections
164
165func (rd *realDecoder) getBytes() ([]byte, error) {
166 tmp, err := rd.getInt32()
167 if err != nil {
168 return nil, err
169 }
170 if tmp == -1 {
171 return nil, nil
172 }
173
174 return rd.getRawBytes(int(tmp))
175}
176
177func (rd *realDecoder) getVarintBytes() ([]byte, error) {
178 tmp, err := rd.getVarint()
179 if err != nil {
180 return nil, err
181 }
182 if tmp == -1 {
183 return nil, nil
184 }
185
186 return rd.getRawBytes(int(tmp))
187}
188
189func (rd *realDecoder) getStringLength() (int, error) {
190 length, err := rd.getInt16()
191 if err != nil {
192 return 0, err
193 }
194
195 n := int(length)
196
197 switch {
198 case n < -1:
199 return 0, errInvalidStringLength
200 case n > rd.remaining():
201 rd.off = len(rd.raw)
202 return 0, ErrInsufficientData
203 }
204
205 return n, nil
206}
207
208func (rd *realDecoder) getString() (string, error) {
209 n, err := rd.getStringLength()
210 if err != nil || n == -1 {
211 return "", err
212 }
213
214 tmpStr := string(rd.raw[rd.off : rd.off+n])
215 rd.off += n
216 return tmpStr, nil
217}
218
219func (rd *realDecoder) getNullableString() (*string, error) {
220 n, err := rd.getStringLength()
221 if err != nil || n == -1 {
222 return nil, err
223 }
224
225 tmpStr := string(rd.raw[rd.off : rd.off+n])
226 rd.off += n
227 return &tmpStr, err
228}
229
230func (rd *realDecoder) getInt32Array() ([]int32, error) {
231 n, err := rd.getArrayLength()
232 if err != nil {
233 return nil, err
234 }
235 if n <= 0 {
236 return nil, nil
237 }
238
239 if rd.remaining() < 4*n {
240 rd.off = len(rd.raw)
241 return nil, ErrInsufficientData
242 }
243
244 ret := make([]int32, n)
245 for i := range ret {
246 ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
247 rd.off += 4
248 }
249 return ret, nil
250}
251
252func (rd *realDecoder) getInt64Array() ([]int64, error) {
253 n, err := rd.getArrayLength()
254 if err != nil {
255 return nil, err
256 }
257 if n <= 0 {
258 return nil, nil
259 }
260
261 if rd.remaining() < 8*n {
262 rd.off = len(rd.raw)
263 return nil, ErrInsufficientData
264 }
265
266 ret := make([]int64, n)
267 for i := range ret {
268 ret[i] = int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
269 rd.off += 8
270 }
271 return ret, nil
272}
273
274func (rd *realDecoder) getStringArray() ([]string, error) {
275 n, err := rd.getArrayLength()
276 if err != nil {
277 return nil, err
278 }
279 if n <= 0 {
280 return nil, nil
281 }
282
283 ret := make([]string, n)
284 for i := range ret {
285 str, err := rd.getString()
286 if err != nil {
287 return nil, err
288 }
289
290 ret[i] = str
291 }
292 return ret, nil
293}
294
295// subsets
296
297func (rd *realDecoder) remaining() int {
298 return len(rd.raw) - rd.off
299}
300
301func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
302 buf, err := rd.getRawBytes(length)
303 if err != nil {
304 return nil, err
305 }
306 return &realDecoder{raw: buf}, nil
307}
308
309func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
310 if length < 0 {
311 return nil, errInvalidByteSliceLength
312 } else if length > rd.remaining() {
313 rd.off = len(rd.raw)
314 return nil, ErrInsufficientData
315 }
316
317 start := rd.off
318 rd.off += length
319 return rd.raw[start:rd.off], nil
320}
321
322func (rd *realDecoder) peek(offset, length int) (packetDecoder, error) {
323 if rd.remaining() < offset+length {
324 return nil, ErrInsufficientData
325 }
326 off := rd.off + offset
327 return &realDecoder{raw: rd.raw[off : off+length]}, nil
328}
329
330func (rd *realDecoder) peekInt8(offset int) (int8, error) {
331 const byteLen = 1
332 if rd.remaining() < offset+byteLen {
333 return -1, ErrInsufficientData
334 }
335 return int8(rd.raw[rd.off+offset]), nil
336}
337
338// stacks
339
340func (rd *realDecoder) push(in pushDecoder) error {
341 in.saveOffset(rd.off)
342
343 var reserve int
344 if dpd, ok := in.(dynamicPushDecoder); ok {
345 if err := dpd.decode(rd); err != nil {
346 return err
347 }
348 } else {
349 reserve = in.reserveLength()
350 if rd.remaining() < reserve {
351 rd.off = len(rd.raw)
352 return ErrInsufficientData
353 }
354 }
355
356 rd.stack = append(rd.stack, in)
357
358 rd.off += reserve
359
360 return nil
361}
362
363func (rd *realDecoder) pop() error {
364 // this is go's ugly pop pattern (the inverse of append)
365 in := rd.stack[len(rd.stack)-1]
366 rd.stack = rd.stack[:len(rd.stack)-1]
367
368 return in.check(rd.off, rd.raw)
369}
370
371func (rd *realDecoder) metricRegistry() metrics.Registry {
372 return rd.registry
373}
374
375func (rd *realFlexibleDecoder) getArrayLength() (int, error) {
376 n, err := rd.getUVarint()
377 if err != nil {
378 return 0, err
379 }
380
381 if n == 0 {
382 return 0, nil
383 }
384
385 return int(n) - 1, nil
386}
387
388func (rd *realFlexibleDecoder) getEmptyTaggedFieldArray() (int, error) {
389 tagCount, err := rd.getUVarint()
390 if err != nil {
391 return 0, err
392 }
393
394 // skip over any tagged fields without deserializing them
395 // as we don't currently support doing anything with them
396 for i := uint64(0); i < tagCount; i++ {
397 // fetch and ignore tag identifier
398 _, err := rd.getUVarint()
399 if err != nil {
400 return 0, err
401 }
402 length, err := rd.getUVarint()
403 if err != nil {
404 return 0, err
405 }
406 if _, err := rd.getRawBytes(int(length)); err != nil {
407 return 0, err
408 }
409 }
410
411 return 0, nil
412}
413
414func (rd *realFlexibleDecoder) getTaggedFieldArray(decoders taggedFieldDecoders) error {
415 // if we have no decoders just skip over the tagged fields
416 if decoders == nil {
417 _, err := rd.getEmptyTaggedFieldArray()
418 return err
419 }
420
421 tagCount, err := rd.getUVarint()
422 if err != nil {
423 return err
424 }
425
426 for i := uint64(0); i < tagCount; i++ {
427 // fetch and ignore tag identifier
428 id, err := rd.getUVarint()
429 if err != nil {
430 return err
431 }
432 length, err := rd.getUVarint()
433 if err != nil {
434 return err
435 }
436 bytes, err := rd.getRawBytes(int(length))
437 if err != nil {
438 return err
439 }
440 decoder, ok := decoders[id]
441 if !ok {
442 continue
443 }
444 if err := decoder(&realFlexibleDecoder{&realDecoder{raw: bytes}}); err != nil {
445 return err
446 }
447 }
448 return nil
449}
450
451func (rd *realFlexibleDecoder) getBytes() ([]byte, error) {
452 n, err := rd.getUVarint()
453 if err != nil {
454 return nil, err
455 }
456
457 length := int(n - 1)
458 return rd.getRawBytes(length)
459}
460
461func (rd *realFlexibleDecoder) getStringLength() (int, error) {
462 length, err := rd.getUVarint()
463 if err != nil {
464 return 0, err
465 }
466
467 n := int(length - 1)
468
469 switch {
470 case n < -1:
471 return 0, errInvalidStringLength
472 case n > rd.remaining():
473 rd.off = len(rd.raw)
474 return 0, ErrInsufficientData
475 }
476
477 return n, nil
478}
479
480func (rd *realFlexibleDecoder) getString() (string, error) {
481 length, err := rd.getStringLength()
482 if err != nil || length == -1 {
483 return "", err
484 }
485
486 if length < 0 {
487 return "", errInvalidStringLength
488 }
489 tmpStr := string(rd.raw[rd.off : rd.off+length])
490 rd.off += length
491 return tmpStr, nil
492}
493
494func (rd *realFlexibleDecoder) getNullableString() (*string, error) {
495 length, err := rd.getStringLength()
496 if err != nil {
497 return nil, err
498 }
499
500 if length < 0 {
501 return nil, err
502 }
503
504 tmpStr := string(rd.raw[rd.off : rd.off+length])
505 rd.off += length
506 return &tmpStr, err
507}
508
509func (rd *realFlexibleDecoder) getInt32Array() ([]int32, error) {
510 n, err := rd.getUVarint()
511 if err != nil {
512 return nil, err
513 }
514
515 if n == 0 {
516 return nil, nil
517 }
518
519 arrayLength := int(n) - 1
520
521 ret := make([]int32, arrayLength)
522
523 for i := range ret {
524 ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
525 rd.off += 4
526 }
527 return ret, nil
528}
529
530func (rd *realFlexibleDecoder) getStringArray() ([]string, error) {
531 n, err := rd.getArrayLength()
532 if err != nil {
533 return nil, err
534 }
535 if n <= 0 {
536 return nil, nil
537 }
538
539 ret := make([]string, n)
540 for i := range ret {
541 str, err := rd.getString()
542 if err != nil {
543 return nil, err
544 }
545
546 ret[i] = str
547 }
548 return ret, nil
549}