blob: 4358adee1093f407d8d1507c579857e45458d2d2 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package lz4
2
3import (
4 "io"
5
6 "github.com/pierrec/lz4/v4/internal/lz4block"
7 "github.com/pierrec/lz4/v4/internal/lz4errors"
8 "github.com/pierrec/lz4/v4/internal/lz4stream"
9)
10
// writerStates is the transition table handed to the Writer's state machine
// in NewWriter: each entry maps a state (the index) to the state that follows it.
var writerStates = []aState{
	noState:     newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
	errorState:  newState,
}
18
19// NewWriter returns a new LZ4 frame encoder.
20func NewWriter(w io.Writer) *Writer {
21 zw := &Writer{frame: lz4stream.NewFrame()}
22 zw.state.init(writerStates)
23 _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
24 zw.Reset(w)
25 return zw
26}
27
// Writer compresses data into an LZ4 stream and writes it to an underlying
// io.Writer. Use NewWriter to create one.
type Writer struct {
	state   _State                    // lifecycle state and last recorded error
	src     io.Writer                 // destination writer
	level   lz4block.CompressionLevel // compression level handed to the block compressor
	num     int                       // concurrency level; 1 means blocks are compressed synchronously
	frame   *lz4stream.Frame          // frame being built
	data    []byte                    // buffer holding pending (not yet compressed) data
	idx     int                       // number of pending bytes in data
	handler func(int)                 // callback invoked with a size once a block has been processed
	legacy  bool                      // forwarded to the frame on init; presumably selects the legacy frame format (confirm)
}
40
41func (*Writer) private() {}
42
43func (w *Writer) Apply(options ...Option) (err error) {
44 defer w.state.check(&err)
45 switch w.state.state {
46 case newState:
47 case errorState:
48 return w.state.err
49 default:
50 return lz4errors.ErrOptionClosedOrError
51 }
52 w.Reset(w.src)
53 for _, o := range options {
54 if err = o(w); err != nil {
55 return
56 }
57 }
58 return
59}
60
61func (w *Writer) isNotConcurrent() bool {
62 return w.num == 1
63}
64
65// init sets up the Writer when in newState. It does not change the Writer state.
66func (w *Writer) init() error {
67 w.frame.InitW(w.src, w.num, w.legacy)
68 size := w.frame.Descriptor.Flags.BlockSizeIndex()
69 w.data = size.Get()
70 w.idx = 0
71 return w.frame.Descriptor.Write(w.frame, w.src)
72}
73
74func (w *Writer) Write(buf []byte) (n int, err error) {
75 defer w.state.check(&err)
76 switch w.state.state {
77 case writeState:
78 case closedState, errorState:
79 return 0, w.state.err
80 case newState:
81 if err = w.init(); w.state.next(err) {
82 return
83 }
84 default:
85 return 0, w.state.fail()
86 }
87
88 zn := len(w.data)
89 for len(buf) > 0 {
90 if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn {
91 // Avoid a copy as there is enough data for a block.
92 if err = w.write(buf[:zn], false); err != nil {
93 return
94 }
95 n += zn
96 buf = buf[zn:]
97 continue
98 }
99 // Accumulate the data to be compressed.
100 m := copy(w.data[w.idx:], buf)
101 n += m
102 w.idx += m
103 buf = buf[m:]
104
105 if w.idx < len(w.data) {
106 // Buffer not filled.
107 return
108 }
109
110 // Buffer full.
111 if err = w.write(w.data, true); err != nil {
112 return
113 }
114 if !w.isNotConcurrent() {
115 size := w.frame.Descriptor.Flags.BlockSizeIndex()
116 w.data = size.Get()
117 }
118 w.idx = 0
119 }
120 return
121}
122
123func (w *Writer) write(data []byte, safe bool) error {
124 if w.isNotConcurrent() {
125 block := w.frame.Blocks.Block
126 err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
127 w.handler(len(block.Data))
128 return err
129 }
130 c := make(chan *lz4stream.FrameDataBlock)
131 w.frame.Blocks.Blocks <- c
132 go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
133 b := lz4stream.NewFrameDataBlock(w.frame)
134 c <- b.Compress(w.frame, data, w.level)
135 <-c
136 w.handler(len(b.Data))
137 b.Close(w.frame)
138 if safe {
139 // safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
140 lz4block.Put(data)
141 }
142 }(c, data, safe)
143
144 return nil
145}
146
147// Flush any buffered data to the underlying writer immediately.
148func (w *Writer) Flush() (err error) {
149 switch w.state.state {
150 case writeState:
151 case errorState:
152 return w.state.err
153 case newState:
154 if err = w.init(); w.state.next(err) {
155 return
156 }
157 default:
158 return nil
159 }
160
161 if w.idx > 0 {
162 // Flush pending data, disable w.data freeing as it is done later on.
163 if err = w.write(w.data[:w.idx], false); err != nil {
164 return err
165 }
166 w.idx = 0
167 }
168 return nil
169}
170
171// Close closes the Writer, flushing any unwritten data to the underlying writer
172// without closing it.
173func (w *Writer) Close() error {
174 if err := w.Flush(); err != nil {
175 return err
176 }
177 err := w.frame.CloseW(w.src, w.num)
178 // It is now safe to free the buffer.
179 if w.data != nil {
180 lz4block.Put(w.data)
181 w.data = nil
182 }
183 return err
184}
185
186// Reset clears the state of the Writer w such that it is equivalent to its
187// initial state from NewWriter, but instead writing to writer.
188// Reset keeps the previous options unless overwritten by the supplied ones.
189// No access to writer is performed.
190//
191// w.Close must be called before Reset or pending data may be dropped.
192func (w *Writer) Reset(writer io.Writer) {
193 w.frame.Reset(w.num)
194 w.state.reset()
195 w.src = writer
196}
197
198// ReadFrom efficiently reads from r and compressed into the Writer destination.
199func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
200 switch w.state.state {
201 case closedState, errorState:
202 return 0, w.state.err
203 case newState:
204 if err = w.init(); w.state.next(err) {
205 return
206 }
207 default:
208 return 0, w.state.fail()
209 }
210 defer w.state.check(&err)
211
212 size := w.frame.Descriptor.Flags.BlockSizeIndex()
213 var done bool
214 var rn int
215 data := size.Get()
216 if w.isNotConcurrent() {
217 // Keep the same buffer for the whole process.
218 defer lz4block.Put(data)
219 }
220 for !done {
221 rn, err = io.ReadFull(r, data)
222 switch err {
223 case nil:
224 case io.EOF, io.ErrUnexpectedEOF: // read may be partial
225 done = true
226 default:
227 return
228 }
229 n += int64(rn)
230 err = w.write(data[:rn], true)
231 if err != nil {
232 return
233 }
234 w.handler(rn)
235 if !done && !w.isNotConcurrent() {
236 // The buffer will be returned automatically by go routines (safe=true)
237 // so get a new one fo the next round.
238 data = size.Get()
239 }
240 }
241 return
242}