blob: 0a099832944ee358c807a6b4643c9d9a6124dd2b [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "bytes"
5 "fmt"
6 "sync"
7
8 snappy "github.com/eapache/go-xerial-snappy"
9 "github.com/klauspost/compress/gzip"
10 "github.com/pierrec/lz4/v4"
11)
12
13var (
14 lz4ReaderPool = sync.Pool{
15 New: func() interface{} {
16 return lz4.NewReader(nil)
17 },
18 }
19
20 gzipReaderPool sync.Pool
21
22 bufferPool = sync.Pool{
23 New: func() interface{} {
24 return new(bytes.Buffer)
25 },
26 }
27
28 bytesPool = sync.Pool{
29 New: func() interface{} {
30 res := make([]byte, 0, 4096)
31 return &res
32 },
33 }
34)
35
36func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
37 switch cc {
38 case CompressionNone:
39 return data, nil
40 case CompressionGZIP:
41 var err error
42 reader, ok := gzipReaderPool.Get().(*gzip.Reader)
43 if !ok {
44 reader, err = gzip.NewReader(bytes.NewReader(data))
45 } else {
46 err = reader.Reset(bytes.NewReader(data))
47 }
48
49 if err != nil {
50 return nil, err
51 }
52
53 buffer := bufferPool.Get().(*bytes.Buffer)
54 _, err = buffer.ReadFrom(reader)
55 // copy the buffer to a new slice with the correct length
56 // reuse gzipReader and buffer
57 gzipReaderPool.Put(reader)
58 res := make([]byte, buffer.Len())
59 copy(res, buffer.Bytes())
60 buffer.Reset()
61 bufferPool.Put(buffer)
62
63 return res, err
64 case CompressionSnappy:
65 return snappy.Decode(data)
66 case CompressionLZ4:
67 reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
68 if !ok {
69 reader = lz4.NewReader(bytes.NewReader(data))
70 } else {
71 reader.Reset(bytes.NewReader(data))
72 }
73 buffer := bufferPool.Get().(*bytes.Buffer)
74 _, err := buffer.ReadFrom(reader)
75 // copy the buffer to a new slice with the correct length
76 // reuse lz4Reader and buffer
77 lz4ReaderPool.Put(reader)
78 res := make([]byte, buffer.Len())
79 copy(res, buffer.Bytes())
80 buffer.Reset()
81 bufferPool.Put(buffer)
82
83 return res, err
84 case CompressionZSTD:
85 buffer := *bytesPool.Get().(*[]byte)
86 var err error
87 buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data)
88 // copy the buffer to a new slice with the correct length and reuse buffer
89 res := make([]byte, len(buffer))
90 copy(res, buffer)
91 buffer = buffer[:0]
92 bytesPool.Put(&buffer)
93
94 return res, err
95 default:
96 return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
97 }
98}