blob: ea2a19376c1b1df1a1649307ed89cf01313a95d2 [file] [log] [blame]
// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +05308 "context"
9 "encoding/binary"
khenaidood948f772021-08-11 17:49:24 -040010 "io"
11 "sync"
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +053012
13 "github.com/klauspost/compress/zstd/internal/xxhash"
khenaidood948f772021-08-11 17:49:24 -040014)
15
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, call Reset(r io.Reader) to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// Closed and set to nil by Close; DecodeAll reports ErrDecoderClosed when nil.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	syncStream struct {
		decodedFrame uint64        // bytes decoded so far in the current frame
		br           readerWrapper // wraps the input reader of the synchronous stream
		enabled      bool          // set by startSyncDecoder; nextBlock then uses nextBlockSync
		inFrame      bool          // a frame has been started and its last block not yet seen
		dstBuf       []byte        // output buffer reused by Reset's small-input DecodeAll path
	}

	// Frame decoder used for stream decoding (both sync and async paths).
	// Created lazily by Reset.
	frame *frameDec

	// Custom dictionaries, keyed by dictionary ID.
	dicts map[uint32]*dict

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
48
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded so d.current.b, d.current.err and d.current.d refer to its fields.
	decodeOutput

	// output in order to be written to stream.
	// Only used by the asynchronous stream decoder; nil otherwise.
	output chan decodeOutput

	// cancel remaining output.
	cancel context.CancelFunc

	// Running checksum of the current frame's output on the asynchronous path.
	// nextBlock resets it whenever a block starts a new frame.
	crc *xxhash.Digest

	// true when there is no asynchronous output left to drain.
	flushed bool
}
66
67var (
68 // Check the interfaces we want to support.
69 _ = io.WriterTo(&Decoder{})
70 _ = io.Reader(&Decoder{})
71)
72
73// NewReader creates a new decoder.
74// A nil Reader can be provided in which case Reset can be used to start a decode.
75//
76// A Decoder can be used in two modes:
77//
78// 1) As a stream, or
79// 2) For stateless decoding using DecodeAll.
80//
81// Only a single stream can be decoded concurrently, but the same decoder
82// can run multiple concurrent stateless decodes. It is even possible to
83// use stateless decodes while a stream is being decoded.
84//
Abhay Kumara2ae5992025-11-10 14:02:24 +000085// The Reset function can be used to initiate a new stream, which will considerably
khenaidood948f772021-08-11 17:49:24 -040086// reduce the allocations normally caused by NewReader.
87func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
88 initPredefined()
89 var d Decoder
90 d.o.setDefault()
91 for _, o := range opts {
92 err := o(&d.o)
93 if err != nil {
94 return nil, err
95 }
96 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +053097 d.current.crc = xxhash.New()
khenaidood948f772021-08-11 17:49:24 -040098 d.current.flushed = true
99
100 if r == nil {
101 d.current.err = ErrDecoderNilInput
102 }
103
104 // Transfer option dicts.
Abhay Kumara2ae5992025-11-10 14:02:24 +0000105 d.dicts = make(map[uint32]*dict, len(d.o.dicts))
khenaidood948f772021-08-11 17:49:24 -0400106 for _, dc := range d.o.dicts {
107 d.dicts[dc.id] = dc
108 }
109 d.o.dicts = nil
110
111 // Create decoders
112 d.decoders = make(chan *blockDec, d.o.concurrent)
113 for i := 0; i < d.o.concurrent; i++ {
114 dec := newBlockDec(d.o.lowMem)
115 dec.localFrame = newFrameDec(d.o)
116 d.decoders <- dec
117 }
118
119 if r == nil {
120 return &d, nil
121 }
122 return &d, d.Reset(r)
123}
124
125// Read bytes from the decompressed stream into p.
Abhay Kumara2ae5992025-11-10 14:02:24 +0000126// Returns the number of bytes read and any error that occurred.
khenaidood948f772021-08-11 17:49:24 -0400127// When the stream is done, io.EOF will be returned.
128func (d *Decoder) Read(p []byte) (int, error) {
khenaidood948f772021-08-11 17:49:24 -0400129 var n int
130 for {
131 if len(d.current.b) > 0 {
132 filled := copy(p, d.current.b)
133 p = p[filled:]
134 d.current.b = d.current.b[filled:]
135 n += filled
136 }
137 if len(p) == 0 {
138 break
139 }
140 if len(d.current.b) == 0 {
141 // We have an error and no more data
142 if d.current.err != nil {
143 break
144 }
145 if !d.nextBlock(n == 0) {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530146 return n, d.current.err
khenaidood948f772021-08-11 17:49:24 -0400147 }
148 }
149 }
150 if len(d.current.b) > 0 {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530151 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400152 println("returning", n, "still bytes left:", len(d.current.b))
153 }
154 // Only return error at end of block
155 return n, nil
156 }
157 if d.current.err != nil {
158 d.drainOutput()
159 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530160 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400161 println("returning", n, d.current.err, len(d.decoders))
162 }
163 return n, d.current.err
164}
165
// Reset will stop decoding the current stream (any in-flight asynchronous
// decode is cancelled and drained) and switch the decoder to the supplied stream.
// Note that this functionality cannot be used after Close has been called.
// Reset can be called with a nil reader to release references to the previous reader.
// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
// should be used.
//
// Depending on r and the options, one of three modes is selected:
//   - small in-memory input (see below): decoded entirely here with DecodeAll;
//   - d.o.concurrent == 1: synchronous decoding on the caller's goroutine;
//   - otherwise: an asynchronous stream decoder goroutine is started.
func (d *Decoder) Reset(r io.Reader) error {
	if d.current.err == ErrDecoderClosed {
		return d.current.err
	}

	// Stop any in-flight stream decode and reclaim its block decoders.
	d.drainOutput()

	d.syncStream.br.r = nil
	if r == nil {
		d.current.err = ErrDecoderNilInput
		if len(d.current.b) > 0 {
			d.current.b = d.current.b[:0]
		}
		d.current.flushed = true
		return nil
	}

	// If r exposes its contents (implements byter), holds fewer than
	// d.o.decodeBufsBelow bytes and limitToCap is not set, decode everything
	// now with DecodeAll instead of starting a stream decoder.
	if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
		bb2 := bb
		if debugDecoder {
			println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
		}
		b := bb2.Bytes()
		var dst []byte
		if cap(d.syncStream.dstBuf) > 0 {
			// Reuse the output buffer of a previous small-input decode.
			dst = d.syncStream.dstBuf[:0]
		}

		dst, err := d.DecodeAll(b, dst)
		if err == nil {
			// All output is already available; report end-of-stream once it is consumed.
			err = io.EOF
		}
		// Save output buffer
		d.syncStream.dstBuf = dst
		d.current.b = dst
		d.current.err = err
		d.current.flushed = true
		if debugDecoder {
			println("sync decode to", len(dst), "bytes, err:", err)
		}
		return nil
	}
	// Remove current block.
	d.stashDecoder()
	d.current.decodeOutput = decodeOutput{}
	d.current.err = nil
	d.current.flushed = false
	d.current.d = nil
	d.syncStream.dstBuf = nil

	// Ensure no-one else is still running...
	d.streamWg.Wait()
	if d.frame == nil {
		d.frame = newFrameDec(d.o)
	}

	if d.o.concurrent == 1 {
		// Single decoder: blocks are decoded on demand by Read/WriteTo via nextBlockSync.
		return d.startSyncDecoder(r)
	}

	// Asynchronous mode: decoded blocks arrive on d.current.output.
	d.current.output = make(chan decodeOutput, d.o.concurrent)
	ctx, cancel := context.WithCancel(context.Background())
	d.current.cancel = cancel
	d.streamWg.Add(1)
	go d.startStreamDecoder(ctx, r, d.current.output)

	return nil
}
240
241// drainOutput will drain the output until errEndOfStream is sent.
242func (d *Decoder) drainOutput() {
243 if d.current.cancel != nil {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530244 if debugDecoder {
245 println("cancelling current")
246 }
247 d.current.cancel()
khenaidood948f772021-08-11 17:49:24 -0400248 d.current.cancel = nil
249 }
250 if d.current.d != nil {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530251 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400252 printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
253 }
254 d.decoders <- d.current.d
255 d.current.d = nil
256 d.current.b = nil
257 }
258 if d.current.output == nil || d.current.flushed {
259 println("current already flushed")
260 return
261 }
262 for v := range d.current.output {
263 if v.d != nil {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530264 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400265 printf("re-adding decoder %p", v.d)
266 }
267 d.decoders <- v.d
268 }
khenaidood948f772021-08-11 17:49:24 -0400269 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530270 d.current.output = nil
271 d.current.flushed = true
khenaidood948f772021-08-11 17:49:24 -0400272}
273
274// WriteTo writes data to w until there's no more data to write or when an error occurs.
275// The return value n is the number of bytes written.
276// Any error encountered during the write is also returned.
277func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
khenaidood948f772021-08-11 17:49:24 -0400278 var n int64
279 for {
280 if len(d.current.b) > 0 {
281 n2, err2 := w.Write(d.current.b)
282 n += int64(n2)
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530283 if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
khenaidood948f772021-08-11 17:49:24 -0400284 d.current.err = err2
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530285 } else if n2 != len(d.current.b) {
286 d.current.err = io.ErrShortWrite
khenaidood948f772021-08-11 17:49:24 -0400287 }
288 }
289 if d.current.err != nil {
290 break
291 }
292 d.nextBlock(true)
293 }
294 err := d.current.err
295 if err != nil {
296 d.drainOutput()
297 }
298 if err == io.EOF {
299 err = nil
300 }
301 return n, err
302}
303
304// DecodeAll allows stateless decoding of a blob of bytes.
305// Output will be appended to dst, so if the destination size is known
306// you can pre-allocate the destination slice to avoid allocations.
307// DecodeAll can be used concurrently.
308// The Decoder concurrency limits will be respected.
309func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530310 if d.decoders == nil {
khenaidood948f772021-08-11 17:49:24 -0400311 return dst, ErrDecoderClosed
312 }
313
314 // Grab a block decoder and frame decoder.
315 block := <-d.decoders
316 frame := block.localFrame
Abhay Kumara2ae5992025-11-10 14:02:24 +0000317 initialSize := len(dst)
khenaidood948f772021-08-11 17:49:24 -0400318 defer func() {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530319 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400320 printf("re-adding decoder: %p", block)
321 }
322 frame.rawInput = nil
323 frame.bBuf = nil
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530324 if frame.history.decoders.br != nil {
325 frame.history.decoders.br.in = nil
Abhay Kumara2ae5992025-11-10 14:02:24 +0000326 frame.history.decoders.br.cursor = 0
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530327 }
khenaidood948f772021-08-11 17:49:24 -0400328 d.decoders <- block
329 }()
330 frame.bBuf = input
331
332 for {
333 frame.history.reset()
334 err := frame.reset(&frame.bBuf)
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530335 if err != nil {
336 if err == io.EOF {
337 if debugDecoder {
338 println("frame reset return EOF")
339 }
340 return dst, nil
khenaidood948f772021-08-11 17:49:24 -0400341 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530342 return dst, err
khenaidood948f772021-08-11 17:49:24 -0400343 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000344 if err = d.setDict(frame); err != nil {
345 return nil, err
khenaidood948f772021-08-11 17:49:24 -0400346 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530347 if frame.WindowSize > d.o.maxWindowSize {
348 if debugDecoder {
349 println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
350 }
351 return dst, ErrWindowSizeExceeded
khenaidood948f772021-08-11 17:49:24 -0400352 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530353 if frame.FrameContentSize != fcsUnknown {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000354 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
355 if debugDecoder {
356 println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
357 }
358 return dst, ErrDecoderSizeExceeded
359 }
360 if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
361 if debugDecoder {
362 println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
363 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530364 return dst, ErrDecoderSizeExceeded
365 }
khenaidood948f772021-08-11 17:49:24 -0400366 if cap(dst)-len(dst) < int(frame.FrameContentSize) {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530367 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
khenaidood948f772021-08-11 17:49:24 -0400368 copy(dst2, dst)
369 dst = dst2
370 }
371 }
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530372
Abhay Kumara2ae5992025-11-10 14:02:24 +0000373 if cap(dst) == 0 && !d.o.limitToCap {
khenaidood948f772021-08-11 17:49:24 -0400374 // Allocate len(input) * 2 by default if nothing is provided
375 // and we didn't get frame content size.
376 size := len(input) * 2
377 // Cap to 1 MB.
378 if size > 1<<20 {
379 size = 1 << 20
380 }
381 if uint64(size) > d.o.maxDecodedSize {
382 size = int(d.o.maxDecodedSize)
383 }
384 dst = make([]byte, 0, size)
385 }
386
387 dst, err = frame.runDecoder(dst, block)
388 if err != nil {
389 return dst, err
390 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000391 if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
392 return dst, ErrDecoderSizeExceeded
393 }
khenaidood948f772021-08-11 17:49:24 -0400394 if len(frame.bBuf) == 0 {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530395 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400396 println("frame dbuf empty")
397 }
398 break
399 }
400 }
401 return dst, nil
402}
403
404// nextBlock returns the next block.
405// If an error occurs d.err will be set.
406// Optionally the function can block for new output.
407// If non-blocking mode is used the returned boolean will be false
408// if no data was available without blocking.
409func (d *Decoder) nextBlock(blocking bool) (ok bool) {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530410 if d.current.err != nil {
411 // Keep error state.
412 return false
413 }
414 d.current.b = d.current.b[:0]
415
416 // SYNC:
417 if d.syncStream.enabled {
418 if !blocking {
419 return false
420 }
421 ok = d.nextBlockSync()
422 if !ok {
423 d.stashDecoder()
424 }
425 return ok
426 }
427
428 //ASYNC:
429 d.stashDecoder()
430 if blocking {
431 d.current.decodeOutput, ok = <-d.current.output
432 } else {
433 select {
434 case d.current.decodeOutput, ok = <-d.current.output:
435 default:
436 return false
437 }
438 }
439 if !ok {
440 // This should not happen, so signal error state...
441 d.current.err = io.ErrUnexpectedEOF
442 return false
443 }
444 next := d.current.decodeOutput
445 if next.d != nil && next.d.async.newHist != nil {
446 d.current.crc.Reset()
447 }
448 if debugDecoder {
449 var tmp [4]byte
450 binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
451 println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
452 }
453
Abhay Kumara2ae5992025-11-10 14:02:24 +0000454 if d.o.ignoreChecksum {
455 return true
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530456 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000457
458 if len(next.b) > 0 {
459 d.current.crc.Write(next.b)
460 }
461 if next.err == nil && next.d != nil && next.d.hasCRC {
462 got := uint32(d.current.crc.Sum64())
463 if got != next.d.checkCRC {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530464 if debugDecoder {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000465 printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530466 }
467 d.current.err = ErrCRCMismatch
468 } else {
469 if debugDecoder {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000470 printf("CRC ok %08x\n", got)
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530471 }
472 }
473 }
474
475 return true
476}
477
// nextBlockSync decodes the next block of the synchronous stream on the calling
// goroutine, using the block decoder held in d.current.d and the frame decoder
// d.frame. It keeps decoding until a block yields output, starting a new frame
// whenever the previous one has ended.
// On success d.current.b references the newly decoded bytes (a sub-slice of
// d.frame.history.b) and true is returned. On failure d.current.err is set and
// false is returned.
func (d *Decoder) nextBlockSync() (ok bool) {
	if d.current.d == nil {
		// Acquire a block decoder; it stays in d.current.d across calls.
		d.current.d = <-d.decoders
	}
	for len(d.current.b) == 0 {
		if !d.syncStream.inFrame {
			// Start of a new frame: read its header and install any dictionary.
			d.frame.history.reset()
			d.current.err = d.frame.reset(&d.syncStream.br)
			if d.current.err == nil {
				d.current.err = d.setDict(d.frame)
			}
			if d.current.err != nil {
				return false
			}
			if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
				d.current.err = ErrDecoderSizeExceeded
				return false
			}

			d.syncStream.decodedFrame = 0
			d.syncStream.inFrame = true
		}
		d.current.err = d.frame.next(d.current.d)
		if d.current.err != nil {
			return false
		}
		d.frame.history.ensureBlock()
		if debugDecoder {
			println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
		}
		// Decoded bytes are appended to the history buffer; remember where they start.
		histBefore := len(d.frame.history.b)
		d.current.err = d.current.d.decodeBuf(&d.frame.history)

		if d.current.err != nil {
			println("error after:", d.current.err)
			return false
		}
		d.current.b = d.frame.history.b[histBefore:]
		if debugDecoder {
			println("history after:", len(d.frame.history.b))
		}

		// Check frame size (before CRC)
		d.syncStream.decodedFrame += uint64(len(d.current.b))
		if d.syncStream.decodedFrame > d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeExceeded
			return false
		}

		// Check FCS: on the last block the total must match a declared content size exactly.
		if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeMismatch
			return false
		}

		// Update/Check CRC
		if d.frame.HasCheckSum {
			if !d.o.ignoreChecksum {
				d.frame.crc.Write(d.current.b)
			}
			if d.current.d.Last {
				// The checksum follows the last block; verify it, or just consume
				// it from the input when checksums are ignored.
				if !d.o.ignoreChecksum {
					d.current.err = d.frame.checkCRC()
				} else {
					d.current.err = d.frame.consumeCRC()
				}
				if d.current.err != nil {
					println("CRC error:", d.current.err)
					return false
				}
			}
		}
		// After the last block, the next iteration/call starts a new frame.
		d.syncStream.inFrame = !d.current.d.Last
	}
	return true
}
560
561func (d *Decoder) stashDecoder() {
khenaidood948f772021-08-11 17:49:24 -0400562 if d.current.d != nil {
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530563 if debugDecoder {
khenaidood948f772021-08-11 17:49:24 -0400564 printf("re-adding current decoder %p", d.current.d)
565 }
566 d.decoders <- d.current.d
567 d.current.d = nil
568 }
khenaidood948f772021-08-11 17:49:24 -0400569}
570
571// Close will release all resources.
572// It is NOT possible to reuse the decoder after this.
573func (d *Decoder) Close() {
574 if d.current.err == ErrDecoderClosed {
575 return
576 }
577 d.drainOutput()
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530578 if d.current.cancel != nil {
579 d.current.cancel()
khenaidood948f772021-08-11 17:49:24 -0400580 d.streamWg.Wait()
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530581 d.current.cancel = nil
khenaidood948f772021-08-11 17:49:24 -0400582 }
583 if d.decoders != nil {
584 close(d.decoders)
585 for dec := range d.decoders {
586 dec.Close()
587 }
588 d.decoders = nil
589 }
590 if d.current.d != nil {
591 d.current.d.Close()
592 d.current.d = nil
593 }
594 d.current.err = ErrDecoderClosed
595}
596
597// IOReadCloser returns the decoder as an io.ReadCloser for convenience.
598// Any changes to the decoder will be reflected, so the returned ReadCloser
599// can be reused along with the decoder.
600// io.WriterTo is also supported by the returned ReadCloser.
601func (d *Decoder) IOReadCloser() io.ReadCloser {
602 return closeWrapper{d: d}
603}
604
// closeWrapper adapts a *Decoder to io.ReadCloser (and io.WriterTo).
// Its Close method calls Decoder.Close and always returns nil.
type closeWrapper struct {
	d *Decoder
}
609
610// WriteTo forwards WriteTo calls to the decoder.
611func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
612 return c.d.WriteTo(w)
613}
614
615// Read forwards read calls to the decoder.
616func (c closeWrapper) Read(p []byte) (n int, err error) {
617 return c.d.Read(p)
618}
619
620// Close closes the decoder.
621func (c closeWrapper) Close() error {
622 c.d.Close()
623 return nil
624}
625
// decodeOutput is one unit of decoded stream output, as delivered to the
// reader side (Read/WriteTo via nextBlock).
type decodeOutput struct {
	// Block decoder that produced b; handed back to Decoder.decoders once consumed.
	d *blockDec
	// Decoded bytes.
	b []byte
	// Error encountered for this block, if any.
	err error
}
631
Akash Reddy Kankanalacf045372025-06-10 14:11:24 +0530632func (d *Decoder) startSyncDecoder(r io.Reader) error {
633 d.frame.history.reset()
634 d.syncStream.br = readerWrapper{r: r}
635 d.syncStream.inFrame = false
636 d.syncStream.enabled = true
637 d.syncStream.decodedFrame = 0
638 return nil
khenaidood948f772021-08-11 17:49:24 -0400639}
640
// startStreamDecoder decodes the stream read from r asynchronously and sends
// each decoded block, in order, on output. Reset starts it as a goroutine after
// calling d.streamWg.Add(1); it calls d.streamWg.Done when it returns.
//
// ASYNC: the work is split into a three-stage pipeline connected by channels:
//
//   - Stage 0 (this goroutine): read frame headers and blocks from r, decode each
//     compressed block's literals, and send the block on seqDecode.
//   - Stage 1 (spawned goroutine, debug label "Async 1"): decode sequences and
//     send the block on seqExecute.
//   - Stage 2 (spawned goroutine, debug label "Async 2"): execute sequences,
//     check the frame content size, and send the result on output.
//
// Every block decoder taken from d.decoders travels through all stages and is
// delivered on output, so the reader side can return it to d.decoders. Once a
// stage sees an error, later blocks are forwarded without further processing.
// Cancelling ctx stops stage 0 from taking more decoders. Stage 2 closes output
// after seqExecute is closed; this goroutine waits for stage 2 before restoring
// d.frame.history.b and returning.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
	defer d.streamWg.Done()
	br := readerWrapper{r: r}

	var seqDecode = make(chan *blockDec, d.o.concurrent)
	var seqExecute = make(chan *blockDec, d.o.concurrent)

	// Async 1 (stage 1): Decode sequences...
	go func() {
		var hist history
		var hasErr bool

		for block := range seqDecode {
			if hasErr {
				// Forward without decoding so the decoder still reaches output.
				if block != nil {
					seqExecute <- block
				}
				continue
			}
			if block.async.newHist != nil {
				// First block of a frame: initialize this stage's history from the snapshot.
				if debugDecoder {
					println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
				}
				hist.reset()
				hist.decoders = block.async.newHist.decoders
				hist.recentOffsets = block.async.newHist.recentOffsets
				hist.windowSize = block.async.newHist.windowSize
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				// Nothing to do here for errored, raw or RLE blocks.
				hasErr = block.err != nil
				seqExecute <- block
				continue
			}

			hist.decoders.literals = block.async.literals
			block.err = block.prepareSequences(block.async.seqData, &hist)
			if debugDecoder && block.err != nil {
				println("prepareSequences returned:", block.err)
			}
			hasErr = block.err != nil
			if block.err == nil {
				block.err = block.decodeSequences(&hist)
				if debugDecoder && block.err != nil {
					println("decodeSequences returned:", block.err)
				}
				hasErr = block.err != nil
				// block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
				block.async.seqSize = hist.decoders.seqSize
			}
			seqExecute <- block
		}
		close(seqExecute)
		hist.reset()
	}()

	var wg sync.WaitGroup
	wg.Add(1)

	// Async 2 (stage 2): Execute sequences...
	// The frame decoder's history buffer is lent to this stage and handed back
	// through frameHistCache when it finishes.
	frameHistCache := d.frame.history.b
	go func() {
		var hist history
		var decodedFrame uint64
		var fcs uint64
		var hasErr bool
		for block := range seqExecute {
			out := decodeOutput{err: block.err, d: block}
			if block.err != nil || hasErr {
				hasErr = true
				output <- out
				continue
			}
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 2: new history")
				}
				hist.reset()
				hist.windowSize = block.async.newHist.windowSize
				hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}

				if cap(hist.b) < hist.allocFrameBuffer {
					if cap(frameHistCache) >= hist.allocFrameBuffer {
						hist.b = frameHistCache
					} else {
						hist.b = make([]byte, 0, hist.allocFrameBuffer)
						println("Alloc history sized", hist.allocFrameBuffer)
					}
				}
				hist.b = hist.b[:0]
				fcs = block.async.fcs
				decodedFrame = 0
			}
			do := decodeOutput{err: block.err, d: block}
			switch block.Type {
			case blockTypeRLE:
				if debugDecoder {
					println("add rle block length:", block.RLESize)
				}

				if cap(block.dst) < int(block.RLESize) {
					if block.lowMem {
						block.dst = make([]byte, block.RLESize)
					} else {
						block.dst = make([]byte, maxCompressedBlockSize)
					}
				}
				block.dst = block.dst[:block.RLESize]
				// Expand the single RLE byte.
				v := block.data[0]
				for i := range block.dst {
					block.dst[i] = v
				}
				hist.append(block.dst)
				do.b = block.dst
			case blockTypeRaw:
				if debugDecoder {
					println("add raw block length:", len(block.data))
				}
				hist.append(block.data)
				do.b = block.data
			case blockTypeCompressed:
				if debugDecoder {
					println("execute with history length:", len(hist.b), "window:", hist.windowSize)
				}
				hist.decoders.seqSize = block.async.seqSize
				hist.decoders.literals = block.async.literals
				do.err = block.executeSequences(&hist)
				hasErr = do.err != nil
				if debugDecoder && hasErr {
					println("executeSequences returned:", do.err)
				}
				do.b = block.dst
			}
			if !hasErr {
				// Enforce the frame content size declared in the frame header.
				decodedFrame += uint64(len(do.b))
				if decodedFrame > fcs {
					println("fcs exceeded", block.Last, fcs, decodedFrame)
					do.err = ErrFrameSizeExceeded
					hasErr = true
				} else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
					do.err = ErrFrameSizeMismatch
					hasErr = true
				} else {
					if debugDecoder {
						println("fcs ok", block.Last, fcs, decodedFrame)
					}
				}
			}
			output <- do
		}
		close(output)
		frameHistCache = hist.b
		wg.Done()
		if debugDecoder {
			println("decoder goroutines finished")
		}
		hist.reset()
	}()

	// Stage 0: read frames and blocks, decode literals.
	var hist history
decodeStream:
	for {
		var hasErr bool
		hist.reset()
		// decodeBlock decodes the literals of a compressed block and passes
		// every block on to stage 1.
		decodeBlock := func(block *blockDec) {
			if hasErr {
				if block != nil {
					seqDecode <- block
				}
				return
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqDecode <- block
				return
			}

			remain, err := block.decodeLiterals(block.data, &hist)
			block.err = err
			hasErr = block.err != nil
			if err == nil {
				block.async.literals = hist.decoders.literals
				block.async.seqData = remain
			} else if debugDecoder {
				println("decodeLiterals error:", err)
			}
			seqDecode <- block
		}
		frame := d.frame
		if debugDecoder {
			println("New frame...")
		}
		var historySent bool
		frame.history.reset()
		err := frame.reset(&br)
		if debugDecoder && err != nil {
			println("Frame decoder returned", err)
		}
		if err == nil {
			err = d.setDict(frame)
		}
		if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
			}

			err = ErrDecoderSizeExceeded
		}
		if err != nil {
			// Deliver the error through the pipeline on a decoder, unless cancelled.
			select {
			case <-ctx.Done():
			case dec := <-d.decoders:
				dec.sendErr(err)
				decodeBlock(dec)
			}
			break decodeStream
		}

		// Go through all blocks of the frame.
		for {
			var dec *blockDec
			select {
			case <-ctx.Done():
				break decodeStream
			case dec = <-d.decoders:
				// Once we have a decoder, we MUST return it.
			}
			err := frame.next(dec)
			if !historySent {
				// First block of the frame carries a copy of the frame history so
				// the later stages can initialize their own history state.
				h := frame.history
				if debugDecoder {
					println("Alloc History:", h.allocFrameBuffer)
				}
				hist.reset()
				if h.dict != nil {
					hist.setDict(h.dict)
				}
				dec.async.newHist = &h
				dec.async.fcs = frame.FrameContentSize
				historySent = true
			} else {
				dec.async.newHist = nil
			}
			if debugDecoder && err != nil {
				println("next block returned error:", err)
			}
			dec.err = err
			dec.hasCRC = false
			if dec.Last && frame.HasCheckSum && err == nil {
				// The 4-byte frame checksum follows the last block; nextBlock verifies it.
				crc, err := frame.rawInput.readSmall(4)
				if len(crc) < 4 {
					if err == nil {
						err = io.ErrUnexpectedEOF
					}
					println("CRC missing?", err)
					dec.err = err
				} else {
					dec.checkCRC = binary.LittleEndian.Uint32(crc)
					dec.hasCRC = true
					if debugDecoder {
						printf("found crc to check: %08x\n", dec.checkCRC)
					}
				}
			}
			err = dec.err
			last := dec.Last
			decodeBlock(dec)
			if err != nil {
				break decodeStream
			}
			if last {
				break
			}
		}
	}
	close(seqDecode)
	wg.Wait()
	hist.reset()
	d.frame.history.b = frameHistCache
}
Abhay Kumara2ae5992025-11-10 14:02:24 +0000933
934func (d *Decoder) setDict(frame *frameDec) (err error) {
935 dict, ok := d.dicts[frame.DictionaryID]
936 if ok {
937 if debugDecoder {
938 println("setting dict", frame.DictionaryID)
939 }
940 frame.history.setDict(dict)
941 } else if frame.DictionaryID != 0 {
942 // A zero or missing dictionary id is ambiguous:
943 // either dictionary zero, or no dictionary. In particular,
944 // zstd --patch-from uses this id for the source file,
945 // so only return an error if the dictionary id is not zero.
946 err = ErrUnknownDictionary
947 }
948 return err
949}