blob: 275daad7cb98ba470405bcecb35b4b2ca25603e0 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package lz4
2
3import (
4 "bytes"
5 "io"
6
7 "github.com/pierrec/lz4/v4/internal/lz4block"
8 "github.com/pierrec/lz4/v4/internal/lz4errors"
9 "github.com/pierrec/lz4/v4/internal/lz4stream"
10)
11
12var readerStates = []aState{
13 noState: newState,
14 errorState: newState,
15 newState: readState,
16 readState: closedState,
17 closedState: newState,
18}
19
20// NewReader returns a new LZ4 frame decoder.
21func NewReader(r io.Reader) *Reader {
22 return newReader(r, false)
23}
24
25func newReader(r io.Reader, legacy bool) *Reader {
26 zr := &Reader{frame: lz4stream.NewFrame()}
27 zr.state.init(readerStates)
28 _ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
29 zr.Reset(r)
30 return zr
31}
32
// Reader allows reading an LZ4 stream.
//
// Create one with NewReader; Reset rebinds it to a new source so it can be
// reused. Frame headers are parsed lazily on the first Read or WriteTo.
type Reader struct {
	// state tracks the reader lifecycle (newState, readState, closedState,
	// errorState) and the error recorded on failure.
	state _State
	src   io.Reader // source reader of compressed data
	// num is the concurrency level. When it is 1, blocks are uncompressed
	// synchronously by read; init forces it to 1 for frames whose blocks
	// are not independent.
	num   int
	frame *lz4stream.Frame // frame being read
	// data holds the current uncompressed block that Read has not fully
	// returned yet. In non-concurrent mode it comes from the lz4block pool
	// (see init); in concurrent mode it is the last buffer received from reads.
	data []byte
	// reads delivers uncompressed blocks in concurrent mode; an empty slice
	// means no more data or an error (see r.frame.Blocks.ErrorR).
	reads chan []byte
	// idx is the offset in data of the next byte Read will return; it is
	// reset to 0 once data has been fully consumed.
	idx int
	// handler is invoked with the number of bytes produced at each step of
	// Read and WriteTo; presumably set through the block-done option.
	handler func(int)
	// cum is the cumulative count of uncompressed bytes in the current frame,
	// passed to the block reader; reset by init.
	cum uint32
	// dict holds recently uncompressed data, used as the dictionary when the
	// frame's blocks depend on each other; maintained by read.
	dict []byte
}
46
// private is an unexported marker method. NOTE(review): presumably it limits
// which types can satisfy an interface elsewhere in the package (e.g. the
// target type of Option) — confirm.
func (*Reader) private() {}
48
49func (r *Reader) Apply(options ...Option) (err error) {
50 defer r.state.check(&err)
51 switch r.state.state {
52 case newState:
53 case errorState:
54 return r.state.err
55 default:
56 return lz4errors.ErrOptionClosedOrError
57 }
58 for _, o := range options {
59 if err = o(r); err != nil {
60 return
61 }
62 }
63 return
64}
65
66// Size returns the size of the underlying uncompressed data, if set in the stream.
67func (r *Reader) Size() int {
68 switch r.state.state {
69 case readState, closedState:
70 if r.frame.Descriptor.Flags.Size() {
71 return int(r.frame.Descriptor.ContentSize)
72 }
73 }
74 return 0
75}
76
77func (r *Reader) isNotConcurrent() bool {
78 return r.num == 1
79}
80
81func (r *Reader) init() error {
82 err := r.frame.ParseHeaders(r.src)
83 if err != nil {
84 return err
85 }
86 if !r.frame.Descriptor.Flags.BlockIndependence() {
87 // We can't decompress dependent blocks concurrently.
88 // Instead of throwing an error to the user, silently drop concurrency
89 r.num = 1
90 }
91 data, err := r.frame.InitR(r.src, r.num)
92 if err != nil {
93 return err
94 }
95 r.reads = data
96 r.idx = 0
97 size := r.frame.Descriptor.Flags.BlockSizeIndex()
98 r.data = size.Get()
99 r.cum = 0
100 return nil
101}
102
// Read implements io.Reader: it fills buf with uncompressed data from the
// underlying LZ4 frame and returns the number of bytes written to buf.
//
// The frame headers are parsed on the first call (see init). When the end of
// the data is reached the frame is closed via r.frame.CloseR and io.EOF (or
// CloseR's error, if any) is returned along with the bytes already copied.
// A closed or failed Reader returns its recorded error.
// NOTE(review): the deferred r.state.check presumably records err in the
// state machine — confirm against _State.
func (r *Reader) Read(buf []byte) (n int, err error) {
	defer r.state.check(&err)
	switch r.state.state {
	case readState:
	case closedState, errorState:
		return 0, r.state.err
	case newState:
		// First initialization.
		if err = r.init(); r.state.next(err) {
			return
		}
	default:
		return 0, r.state.fail()
	}
	for len(buf) > 0 {
		var bn int
		if r.idx == 0 {
			// No buffered data left: obtain the next block.
			if r.isNotConcurrent() {
				// bn > 0: the block was uncompressed straight into buf.
				// bn == 0: the block's output, if any, is in r.data.
				bn, err = r.read(buf)
			} else {
				// Return the previous block buffer to the pool and take the
				// next uncompressed block from r.reads.
				lz4block.Put(r.data)
				r.data = <-r.reads
				if len(r.data) == 0 {
					// No uncompressed data: something went wrong or we are done.
					err = r.frame.Blocks.ErrorR()
				}
			}
			switch err {
			case nil:
			case io.EOF:
				// End of data: close the frame (its error, if any, replaces
				// io.EOF) and release the block buffer.
				if er := r.frame.CloseR(r.src); er != nil {
					err = er
				}
				lz4block.Put(r.data)
				r.data = nil
				return
			default:
				return
			}
		}
		if bn == 0 {
			// Fill buf with buffered data.
			bn = copy(buf, r.data[r.idx:])
			r.idx += bn
			if r.idx == len(r.data) {
				// All data read, get ready for the next Read.
				r.idx = 0
			}
		}
		buf = buf[bn:]
		n += bn
		r.handler(bn)
	}
	return
}
158
159// read uncompresses the next block as follow:
160// - if buf has enough room, the block is uncompressed into it directly
161// and the lenght of used space is returned
162// - else, the uncompress data is stored in r.data and 0 is returned
163func (r *Reader) read(buf []byte) (int, error) {
164 block := r.frame.Blocks.Block
165 _, err := block.Read(r.frame, r.src, r.cum)
166 if err != nil {
167 return 0, err
168 }
169 var direct bool
170 dst := r.data[:cap(r.data)]
171 if len(buf) >= len(dst) {
172 // Uncompress directly into buf.
173 direct = true
174 dst = buf
175 }
176 dst, err = block.Uncompress(r.frame, dst, r.dict, true)
177 if err != nil {
178 return 0, err
179 }
180 if !r.frame.Descriptor.Flags.BlockIndependence() {
181 if len(r.dict)+len(dst) > 128*1024 {
182 preserveSize := 64*1024 - len(dst)
183 if preserveSize < 0 {
184 preserveSize = 0
185 }
186 r.dict = r.dict[len(r.dict)-preserveSize:]
187 }
188 r.dict = append(r.dict, dst...)
189 }
190 r.cum += uint32(len(dst))
191 if direct {
192 return len(dst), nil
193 }
194 r.data = dst
195 return 0, nil
196}
197
198// Reset clears the state of the Reader r such that it is equivalent to its
199// initial state from NewReader, but instead reading from reader.
200// No access to reader is performed.
201func (r *Reader) Reset(reader io.Reader) {
202 if r.data != nil {
203 lz4block.Put(r.data)
204 r.data = nil
205 }
206 r.frame.Reset(r.num)
207 r.state.reset()
208 r.src = reader
209 r.reads = nil
210}
211
212// WriteTo efficiently uncompresses the data from the Reader underlying source to w.
213func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
214 switch r.state.state {
215 case closedState, errorState:
216 return 0, r.state.err
217 case newState:
218 if err = r.init(); r.state.next(err) {
219 return
220 }
221 default:
222 return 0, r.state.fail()
223 }
224 defer r.state.nextd(&err)
225
226 var data []byte
227 if r.isNotConcurrent() {
228 size := r.frame.Descriptor.Flags.BlockSizeIndex()
229 data = size.Get()
230 defer lz4block.Put(data)
231 }
232 for {
233 var bn int
234 var dst []byte
235 if r.isNotConcurrent() {
236 bn, err = r.read(data)
237 dst = data[:bn]
238 } else {
239 lz4block.Put(dst)
240 dst = <-r.reads
241 bn = len(dst)
242 if bn == 0 {
243 // No uncompressed data: something went wrong or we are done.
244 err = r.frame.Blocks.ErrorR()
245 }
246 }
247 switch err {
248 case nil:
249 case io.EOF:
250 err = r.frame.CloseR(r.src)
251 return
252 default:
253 return
254 }
255 r.handler(bn)
256 bn, err = w.Write(dst)
257 n += int64(bn)
258 if err != nil {
259 return
260 }
261 }
262}
263
264// ValidFrameHeader returns a bool indicating if the given bytes slice matches a LZ4 header.
265func ValidFrameHeader(in []byte) (bool, error) {
266 f := lz4stream.NewFrame()
267 err := f.ParseHeaders(bytes.NewReader(in))
268 if err == nil {
269 return true, nil
270 }
271 if err == lz4errors.ErrInvalidFrame {
272 return false, nil
273 }
274 return false, err
275}