[VOL-4663] create voltha event topic (voltha.events) with configurable number of partitions and replication factor
Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 0120438..f066d56 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -4,21 +4,35 @@
"encoding/binary"
"fmt"
"io"
+ "runtime"
"github.com/pierrec/lz4/internal/xxh32"
)
+// zResult contains the results of compressing a block.
+type zResult struct {
+ size uint32 // Block header
+ data []byte // Compressed data
+ checksum uint32 // Data checksum
+}
+
// Writer implements the LZ4 frame encoder.
type Writer struct {
Header
+ // Handler called when a block has been successfully written out.
+ // It provides the number of bytes written.
+ OnBlockDone func(size int)
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
dst io.Writer // Destination.
checksum xxh32.XXHZero // Frame checksum.
- zdata []byte // Compressed data.
- data []byte // Data to be compressed.
+ data []byte // Data to be compressed + buffer for compressed data.
idx int // Index into data.
hashtable [winSize]int // Hash table used in CompressBlock().
+
+ // For concurrency.
+ c chan chan zResult // Channel for block compression goroutines and writer goroutine.
+ err error // Any error encountered while writing to the underlying destination.
}
// NewWriter returns a new LZ4 frame encoder.
@@ -26,28 +40,90 @@
// The supplied Header is checked at the first Write.
// It is ok to change it before the first Write but then not until a Reset() is performed.
func NewWriter(dst io.Writer) *Writer {
- return &Writer{dst: dst}
+ z := new(Writer)
+ z.Reset(dst)
+ return z
+}
+
+// WithConcurrency sets the number of concurrent go routines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+ switch {
+ case n == 0 || n == 1:
+ z.c = nil
+ return z
+ case n < 0:
+ n = runtime.GOMAXPROCS(0)
+ }
+ z.c = make(chan chan zResult, n)
+ // Writer goroutine managing concurrent block compression goroutines.
+ go func() {
+ // Process next block compression item.
+ for c := range z.c {
+ // Read the next compressed block result.
+ // Waiting here ensures that the blocks are output in the order they were sent.
+ // The incoming channel is always closed as it indicates to the caller that
+ // the block has been processed.
+ res := <-c
+ n := len(res.data)
+ if n == 0 {
+ // Notify the block compression routine that we are done with its result.
+ // This is used when a sentinel block is sent to terminate the compression.
+ close(c)
+ return
+ }
+ // Write the block.
+ if err := z.writeUint32(res.size); err != nil && z.err == nil {
+ z.err = err
+ }
+ if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+ z.err = err
+ }
+ if z.BlockChecksum {
+ if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+ z.err = err
+ }
+ }
+ // It is now safe to release the buffer as no longer in use by any goroutine.
+ putBuffer(cap(res.data), res.data)
+ if h := z.OnBlockDone; h != nil {
+ h(n)
+ }
+ close(c)
+ }
+ }()
+ return z
+}
+
+// newBuffers instantiates new buffers which size matches the one in Header.
+// The returned buffers are for decompression and compression respectively.
+func (z *Writer) newBuffers() {
+ bSize := z.Header.BlockMaxSize
+ buf := getBuffer(bSize)
+ z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+ // Put the buffer back into the pool, if any.
+ putBuffer(z.Header.BlockMaxSize, z.data)
+ z.data = nil
}
// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
func (z *Writer) writeHeader() error {
// Default to 4Mb if BlockMaxSize is not set.
if z.Header.BlockMaxSize == 0 {
- z.Header.BlockMaxSize = bsMapID[7]
+ z.Header.BlockMaxSize = blockSize4M
}
// The only option that needs to be validated.
bSize := z.Header.BlockMaxSize
- bSizeID, ok := bsMapValue[bSize]
- if !ok {
+ if !isValidBlockSize(z.Header.BlockMaxSize) {
return fmt.Errorf("lz4: invalid block max size: %d", bSize)
}
// Allocate the compressed/uncompressed buffers.
// The compressed buffer cannot exceed the uncompressed one.
- if n := 2 * bSize; cap(z.zdata) < n {
- z.zdata = make([]byte, n, n)
- }
- z.zdata = z.zdata[:bSize]
- z.data = z.zdata[:cap(z.zdata)][bSize:]
+ z.newBuffers()
z.idx = 0
// Size is optional.
@@ -67,7 +143,7 @@
flg |= 1 << 2
}
buf[4] = flg
- buf[5] = bSizeID << 4
+ buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
// Current buffer size: magic(4) + flags(1) + block max size (1).
n := 6
@@ -147,28 +223,39 @@
// compressBlock compresses a block.
func (z *Writer) compressBlock(data []byte) error {
if !z.NoChecksum {
- z.checksum.Write(data)
+ _, _ = z.checksum.Write(data)
}
+ if z.c != nil {
+ c := make(chan zResult)
+ z.c <- c // Send now to guarantee order
+
+ // get a buffer from the pool and copy the data over
+ block := getBuffer(z.Header.BlockMaxSize)[:len(data)]
+ copy(block, data)
+
+ go writerCompressBlock(c, z.Header, block)
+ return nil
+ }
+
+ zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
// The compressed block size cannot exceed the input's.
var zn int
- var err error
if level := z.Header.CompressionLevel; level != 0 {
- zn, err = CompressBlockHC(data, z.zdata, level)
+ zn, _ = CompressBlockHC(data, zdata, level)
} else {
- zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+ zn, _ = CompressBlock(data, zdata, z.hashtable[:])
}
- var zdata []byte
var bLen uint32
if debugFlag {
debug("block compression %d => %d", len(data), zn)
}
- if err == nil && zn > 0 && zn < len(data) {
+ if zn > 0 && zn < len(data) {
// Compressible and compressed size smaller than uncompressed: ok!
bLen = uint32(zn)
- zdata = z.zdata[:zn]
+ zdata = zdata[:zn]
} else {
// Uncompressed block.
bLen = uint32(len(data)) | compressedBlockFlag
@@ -182,24 +269,26 @@
if err := z.writeUint32(bLen); err != nil {
return err
}
- if _, err := z.dst.Write(zdata); err != nil {
+ written, err := z.dst.Write(zdata)
+ if err != nil {
return err
}
+ if h := z.OnBlockDone; h != nil {
+ h(written)
+ }
- if z.BlockChecksum {
- checksum := xxh32.ChecksumZero(zdata)
+ if !z.BlockChecksum {
if debugFlag {
- debug("block checksum %x", checksum)
+ debug("current frame checksum %x", z.checksum.Sum32())
}
- if err := z.writeUint32(checksum); err != nil {
- return err
- }
+ return nil
}
+ checksum := xxh32.ChecksumZero(zdata)
if debugFlag {
- debug("current frame checksum %x", z.checksum.Sum32())
+ debug("block checksum %x", checksum)
+ defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
}
-
- return nil
+ return z.writeUint32(checksum)
}
// Flush flushes any pending compressed data to the underlying writer.
@@ -213,7 +302,35 @@
return nil
}
- return z.compressBlock(z.data[:z.idx])
+ data := getBuffer(z.Header.BlockMaxSize)[:len(z.data[:z.idx])]
+ copy(data, z.data[:z.idx])
+
+ z.idx = 0
+ if z.c == nil {
+ return z.compressBlock(data)
+ }
+ if !z.NoChecksum {
+ _, _ = z.checksum.Write(data)
+ }
+ c := make(chan zResult)
+ z.c <- c
+ writerCompressBlock(c, z.Header, data)
+ return nil
+}
+
+func (z *Writer) close() error {
+ if z.c == nil {
+ return nil
+ }
+ // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+ c := make(chan zResult)
+ z.c <- c
+ c <- zResult{}
+ // Wait for the main goroutine to complete.
+ <-c
+ // At this point the main goroutine has shut down or is about to return.
+ z.c = nil
+ return z.err
}
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
@@ -223,10 +340,13 @@
return err
}
}
-
if err := z.Flush(); err != nil {
return err
}
+ if err := z.close(); err != nil {
+ return err
+ }
+ z.freeBuffers()
if debugFlag {
debug("writing last empty block")
@@ -234,28 +354,33 @@
if err := z.writeUint32(0); err != nil {
return err
}
- if !z.NoChecksum {
- checksum := z.checksum.Sum32()
- if debugFlag {
- debug("stream checksum %x", checksum)
- }
- if err := z.writeUint32(checksum); err != nil {
- return err
- }
+ if z.NoChecksum {
+ return nil
}
- return nil
+ checksum := z.checksum.Sum32()
+ if debugFlag {
+ debug("stream checksum %x", checksum)
+ }
+ return z.writeUint32(checksum)
}
// Reset clears the state of the Writer z such that it is equivalent to its
// initial state from NewWriter, but instead writing to w.
// No access to the underlying io.Writer is performed.
func (z *Writer) Reset(w io.Writer) {
- z.Header = Header{}
+ n := cap(z.c)
+ _ = z.close()
+ z.freeBuffers()
+ z.Header.Reset()
z.dst = w
z.checksum.Reset()
- z.zdata = z.zdata[:0]
- z.data = z.data[:0]
z.idx = 0
+ z.err = nil
+ // reset hashtable to ensure deterministic output.
+ for i := range z.hashtable {
+ z.hashtable[i] = 0
+ }
+ z.WithConcurrency(n)
}
// writeUint32 writes a uint32 to the underlying writer.
@@ -265,3 +390,33 @@
_, err := z.dst.Write(buf)
return err
}
+
+// writerCompressBlock compresses data into a pooled buffer and writes its result
+// out to the input channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+ zdata := getBuffer(header.BlockMaxSize)
+ // The compressed block size cannot exceed the input's.
+ var zn int
+ if level := header.CompressionLevel; level != 0 {
+ zn, _ = CompressBlockHC(data, zdata, level)
+ } else {
+ var hashTable [winSize]int
+ zn, _ = CompressBlock(data, zdata, hashTable[:])
+ }
+ var res zResult
+ if zn > 0 && zn < len(data) {
+ res.size = uint32(zn)
+ res.data = zdata[:zn]
+ // release the uncompressed block since it is not used anymore
+ putBuffer(header.BlockMaxSize, data)
+ } else {
+ res.size = uint32(len(data)) | compressedBlockFlag
+ res.data = data
+ // release the compressed block since it was not used
+ putBuffer(header.BlockMaxSize, zdata)
+ }
+ if header.BlockChecksum {
+ res.checksum = xxh32.ChecksumZero(res.data)
+ }
+ c <- res
+}