| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2024 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | package mem |
| 20 | |
| 21 | import ( |
| 22 | "io" |
| 23 | ) |
| 24 | |
const (
	// readAllBufSize is the chunk size ReadAll requests from the pool when
	// the source reader does not implement io.WriterTo.
	// 32 KiB is what io.Copy uses.
	readAllBufSize = 32 * 1024
)
| 29 | |
// BufferSlice offers a means to represent data that spans one or more Buffer
// instances. A BufferSlice is meant to be immutable after creation, and methods
// like Ref create and return copies of the slice. This is why all methods have
// value receivers rather than pointer receivers.
//
// Note that any of the methods that read the underlying buffers such as Ref,
// Len or CopyTo etc., will panic if any underlying buffers have already been
// freed. It is recommended to not interact directly with any of the underlying
// buffers; rather, such interactions should be mediated through the various
// methods on this type.
//
// By convention, any APIs that return (mem.BufferSlice, error) should reduce
// the burden on the caller by never returning a mem.BufferSlice that needs to
// be freed if the error is non-nil, unless explicitly stated.
type BufferSlice []Buffer
| 45 | |
| 46 | // Len returns the sum of the length of all the Buffers in this slice. |
| 47 | // |
| 48 | // # Warning |
| 49 | // |
| 50 | // Invoking the built-in len on a BufferSlice will return the number of buffers |
| 51 | // in the slice, and *not* the value returned by this function. |
| 52 | func (s BufferSlice) Len() int { |
| 53 | var length int |
| 54 | for _, b := range s { |
| 55 | length += b.Len() |
| 56 | } |
| 57 | return length |
| 58 | } |
| 59 | |
| 60 | // Ref invokes Ref on each buffer in the slice. |
| 61 | func (s BufferSlice) Ref() { |
| 62 | for _, b := range s { |
| 63 | b.Ref() |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | // Free invokes Buffer.Free() on each Buffer in the slice. |
| 68 | func (s BufferSlice) Free() { |
| 69 | for _, b := range s { |
| 70 | b.Free() |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | // CopyTo copies each of the underlying Buffer's data into the given buffer, |
| 75 | // returning the number of bytes copied. Has the same semantics as the copy |
| 76 | // builtin in that it will copy as many bytes as it can, stopping when either dst |
| 77 | // is full or s runs out of data, returning the minimum of s.Len() and len(dst). |
| 78 | func (s BufferSlice) CopyTo(dst []byte) int { |
| 79 | off := 0 |
| 80 | for _, b := range s { |
| 81 | off += copy(dst[off:], b.ReadOnlyData()) |
| 82 | } |
| 83 | return off |
| 84 | } |
| 85 | |
| 86 | // Materialize concatenates all the underlying Buffer's data into a single |
| 87 | // contiguous buffer using CopyTo. |
| 88 | func (s BufferSlice) Materialize() []byte { |
| 89 | l := s.Len() |
| 90 | if l == 0 { |
| 91 | return nil |
| 92 | } |
| 93 | out := make([]byte, l) |
| 94 | s.CopyTo(out) |
| 95 | return out |
| 96 | } |
| 97 | |
| 98 | // MaterializeToBuffer functions like Materialize except that it writes the data |
| 99 | // to a single Buffer pulled from the given BufferPool. |
| 100 | // |
| 101 | // As a special case, if the input BufferSlice only actually has one Buffer, this |
| 102 | // function simply increases the refcount before returning said Buffer. Freeing this |
| 103 | // buffer won't release it until the BufferSlice is itself released. |
| 104 | func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer { |
| 105 | if len(s) == 1 { |
| 106 | s[0].Ref() |
| 107 | return s[0] |
| 108 | } |
| 109 | sLen := s.Len() |
| 110 | if sLen == 0 { |
| 111 | return emptyBuffer{} |
| 112 | } |
| 113 | buf := pool.Get(sLen) |
| 114 | s.CopyTo(*buf) |
| 115 | return NewBuffer(buf, pool) |
| 116 | } |
| 117 | |
| 118 | // Reader returns a new Reader for the input slice after taking references to |
| 119 | // each underlying buffer. |
| 120 | func (s BufferSlice) Reader() Reader { |
| 121 | s.Ref() |
| 122 | return &sliceReader{ |
| 123 | data: s, |
| 124 | len: s.Len(), |
| 125 | } |
| 126 | } |
| 127 | |
// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
// with other parts of the system. It also provides an additional convenience
// method Remaining(), which returns the number of unread bytes remaining in the
// slice. Buffers will be freed as they are read.
type Reader interface {
	io.Reader
	io.ByteReader
	// Close frees the underlying BufferSlice and never returns an error. Subsequent
	// calls to Read will return (0, io.EOF).
	Close() error
	// Remaining returns the number of unread bytes remaining in the slice.
	Remaining() int
	// Reset frees the currently held buffer slice and starts reading from the
	// provided slice. This allows reusing the reader object.
	Reset(s BufferSlice)
}
| 144 | |
// sliceReader is the Reader implementation returned by BufferSlice.Reader. It
// consumes buffers front to back, freeing each buffer as soon as it has been
// fully read.
type sliceReader struct {
	// data holds the not-yet-fully-read buffers; data[0] is the one currently
	// being consumed.
	data BufferSlice
	// len is the total number of unread bytes remaining across all buffers.
	len int
	// The index into data[0].ReadOnlyData().
	bufferIdx int
}
| 151 | |
// Remaining returns the number of unread bytes remaining in the slice.
func (r *sliceReader) Remaining() int {
	return r.len
}
| 155 | |
| 156 | func (r *sliceReader) Reset(s BufferSlice) { |
| 157 | r.data.Free() |
| 158 | s.Ref() |
| 159 | r.data = s |
| 160 | r.len = s.Len() |
| 161 | r.bufferIdx = 0 |
| 162 | } |
| 163 | |
| 164 | func (r *sliceReader) Close() error { |
| 165 | r.data.Free() |
| 166 | r.data = nil |
| 167 | r.len = 0 |
| 168 | return nil |
| 169 | } |
| 170 | |
| 171 | func (r *sliceReader) freeFirstBufferIfEmpty() bool { |
| 172 | if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) { |
| 173 | return false |
| 174 | } |
| 175 | |
| 176 | r.data[0].Free() |
| 177 | r.data = r.data[1:] |
| 178 | r.bufferIdx = 0 |
| 179 | return true |
| 180 | } |
| 181 | |
| 182 | func (r *sliceReader) Read(buf []byte) (n int, _ error) { |
| 183 | if r.len == 0 { |
| 184 | return 0, io.EOF |
| 185 | } |
| 186 | |
| 187 | for len(buf) != 0 && r.len != 0 { |
| 188 | // Copy as much as possible from the first Buffer in the slice into the |
| 189 | // given byte slice. |
| 190 | data := r.data[0].ReadOnlyData() |
| 191 | copied := copy(buf, data[r.bufferIdx:]) |
| 192 | r.len -= copied // Reduce len by the number of bytes copied. |
| 193 | r.bufferIdx += copied // Increment the buffer index. |
| 194 | n += copied // Increment the total number of bytes read. |
| 195 | buf = buf[copied:] // Shrink the given byte slice. |
| 196 | |
| 197 | // If we have copied all the data from the first Buffer, free it and advance to |
| 198 | // the next in the slice. |
| 199 | r.freeFirstBufferIfEmpty() |
| 200 | } |
| 201 | |
| 202 | return n, nil |
| 203 | } |
| 204 | |
| 205 | func (r *sliceReader) ReadByte() (byte, error) { |
| 206 | if r.len == 0 { |
| 207 | return 0, io.EOF |
| 208 | } |
| 209 | |
| 210 | // There may be any number of empty buffers in the slice, clear them all until a |
| 211 | // non-empty buffer is reached. This is guaranteed to exit since r.len is not 0. |
| 212 | for r.freeFirstBufferIfEmpty() { |
| 213 | } |
| 214 | |
| 215 | b := r.data[0].ReadOnlyData()[r.bufferIdx] |
| 216 | r.len-- |
| 217 | r.bufferIdx++ |
| 218 | // Free the first buffer in the slice if the last byte was read |
| 219 | r.freeFirstBufferIfEmpty() |
| 220 | return b, nil |
| 221 | } |
| 222 | |
// Compile-time assertion that *writer satisfies io.Writer.
var _ io.Writer = (*writer)(nil)

// writer appends each written chunk, copied into a pool-backed Buffer, to the
// BufferSlice it wraps. See NewWriter.
type writer struct {
	buffers *BufferSlice
	pool    BufferPool
}
| 229 | |
| 230 | func (w *writer) Write(p []byte) (n int, err error) { |
| 231 | b := Copy(p, w.pool) |
| 232 | *w.buffers = append(*w.buffers, b) |
| 233 | return b.Len(), nil |
| 234 | } |
| 235 | |
| 236 | // NewWriter wraps the given BufferSlice and BufferPool to implement the |
| 237 | // io.Writer interface. Every call to Write copies the contents of the given |
| 238 | // buffer into a new Buffer pulled from the given pool and the Buffer is |
| 239 | // added to the given BufferSlice. |
| 240 | func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer { |
| 241 | return &writer{buffers: buffers, pool: pool} |
| 242 | } |
| 243 | |
// ReadAll reads from r until an error or EOF and returns the data it read.
// A successful call returns err == nil, not err == EOF. Because ReadAll is
// defined to read from src until EOF, it does not treat an EOF from Read
// as an error to be reported.
//
// Important: A failed call returns a non-nil error and may also return
// partially read buffers. It is the responsibility of the caller to free the
// BufferSlice returned, or its memory will not be reused.
func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
	var result BufferSlice
	if wt, ok := r.(io.WriterTo); ok {
		// This is more optimal since wt knows the size of chunks it wants to
		// write and, hence, we can allocate buffers of an optimal size to fit
		// them. E.g. might be a single big chunk, and we wouldn't chop it
		// into pieces.
		w := NewWriter(&result, pool)
		_, err := wt.WriteTo(w)
		return result, err
	}
	// Fallback: fill pool buffers one at a time. The outer loop allocates a
	// fresh buffer; the inner loop issues Reads until that buffer is full
	// (jumping back to the outer loop via the label) or the reader errors.
	nextBuffer:
	for {
		buf := pool.Get(readAllBufSize)
		// We asked for 32KiB but may have been given a bigger buffer.
		// Use all of it if that's the case.
		*buf = (*buf)[:cap(*buf)]
		usedCap := 0
		for {
			n, err := r.Read((*buf)[usedCap:])
			usedCap += n
			if err != nil {
				if usedCap == 0 {
					// Nothing in this buf, put it back
					pool.Put(buf)
				} else {
					// Trim to the bytes actually read before handing the
					// buffer's ownership to the result slice.
					*buf = (*buf)[:usedCap]
					result = append(result, NewBuffer(buf, pool))
				}
				if err == io.EOF {
					// EOF is the normal termination condition, not a failure.
					err = nil
				}
				return result, err
			}
			if len(*buf) == usedCap {
				// Buffer is full; append it and start a new one.
				result = append(result, NewBuffer(buf, pool))
				continue nextBuffer
			}
		}
	}
}