blob: 084fb19c6d15c7b474e2ad37a663611bc79d1891 [file] [log] [blame]
/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package mem
20
21import (
bseeniva0b9cbcb2026-02-12 19:11:11 +053022 "fmt"
Abhay Kumara61c5222025-11-10 07:32:50 +000023 "io"
24)
25
// readAllBufSize is the buffer size ReadAll requests from the pool for each
// chunk when the source is not an io.WriterTo. 32 KiB is what io.Copy uses.
const readAllBufSize = 32 * 1024
30
31// BufferSlice offers a means to represent data that spans one or more Buffer
32// instances. A BufferSlice is meant to be immutable after creation, and methods
33// like Ref create and return copies of the slice. This is why all methods have
34// value receivers rather than pointer receivers.
35//
36// Note that any of the methods that read the underlying buffers such as Ref,
37// Len or CopyTo etc., will panic if any underlying buffers have already been
38// freed. It is recommended to not directly interact with any of the underlying
39// buffers directly, rather such interactions should be mediated through the
40// various methods on this type.
41//
42// By convention, any APIs that return (mem.BufferSlice, error) should reduce
43// the burden on the caller by never returning a mem.BufferSlice that needs to
44// be freed if the error is non-nil, unless explicitly stated.
45type BufferSlice []Buffer
46
47// Len returns the sum of the length of all the Buffers in this slice.
48//
49// # Warning
50//
51// Invoking the built-in len on a BufferSlice will return the number of buffers
52// in the slice, and *not* the value returned by this function.
53func (s BufferSlice) Len() int {
54 var length int
55 for _, b := range s {
56 length += b.Len()
57 }
58 return length
59}
60
61// Ref invokes Ref on each buffer in the slice.
62func (s BufferSlice) Ref() {
63 for _, b := range s {
64 b.Ref()
65 }
66}
67
68// Free invokes Buffer.Free() on each Buffer in the slice.
69func (s BufferSlice) Free() {
70 for _, b := range s {
71 b.Free()
72 }
73}
74
75// CopyTo copies each of the underlying Buffer's data into the given buffer,
76// returning the number of bytes copied. Has the same semantics as the copy
77// builtin in that it will copy as many bytes as it can, stopping when either dst
78// is full or s runs out of data, returning the minimum of s.Len() and len(dst).
79func (s BufferSlice) CopyTo(dst []byte) int {
80 off := 0
81 for _, b := range s {
82 off += copy(dst[off:], b.ReadOnlyData())
83 }
84 return off
85}
86
87// Materialize concatenates all the underlying Buffer's data into a single
88// contiguous buffer using CopyTo.
89func (s BufferSlice) Materialize() []byte {
90 l := s.Len()
91 if l == 0 {
92 return nil
93 }
94 out := make([]byte, l)
95 s.CopyTo(out)
96 return out
97}
98
99// MaterializeToBuffer functions like Materialize except that it writes the data
100// to a single Buffer pulled from the given BufferPool.
101//
102// As a special case, if the input BufferSlice only actually has one Buffer, this
103// function simply increases the refcount before returning said Buffer. Freeing this
104// buffer won't release it until the BufferSlice is itself released.
105func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {
106 if len(s) == 1 {
107 s[0].Ref()
108 return s[0]
109 }
110 sLen := s.Len()
111 if sLen == 0 {
112 return emptyBuffer{}
113 }
114 buf := pool.Get(sLen)
115 s.CopyTo(*buf)
116 return NewBuffer(buf, pool)
117}
118
119// Reader returns a new Reader for the input slice after taking references to
120// each underlying buffer.
bseeniva0b9cbcb2026-02-12 19:11:11 +0530121func (s BufferSlice) Reader() *Reader {
Abhay Kumara61c5222025-11-10 07:32:50 +0000122 s.Ref()
bseeniva0b9cbcb2026-02-12 19:11:11 +0530123 return &Reader{
Abhay Kumara61c5222025-11-10 07:32:50 +0000124 data: s,
125 len: s.Len(),
126 }
127}
128
129// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
bseeniva0b9cbcb2026-02-12 19:11:11 +0530130// with other systems.
131//
Abhay Kumara61c5222025-11-10 07:32:50 +0000132// Buffers will be freed as they are read.
bseeniva0b9cbcb2026-02-12 19:11:11 +0530133//
134// A Reader can be constructed from a BufferSlice; alternatively the zero value
135// of a Reader may be used after calling Reset on it.
136type Reader struct {
Abhay Kumara61c5222025-11-10 07:32:50 +0000137 data BufferSlice
138 len int
139 // The index into data[0].ReadOnlyData().
140 bufferIdx int
141}
142
bseeniva0b9cbcb2026-02-12 19:11:11 +0530143// Remaining returns the number of unread bytes remaining in the slice.
144func (r *Reader) Remaining() int {
Abhay Kumara61c5222025-11-10 07:32:50 +0000145 return r.len
146}
147
bseeniva0b9cbcb2026-02-12 19:11:11 +0530148// Reset frees the currently held buffer slice and starts reading from the
149// provided slice. This allows reusing the reader object.
150func (r *Reader) Reset(s BufferSlice) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000151 r.data.Free()
152 s.Ref()
153 r.data = s
154 r.len = s.Len()
155 r.bufferIdx = 0
156}
157
bseeniva0b9cbcb2026-02-12 19:11:11 +0530158// Close frees the underlying BufferSlice and never returns an error. Subsequent
159// calls to Read will return (0, io.EOF).
160func (r *Reader) Close() error {
Abhay Kumara61c5222025-11-10 07:32:50 +0000161 r.data.Free()
162 r.data = nil
163 r.len = 0
164 return nil
165}
166
bseeniva0b9cbcb2026-02-12 19:11:11 +0530167func (r *Reader) freeFirstBufferIfEmpty() bool {
Abhay Kumara61c5222025-11-10 07:32:50 +0000168 if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
169 return false
170 }
171
172 r.data[0].Free()
173 r.data = r.data[1:]
174 r.bufferIdx = 0
175 return true
176}
177
bseeniva0b9cbcb2026-02-12 19:11:11 +0530178func (r *Reader) Read(buf []byte) (n int, _ error) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000179 if r.len == 0 {
180 return 0, io.EOF
181 }
182
183 for len(buf) != 0 && r.len != 0 {
184 // Copy as much as possible from the first Buffer in the slice into the
185 // given byte slice.
186 data := r.data[0].ReadOnlyData()
187 copied := copy(buf, data[r.bufferIdx:])
188 r.len -= copied // Reduce len by the number of bytes copied.
189 r.bufferIdx += copied // Increment the buffer index.
190 n += copied // Increment the total number of bytes read.
191 buf = buf[copied:] // Shrink the given byte slice.
192
193 // If we have copied all the data from the first Buffer, free it and advance to
194 // the next in the slice.
195 r.freeFirstBufferIfEmpty()
196 }
197
198 return n, nil
199}
200
bseeniva0b9cbcb2026-02-12 19:11:11 +0530201// ReadByte reads a single byte.
202func (r *Reader) ReadByte() (byte, error) {
Abhay Kumara61c5222025-11-10 07:32:50 +0000203 if r.len == 0 {
204 return 0, io.EOF
205 }
206
207 // There may be any number of empty buffers in the slice, clear them all until a
208 // non-empty buffer is reached. This is guaranteed to exit since r.len is not 0.
209 for r.freeFirstBufferIfEmpty() {
210 }
211
212 b := r.data[0].ReadOnlyData()[r.bufferIdx]
213 r.len--
214 r.bufferIdx++
215 // Free the first buffer in the slice if the last byte was read
216 r.freeFirstBufferIfEmpty()
217 return b, nil
218}
219
220var _ io.Writer = (*writer)(nil)
221
222type writer struct {
223 buffers *BufferSlice
224 pool BufferPool
225}
226
227func (w *writer) Write(p []byte) (n int, err error) {
228 b := Copy(p, w.pool)
229 *w.buffers = append(*w.buffers, b)
230 return b.Len(), nil
231}
232
233// NewWriter wraps the given BufferSlice and BufferPool to implement the
234// io.Writer interface. Every call to Write copies the contents of the given
235// buffer into a new Buffer pulled from the given pool and the Buffer is
236// added to the given BufferSlice.
237func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
238 return &writer{buffers: buffers, pool: pool}
239}
240
241// ReadAll reads from r until an error or EOF and returns the data it read.
242// A successful call returns err == nil, not err == EOF. Because ReadAll is
243// defined to read from src until EOF, it does not treat an EOF from Read
244// as an error to be reported.
245//
246// Important: A failed call returns a non-nil error and may also return
247// partially read buffers. It is the responsibility of the caller to free the
248// BufferSlice returned, or its memory will not be reused.
249func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
250 var result BufferSlice
251 if wt, ok := r.(io.WriterTo); ok {
252 // This is more optimal since wt knows the size of chunks it wants to
253 // write and, hence, we can allocate buffers of an optimal size to fit
254 // them. E.g. might be a single big chunk, and we wouldn't chop it
255 // into pieces.
256 w := NewWriter(&result, pool)
257 _, err := wt.WriteTo(w)
258 return result, err
259 }
260nextBuffer:
261 for {
262 buf := pool.Get(readAllBufSize)
263 // We asked for 32KiB but may have been given a bigger buffer.
264 // Use all of it if that's the case.
265 *buf = (*buf)[:cap(*buf)]
266 usedCap := 0
267 for {
268 n, err := r.Read((*buf)[usedCap:])
269 usedCap += n
270 if err != nil {
271 if usedCap == 0 {
272 // Nothing in this buf, put it back
273 pool.Put(buf)
274 } else {
275 *buf = (*buf)[:usedCap]
276 result = append(result, NewBuffer(buf, pool))
277 }
278 if err == io.EOF {
279 err = nil
280 }
281 return result, err
282 }
283 if len(*buf) == usedCap {
284 result = append(result, NewBuffer(buf, pool))
285 continue nextBuffer
286 }
287 }
288 }
289}
bseeniva0b9cbcb2026-02-12 19:11:11 +0530290
291// Discard skips the next n bytes, returning the number of bytes discarded.
292//
293// It frees buffers as they are fully consumed.
294//
295// If Discard skips fewer than n bytes, it also returns an error.
296func (r *Reader) Discard(n int) (discarded int, err error) {
297 total := n
298 for n > 0 && r.len > 0 {
299 curData := r.data[0].ReadOnlyData()
300 curSize := min(n, len(curData)-r.bufferIdx)
301 n -= curSize
302 r.len -= curSize
303 r.bufferIdx += curSize
304 if r.bufferIdx >= len(curData) {
305 r.data[0].Free()
306 r.data = r.data[1:]
307 r.bufferIdx = 0
308 }
309 }
310 discarded = total - n
311 if n > 0 {
312 return discarded, fmt.Errorf("insufficient bytes in reader")
313 }
314 return discarded, nil
315}
316
317// Peek returns the next n bytes without advancing the reader.
318//
319// Peek appends results to the provided res slice and returns the updated slice.
320// This pattern allows re-using the storage of res if it has sufficient
321// capacity.
322//
323// The returned subslices are views into the underlying buffers and are only
324// valid until the reader is advanced past the corresponding buffer.
325//
326// If Peek returns fewer than n bytes, it also returns an error.
327func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
328 for i := 0; n > 0 && i < len(r.data); i++ {
329 curData := r.data[i].ReadOnlyData()
330 start := 0
331 if i == 0 {
332 start = r.bufferIdx
333 }
334 curSize := min(n, len(curData)-start)
335 if curSize == 0 {
336 continue
337 }
338 res = append(res, curData[start:start+curSize])
339 n -= curSize
340 }
341 if n > 0 {
342 return nil, fmt.Errorf("insufficient bytes in reader")
343 }
344 return res, nil
345}