| package lz4 |
| |
| import ( |
| "io" |
| |
| "github.com/pierrec/lz4/v4/internal/lz4block" |
| "github.com/pierrec/lz4/v4/internal/lz4errors" |
| "github.com/pierrec/lz4/v4/internal/lz4stream" |
| ) |
| |
| var writerStates = []aState{ |
| noState: newState, |
| newState: writeState, |
| writeState: closedState, |
| closedState: newState, |
| errorState: newState, |
| } |
| |
| // NewWriter returns a new LZ4 frame encoder. |
| func NewWriter(w io.Writer) *Writer { |
| zw := &Writer{frame: lz4stream.NewFrame()} |
| zw.state.init(writerStates) |
| _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone) |
| zw.Reset(w) |
| return zw |
| } |
| |
| // Writer allows writing an LZ4 stream. |
| type Writer struct { |
| state _State |
| src io.Writer // destination writer |
| level lz4block.CompressionLevel // how hard to try |
| num int // concurrency level |
| frame *lz4stream.Frame // frame being built |
| data []byte // pending data |
| idx int // size of pending data |
| handler func(int) |
| legacy bool |
| } |
| |
| func (*Writer) private() {} |
| |
| func (w *Writer) Apply(options ...Option) (err error) { |
| defer w.state.check(&err) |
| switch w.state.state { |
| case newState: |
| case errorState: |
| return w.state.err |
| default: |
| return lz4errors.ErrOptionClosedOrError |
| } |
| w.Reset(w.src) |
| for _, o := range options { |
| if err = o(w); err != nil { |
| return |
| } |
| } |
| return |
| } |
| |
| func (w *Writer) isNotConcurrent() bool { |
| return w.num == 1 |
| } |
| |
| // init sets up the Writer when in newState. It does not change the Writer state. |
| func (w *Writer) init() error { |
| w.frame.InitW(w.src, w.num, w.legacy) |
| size := w.frame.Descriptor.Flags.BlockSizeIndex() |
| w.data = size.Get() |
| w.idx = 0 |
| return w.frame.Descriptor.Write(w.frame, w.src) |
| } |
| |
| func (w *Writer) Write(buf []byte) (n int, err error) { |
| defer w.state.check(&err) |
| switch w.state.state { |
| case writeState: |
| case closedState, errorState: |
| return 0, w.state.err |
| case newState: |
| if err = w.init(); w.state.next(err) { |
| return |
| } |
| default: |
| return 0, w.state.fail() |
| } |
| |
| zn := len(w.data) |
| for len(buf) > 0 { |
| if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn { |
| // Avoid a copy as there is enough data for a block. |
| if err = w.write(buf[:zn], false); err != nil { |
| return |
| } |
| n += zn |
| buf = buf[zn:] |
| continue |
| } |
| // Accumulate the data to be compressed. |
| m := copy(w.data[w.idx:], buf) |
| n += m |
| w.idx += m |
| buf = buf[m:] |
| |
| if w.idx < len(w.data) { |
| // Buffer not filled. |
| return |
| } |
| |
| // Buffer full. |
| if err = w.write(w.data, true); err != nil { |
| return |
| } |
| if !w.isNotConcurrent() { |
| size := w.frame.Descriptor.Flags.BlockSizeIndex() |
| w.data = size.Get() |
| } |
| w.idx = 0 |
| } |
| return |
| } |
| |
| func (w *Writer) write(data []byte, safe bool) error { |
| if w.isNotConcurrent() { |
| block := w.frame.Blocks.Block |
| err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src) |
| w.handler(len(block.Data)) |
| return err |
| } |
| c := make(chan *lz4stream.FrameDataBlock) |
| w.frame.Blocks.Blocks <- c |
| go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) { |
| b := lz4stream.NewFrameDataBlock(w.frame) |
| c <- b.Compress(w.frame, data, w.level) |
| <-c |
| w.handler(len(b.Data)) |
| b.Close(w.frame) |
| if safe { |
| // safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed |
| lz4block.Put(data) |
| } |
| }(c, data, safe) |
| |
| return nil |
| } |
| |
| // Flush any buffered data to the underlying writer immediately. |
| func (w *Writer) Flush() (err error) { |
| switch w.state.state { |
| case writeState: |
| case errorState: |
| return w.state.err |
| case newState: |
| if err = w.init(); w.state.next(err) { |
| return |
| } |
| default: |
| return nil |
| } |
| |
| if w.idx > 0 { |
| // Flush pending data, disable w.data freeing as it is done later on. |
| if err = w.write(w.data[:w.idx], false); err != nil { |
| return err |
| } |
| w.idx = 0 |
| } |
| return nil |
| } |
| |
| // Close closes the Writer, flushing any unwritten data to the underlying writer |
| // without closing it. |
| func (w *Writer) Close() error { |
| if err := w.Flush(); err != nil { |
| return err |
| } |
| err := w.frame.CloseW(w.src, w.num) |
| // It is now safe to free the buffer. |
| if w.data != nil { |
| lz4block.Put(w.data) |
| w.data = nil |
| } |
| return err |
| } |
| |
| // Reset clears the state of the Writer w such that it is equivalent to its |
| // initial state from NewWriter, but instead writing to writer. |
| // Reset keeps the previous options unless overwritten by the supplied ones. |
| // No access to writer is performed. |
| // |
| // w.Close must be called before Reset or pending data may be dropped. |
| func (w *Writer) Reset(writer io.Writer) { |
| w.frame.Reset(w.num) |
| w.state.reset() |
| w.src = writer |
| } |
| |
| // ReadFrom efficiently reads from r and compressed into the Writer destination. |
| func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { |
| switch w.state.state { |
| case closedState, errorState: |
| return 0, w.state.err |
| case newState: |
| if err = w.init(); w.state.next(err) { |
| return |
| } |
| default: |
| return 0, w.state.fail() |
| } |
| defer w.state.check(&err) |
| |
| size := w.frame.Descriptor.Flags.BlockSizeIndex() |
| var done bool |
| var rn int |
| data := size.Get() |
| if w.isNotConcurrent() { |
| // Keep the same buffer for the whole process. |
| defer lz4block.Put(data) |
| } |
| for !done { |
| rn, err = io.ReadFull(r, data) |
| switch err { |
| case nil: |
| case io.EOF, io.ErrUnexpectedEOF: // read may be partial |
| done = true |
| default: |
| return |
| } |
| n += int64(rn) |
| err = w.write(data[:rn], true) |
| if err != nil { |
| return |
| } |
| w.handler(rn) |
| if !done && !w.isNotConcurrent() { |
| // The buffer will be returned automatically by go routines (safe=true) |
| // so get a new one fo the next round. |
| data = size.Get() |
| } |
| } |
| return |
| } |