| package sarama |
| |
| import ( |
| "bytes" |
| "fmt" |
| "sync" |
| |
| snappy "github.com/eapache/go-xerial-snappy" |
| "github.com/klauspost/compress/gzip" |
| "github.com/pierrec/lz4/v4" |
| ) |
| |
| var ( |
| lz4ReaderPool = sync.Pool{ |
| New: func() interface{} { |
| return lz4.NewReader(nil) |
| }, |
| } |
| |
| gzipReaderPool sync.Pool |
| |
| bufferPool = sync.Pool{ |
| New: func() interface{} { |
| return new(bytes.Buffer) |
| }, |
| } |
| |
| bytesPool = sync.Pool{ |
| New: func() interface{} { |
| res := make([]byte, 0, 4096) |
| return &res |
| }, |
| } |
| ) |
| |
| func decompress(cc CompressionCodec, data []byte) ([]byte, error) { |
| switch cc { |
| case CompressionNone: |
| return data, nil |
| case CompressionGZIP: |
| var err error |
| reader, ok := gzipReaderPool.Get().(*gzip.Reader) |
| if !ok { |
| reader, err = gzip.NewReader(bytes.NewReader(data)) |
| } else { |
| err = reader.Reset(bytes.NewReader(data)) |
| } |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| buffer := bufferPool.Get().(*bytes.Buffer) |
| _, err = buffer.ReadFrom(reader) |
| // copy the buffer to a new slice with the correct length |
| // reuse gzipReader and buffer |
| gzipReaderPool.Put(reader) |
| res := make([]byte, buffer.Len()) |
| copy(res, buffer.Bytes()) |
| buffer.Reset() |
| bufferPool.Put(buffer) |
| |
| return res, err |
| case CompressionSnappy: |
| return snappy.Decode(data) |
| case CompressionLZ4: |
| reader, ok := lz4ReaderPool.Get().(*lz4.Reader) |
| if !ok { |
| reader = lz4.NewReader(bytes.NewReader(data)) |
| } else { |
| reader.Reset(bytes.NewReader(data)) |
| } |
| buffer := bufferPool.Get().(*bytes.Buffer) |
| _, err := buffer.ReadFrom(reader) |
| // copy the buffer to a new slice with the correct length |
| // reuse lz4Reader and buffer |
| lz4ReaderPool.Put(reader) |
| res := make([]byte, buffer.Len()) |
| copy(res, buffer.Bytes()) |
| buffer.Reset() |
| bufferPool.Put(buffer) |
| |
| return res, err |
| case CompressionZSTD: |
| buffer := *bytesPool.Get().(*[]byte) |
| var err error |
| buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data) |
| // copy the buffer to a new slice with the correct length and reuse buffer |
| res := make([]byte, len(buffer)) |
| copy(res, buffer) |
| buffer = buffer[:0] |
| bytesPool.Put(&buffer) |
| |
| return res, err |
| default: |
| return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)} |
| } |
| } |