blob: b752cb8a05499a3f23a7299d5885794e5b547a20 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "bytes"
5 "fmt"
6 "sync"
7
8 snappy "github.com/eapache/go-xerial-snappy"
9 "github.com/klauspost/compress/gzip"
10 "github.com/pierrec/lz4/v4"
11)
12
13var (
14 lz4WriterPool = sync.Pool{
15 New: func() interface{} {
16 lz := lz4.NewWriter(nil)
17 if err := lz.Apply(lz4.BlockSizeOption(lz4.Block64Kb)); err != nil {
18 panic(err)
19 }
20 return lz
21 },
22 }
23
24 gzipWriterPool = sync.Pool{
25 New: func() interface{} {
26 return gzip.NewWriter(nil)
27 },
28 }
29 gzipWriterPoolForCompressionLevel1 = sync.Pool{
30 New: func() interface{} {
31 gz, err := gzip.NewWriterLevel(nil, 1)
32 if err != nil {
33 panic(err)
34 }
35 return gz
36 },
37 }
38 gzipWriterPoolForCompressionLevel2 = sync.Pool{
39 New: func() interface{} {
40 gz, err := gzip.NewWriterLevel(nil, 2)
41 if err != nil {
42 panic(err)
43 }
44 return gz
45 },
46 }
47 gzipWriterPoolForCompressionLevel3 = sync.Pool{
48 New: func() interface{} {
49 gz, err := gzip.NewWriterLevel(nil, 3)
50 if err != nil {
51 panic(err)
52 }
53 return gz
54 },
55 }
56 gzipWriterPoolForCompressionLevel4 = sync.Pool{
57 New: func() interface{} {
58 gz, err := gzip.NewWriterLevel(nil, 4)
59 if err != nil {
60 panic(err)
61 }
62 return gz
63 },
64 }
65 gzipWriterPoolForCompressionLevel5 = sync.Pool{
66 New: func() interface{} {
67 gz, err := gzip.NewWriterLevel(nil, 5)
68 if err != nil {
69 panic(err)
70 }
71 return gz
72 },
73 }
74 gzipWriterPoolForCompressionLevel6 = sync.Pool{
75 New: func() interface{} {
76 gz, err := gzip.NewWriterLevel(nil, 6)
77 if err != nil {
78 panic(err)
79 }
80 return gz
81 },
82 }
83 gzipWriterPoolForCompressionLevel7 = sync.Pool{
84 New: func() interface{} {
85 gz, err := gzip.NewWriterLevel(nil, 7)
86 if err != nil {
87 panic(err)
88 }
89 return gz
90 },
91 }
92 gzipWriterPoolForCompressionLevel8 = sync.Pool{
93 New: func() interface{} {
94 gz, err := gzip.NewWriterLevel(nil, 8)
95 if err != nil {
96 panic(err)
97 }
98 return gz
99 },
100 }
101 gzipWriterPoolForCompressionLevel9 = sync.Pool{
102 New: func() interface{} {
103 gz, err := gzip.NewWriterLevel(nil, 9)
104 if err != nil {
105 panic(err)
106 }
107 return gz
108 },
109 }
110)
111
112func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
113 switch cc {
114 case CompressionNone:
115 return data, nil
116 case CompressionGZIP:
117 return gzipCompress(level, data)
118 case CompressionSnappy:
119 return snappy.Encode(data), nil
120 case CompressionLZ4:
121 return lz4Compress(data)
122 case CompressionZSTD:
123 return zstdCompress(ZstdEncoderParams{level}, nil, data)
124 default:
125 return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
126 }
127}
128
129func gzipCompress(level int, data []byte) ([]byte, error) {
130 var (
131 buf bytes.Buffer
132 writer *gzip.Writer
133 pool *sync.Pool
134 )
135
136 switch level {
137 case CompressionLevelDefault:
138 pool = &gzipWriterPool
139 case 1:
140 pool = &gzipWriterPoolForCompressionLevel1
141 case 2:
142 pool = &gzipWriterPoolForCompressionLevel2
143 case 3:
144 pool = &gzipWriterPoolForCompressionLevel3
145 case 4:
146 pool = &gzipWriterPoolForCompressionLevel4
147 case 5:
148 pool = &gzipWriterPoolForCompressionLevel5
149 case 6:
150 pool = &gzipWriterPoolForCompressionLevel6
151 case 7:
152 pool = &gzipWriterPoolForCompressionLevel7
153 case 8:
154 pool = &gzipWriterPoolForCompressionLevel8
155 case 9:
156 pool = &gzipWriterPoolForCompressionLevel9
157 default:
158 var err error
159 writer, err = gzip.NewWriterLevel(&buf, level)
160 if err != nil {
161 return nil, err
162 }
163 }
164 if pool != nil {
165 writer = pool.Get().(*gzip.Writer)
166 writer.Reset(&buf)
167 defer pool.Put(writer)
168 }
169 if _, err := writer.Write(data); err != nil {
170 return nil, err
171 }
172 if err := writer.Close(); err != nil {
173 return nil, err
174 }
175 return buf.Bytes(), nil
176}
177
178func lz4Compress(data []byte) ([]byte, error) {
179 writer := lz4WriterPool.Get().(*lz4.Writer)
180 defer lz4WriterPool.Put(writer)
181
182 var buf bytes.Buffer
183 writer.Reset(&buf)
184 if _, err := writer.Write(data); err != nil {
185 return nil, err
186 }
187 if err := writer.Close(); err != nil {
188 return nil, err
189 }
190 return buf.Bytes(), nil
191}