blob: 8df0dc76d0072f6deca6dec4db9b8d859a9c1c1b [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package lz4
2
3import (
4 "errors"
5 "io"
6
7 "github.com/pierrec/lz4/v4/internal/lz4block"
8 "github.com/pierrec/lz4/v4/internal/lz4errors"
9 "github.com/pierrec/lz4/v4/internal/lz4stream"
10)
11
// crState identifies the phase a CompressingReader is in. Read dispatches on
// it, and Apply only accepts options while the reader is in the initial state.
type crState int

const (
	// crStateInitial is the zero value: nothing has been emitted yet and the
	// next Read runs init before it starts reading from the source.
	crStateInitial crState = iota
	// crStateReading means blocks are being read from the source, compressed
	// and written to the output.
	crStateReading
	// crStateFlushing means the source is exhausted and the frame has been
	// closed; Read only hands out output that is still buffered.
	crStateFlushing
	// crStateDone is terminal: it is entered after io.EOF or any other error
	// has been returned, and left only through Reset.
	crStateDone
)
20
21type CompressingReader struct {
22 state crState
23 src io.ReadCloser // source reader
24 level lz4block.CompressionLevel // how hard to try
25 frame *lz4stream.Frame // frame being built
26 in []byte
27 out ovWriter
28 handler func(int)
29}
30
31// NewCompressingReader creates a reader which reads compressed data from
32// raw stream. This makes it a logical opposite of a normal lz4.Reader.
33// We require an io.ReadCloser as an underlying source for compatibility
34// with Go's http.Request.
35func NewCompressingReader(src io.ReadCloser) *CompressingReader {
36 zrd := &CompressingReader {
37 frame: lz4stream.NewFrame(),
38 }
39
40 _ = zrd.Apply(DefaultBlockSizeOption, DefaultChecksumOption, defaultOnBlockDone)
41 zrd.Reset(src)
42
43 return zrd
44}
45
46// Source exposes the underlying source stream for introspection and control.
47func (zrd *CompressingReader) Source() io.ReadCloser {
48 return zrd.src
49}
50
51// Close simply invokes the underlying stream Close method. This method is
52// provided for the benefit of Go http client/server, which relies on Close
53// for goroutine termination.
54func (zrd *CompressingReader) Close() error {
55 return zrd.src.Close()
56}
57
58// Apply applies useful options to the lz4 encoder.
59func (zrd *CompressingReader) Apply(options ...Option) (err error) {
60 if zrd.state != crStateInitial {
61 return lz4errors.ErrOptionClosedOrError
62 }
63
64 zrd.Reset(zrd.src)
65
66 for _, o := range options {
67 if err = o(zrd); err != nil {
68 return
69 }
70 }
71 return
72}
73
74func (*CompressingReader) private() {}
75
76func (zrd *CompressingReader) init() error {
77 zrd.frame.InitW(&zrd.out, 1, false)
78 size := zrd.frame.Descriptor.Flags.BlockSizeIndex()
79 zrd.in = size.Get()
80 return zrd.frame.Descriptor.Write(zrd.frame, &zrd.out)
81}
82
83// Read allows reading of lz4 compressed data
84func (zrd *CompressingReader) Read(p []byte) (n int, err error) {
85 defer func() {
86 if err != nil {
87 zrd.state = crStateDone
88 }
89 }()
90
91 if !zrd.out.reset(p) {
92 return len(p), nil
93 }
94
95 switch zrd.state {
96 case crStateInitial:
97 err = zrd.init()
98 if err != nil {
99 return
100 }
101 zrd.state = crStateReading
102 case crStateDone:
103 return 0, errors.New("This reader is done")
104 case crStateFlushing:
105 if zrd.out.dataPos > 0 {
106 n = zrd.out.dataPos
107 zrd.out.data = nil
108 zrd.out.dataPos = 0
109 return
110 } else {
111 zrd.state = crStateDone
112 return 0, io.EOF
113 }
114 }
115
116 for zrd.state == crStateReading {
117 block := zrd.frame.Blocks.Block
118
119 var rCount int
120 rCount, err = io.ReadFull(zrd.src, zrd.in)
121 switch err {
122 case nil:
123 err = block.Compress(
124 zrd.frame, zrd.in[ : rCount], zrd.level,
125 ).Write(zrd.frame, &zrd.out)
126 zrd.handler(len(block.Data))
127 if err != nil {
128 return
129 }
130
131 if zrd.out.dataPos == len(zrd.out.data) {
132 n = zrd.out.dataPos
133 zrd.out.dataPos = 0
134 zrd.out.data = nil
135 return
136 }
137 case io.EOF, io.ErrUnexpectedEOF: // read may be partial
138 if rCount > 0 {
139 err = block.Compress(
140 zrd.frame, zrd.in[ : rCount], zrd.level,
141 ).Write(zrd.frame, &zrd.out)
142 zrd.handler(len(block.Data))
143 if err != nil {
144 return
145 }
146 }
147
148 err = zrd.frame.CloseW(&zrd.out, 1)
149 if err != nil {
150 return
151 }
152 zrd.state = crStateFlushing
153
154 n = zrd.out.dataPos
155 zrd.out.dataPos = 0
156 zrd.out.data = nil
157 return
158 default:
159 return
160 }
161 }
162
163 err = lz4errors.ErrInternalUnhandledState
164 return
165}
166
167// Reset makes the stream usable again; mostly handy to reuse lz4 encoder
168// instances.
169func (zrd *CompressingReader) Reset(src io.ReadCloser) {
170 zrd.frame.Reset(1)
171 zrd.state = crStateInitial
172 zrd.src = src
173 zrd.out.clear()
174}
175
// ovWriter is a writer with a fixed primary buffer and a growable overflow.
//
// data is the primary buffer (the slice most recently passed to reset) and
// dataPos the number of bytes already stored in it. Bytes that do not fit in
// data are appended to ov; ovPos is how much of ov has already been handed out
// by reset.
type ovWriter struct {
	data    []byte
	ov      []byte
	dataPos int
	ovPos   int
}

// Write stores as much of p as fits in the primary buffer and appends the rest
// to the overflow. It never fails and always reports len(p) bytes written.
func (wr *ovWriter) Write(p []byte) (n int, err error) {
	stored := copy(wr.data[wr.dataPos:], p)
	wr.dataPos += stored

	if spill := p[stored:]; len(spill) > 0 {
		wr.ov = append(wr.ov, spill...)
	}

	return len(p), nil
}

// reset installs out as the new primary buffer, first moving pending overflow
// into it.
//
// It returns false when the pending overflow is at least as large as out: out
// is then completely filled from the overflow and is not installed. Otherwise
// it returns true, with dataPos set to the number of overflow bytes that were
// copied to the front of out. When there was no overflow to account for,
// dataPos is left untouched.
func (wr *ovWriter) reset(out []byte) bool {
	ovRem := len(wr.ov) - wr.ovPos

	if ovRem >= len(out) {
		wr.ovPos += copy(out, wr.ov[wr.ovPos:])
		return false
	}

	// Either some overflow is left (and fits in out), or it was fully handed
	// out by earlier calls; in both cases the overflow ends up empty.
	if ovRem > 0 || wr.ovPos > 0 {
		wr.dataPos = copy(out, wr.ov[wr.ovPos:])
		wr.ov = wr.ov[:0]
		wr.ovPos = 0
	}

	wr.data = out
	return true
}

// clear drops the primary buffer and empties the overflow while keeping the
// overflow's capacity for reuse.
func (wr *ovWriter) clear() {
	wr.data, wr.dataPos = nil, 0
	wr.ov, wr.ovPos = wr.ov[:0], 0
}