/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package mem

import (
	"fmt"
	"io"
)

const (
	// 32 KiB is what io.Copy uses.
	readAllBufSize = 32 * 1024
)

// BufferSlice offers a means to represent data that spans one or more Buffer
// instances. A BufferSlice is meant to be immutable after creation, and methods
// like Ref create and return copies of the slice. This is why all methods have
// value receivers rather than pointer receivers.
//
// Note that any of the methods that read the underlying buffers, such as Ref,
// Len or CopyTo, will panic if any underlying buffers have already been freed.
// It is recommended not to interact with any of the underlying buffers
// directly; rather, such interactions should be mediated through the various
// methods on this type.
//
// By convention, any APIs that return (mem.BufferSlice, error) should reduce
// the burden on the caller by never returning a mem.BufferSlice that needs to
// be freed if the error is non-nil, unless explicitly stated.
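//
// For illustration, a minimal lifecycle sketch (pool here stands in for any
// BufferPool implementation; it is not defined in this file):
//
//	s := BufferSlice{Copy([]byte("hello, "), pool), Copy([]byte("world"), pool)}
//	data := s.Materialize() // flat copy of all the underlying bytes
//	s.Free()                // drop the references held by s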
type BufferSlice []Buffer

// Len returns the sum of the lengths of all the Buffers in this slice.
//
// # Warning
//
// Invoking the built-in len on a BufferSlice will return the number of buffers
// in the slice, and *not* the value returned by this function.
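//
// A small sketch of the distinction, assuming s holds two Buffers of 5 bytes
// each:
//
//	len(s)  // 2: the number of Buffers in the slice
//	s.Len() // 10: the total number of bytes across all Buffers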
func (s BufferSlice) Len() int {
	var length int
	for _, b := range s {
		length += b.Len()
	}
	return length
}

// Ref invokes Ref on each buffer in the slice.
func (s BufferSlice) Ref() {
	for _, b := range s {
		b.Ref()
	}
}

// Free invokes Buffer.Free() on each Buffer in the slice.
func (s BufferSlice) Free() {
	for _, b := range s {
		b.Free()
	}
}

// CopyTo copies each of the underlying Buffer's data into the given buffer,
// returning the number of bytes copied. Has the same semantics as the copy
// builtin in that it will copy as many bytes as it can, stopping when either dst
// is full or s runs out of data, returning the minimum of s.Len() and len(dst).
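//
// A sketch of the truncating behaviour, assuming s holds 10 bytes in total:
//
//	dst := make([]byte, 4)
//	n := s.CopyTo(dst) // n == 4; only the first 4 bytes of s are copied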
func (s BufferSlice) CopyTo(dst []byte) int {
	off := 0
	for _, b := range s {
		off += copy(dst[off:], b.ReadOnlyData())
	}
	return off
}

// Materialize concatenates all the underlying Buffer's data into a single
// contiguous buffer using CopyTo.
func (s BufferSlice) Materialize() []byte {
	l := s.Len()
	if l == 0 {
		return nil
	}
	out := make([]byte, l)
	s.CopyTo(out)
	return out
}

// MaterializeToBuffer functions like Materialize except that it writes the data
// to a single Buffer pulled from the given BufferPool.
//
// As a special case, if the input BufferSlice only actually has one Buffer, this
// function simply increases the refcount before returning said Buffer. Freeing this
// buffer won't release it until the BufferSlice is itself released.
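//
// A usage sketch (pool stands in for any BufferPool; the returned Buffer is
// freed independently of s):
//
//	buf := s.MaterializeToBuffer(pool)
//	defer buf.Free()
//	consume(buf.ReadOnlyData()) // consume is a hypothetical caller-side function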
func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {
	if len(s) == 1 {
		s[0].Ref()
		return s[0]
	}
	sLen := s.Len()
	if sLen == 0 {
		return emptyBuffer{}
	}
	buf := pool.Get(sLen)
	s.CopyTo(*buf)
	return NewBuffer(buf, pool)
}

// Reader returns a new Reader for the input slice after taking references to
// each underlying buffer.
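//
// A sketch of feeding a BufferSlice to code that expects an io.Reader (dst is
// an assumed io.Writer):
//
//	r := s.Reader()
//	defer r.Close() // releases the references taken by Reader
//	if _, err := io.Copy(dst, r); err != nil {
//		// handle err
//	}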
func (s BufferSlice) Reader() *Reader {
	s.Ref()
	return &Reader{
		data: s,
		len:  s.Len(),
	}
}

// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
// with other systems.
//
// Buffers will be freed as they are read.
//
// A Reader can be constructed from a BufferSlice; alternatively, the zero value
// of a Reader may be used after calling Reset on it.
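//
// A sketch of reusing a single zero-value Reader across slices (s1 and s2 are
// assumed BufferSlices):
//
//	var r Reader
//	r.Reset(s1) // take references to s1's buffers and start reading
//	// ... read from r ...
//	r.Reset(s2) // free whatever remains of s1, then switch to s2
//	// ... read from r ...
//	r.Close()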
type Reader struct {
	data BufferSlice
	len  int
	// The index into data[0].ReadOnlyData().
	bufferIdx int
}

// Remaining returns the number of unread bytes remaining in the slice.
func (r *Reader) Remaining() int {
	return r.len
}

// Reset frees the currently held buffer slice and starts reading from the
// provided slice. This allows reusing the reader object.
func (r *Reader) Reset(s BufferSlice) {
	r.data.Free()
	s.Ref()
	r.data = s
	r.len = s.Len()
	r.bufferIdx = 0
}

// Close frees the underlying BufferSlice and never returns an error. Subsequent
// calls to Read will return (0, io.EOF).
func (r *Reader) Close() error {
	r.data.Free()
	r.data = nil
	r.len = 0
	return nil
}

// freeFirstBufferIfEmpty frees the first Buffer in the slice if it has been
// fully read, advancing the reader to the next Buffer. It reports whether a
// Buffer was freed.
func (r *Reader) freeFirstBufferIfEmpty() bool {
	if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
		return false
	}

	r.data[0].Free()
	r.data = r.data[1:]
	r.bufferIdx = 0
	return true
}

// Read implements io.Reader. It copies up to len(buf) bytes into buf, freeing
// underlying Buffers as they are fully consumed, and returns io.EOF once all
// data has been read.
func (r *Reader) Read(buf []byte) (n int, _ error) {
	if r.len == 0 {
		return 0, io.EOF
	}

	for len(buf) != 0 && r.len != 0 {
		// Copy as much as possible from the first Buffer in the slice into the
		// given byte slice.
		data := r.data[0].ReadOnlyData()
		copied := copy(buf, data[r.bufferIdx:])
		r.len -= copied       // Reduce len by the number of bytes copied.
		r.bufferIdx += copied // Increment the buffer index.
		n += copied           // Increment the total number of bytes read.
		buf = buf[copied:]    // Shrink the given byte slice.

		// If we have copied all the data from the first Buffer, free it and advance to
		// the next in the slice.
		r.freeFirstBufferIfEmpty()
	}

	return n, nil
}

// ReadByte reads a single byte, returning io.EOF once all data has been read.
func (r *Reader) ReadByte() (byte, error) {
	if r.len == 0 {
		return 0, io.EOF
	}

	// There may be any number of empty buffers in the slice; clear them all until a
	// non-empty buffer is reached. This is guaranteed to exit since r.len is not 0.
	for r.freeFirstBufferIfEmpty() {
	}

	b := r.data[0].ReadOnlyData()[r.bufferIdx]
	r.len--
	r.bufferIdx++
	// Free the first buffer in the slice if the last byte was read.
	r.freeFirstBufferIfEmpty()
	return b, nil
}

var _ io.Writer = (*writer)(nil)

type writer struct {
	buffers *BufferSlice
	pool    BufferPool
}

func (w *writer) Write(p []byte) (n int, err error) {
	b := Copy(p, w.pool)
	*w.buffers = append(*w.buffers, b)
	return b.Len(), nil
}

// NewWriter wraps the given BufferSlice and BufferPool to implement the
// io.Writer interface. Every call to Write copies the contents of the given
// buffer into a new Buffer pulled from the given pool, and the resulting Buffer
// is appended to the given BufferSlice.
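//
// A sketch of accumulating writes into a BufferSlice (pool and src are
// assumptions):
//
//	var s BufferSlice
//	w := NewWriter(&s, pool)
//	if _, err := io.Copy(w, src); err != nil {
//		// handle err
//	}
//	s.Free() // release the accumulated Buffers once done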
func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
	return &writer{buffers: buffers, pool: pool}
}

// ReadAll reads from r until an error or EOF and returns the data it read.
// A successful call returns err == nil, not err == EOF. Because ReadAll is
// defined to read from r until EOF, it does not treat an EOF from Read
// as an error to be reported.
//
// Important: A failed call returns a non-nil error and may also return
// partially read buffers. It is the responsibility of the caller to free the
// BufferSlice returned, or its memory will not be reused.
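//
// A usage sketch (pool and src are assumptions):
//
//	s, err := ReadAll(src, pool)
//	defer s.Free() // free even on error, since partial data may be returned
//	if err != nil {
//		// handle err
//	}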
func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
	var result BufferSlice
	if wt, ok := r.(io.WriterTo); ok {
		// This is more efficient since wt knows the size of the chunks it wants
		// to write and, hence, we can allocate buffers of an optimal size to fit
		// them. E.g. the data might be a single big chunk that we would otherwise
		// chop into pieces.
		w := NewWriter(&result, pool)
		_, err := wt.WriteTo(w)
		return result, err
	}
nextBuffer:
	for {
		buf := pool.Get(readAllBufSize)
		// We asked for 32 KiB but may have been given a bigger buffer.
		// Use all of it if that's the case.
		*buf = (*buf)[:cap(*buf)]
		usedCap := 0
		for {
			n, err := r.Read((*buf)[usedCap:])
			usedCap += n
			if err != nil {
				if usedCap == 0 {
					// Nothing was read into this buf, put it back.
					pool.Put(buf)
				} else {
					*buf = (*buf)[:usedCap]
					result = append(result, NewBuffer(buf, pool))
				}
				if err == io.EOF {
					err = nil
				}
				return result, err
			}
			if len(*buf) == usedCap {
				result = append(result, NewBuffer(buf, pool))
				continue nextBuffer
			}
		}
	}
}

// Discard skips the next n bytes, returning the number of bytes discarded.
//
// It frees buffers as they are fully consumed.
//
// If Discard skips fewer than n bytes, it also returns an error.
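//
// A sketch of skipping a fixed-size header before reading the payload (the
// 4-byte header is an assumption):
//
//	if _, err := r.Discard(4); err != nil {
//		// fewer than 4 bytes were available
//	}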
func (r *Reader) Discard(n int) (discarded int, err error) {
	total := n
	for n > 0 && r.len > 0 {
		curData := r.data[0].ReadOnlyData()
		curSize := min(n, len(curData)-r.bufferIdx)
		n -= curSize
		r.len -= curSize
		r.bufferIdx += curSize
		if r.bufferIdx >= len(curData) {
			r.data[0].Free()
			r.data = r.data[1:]
			r.bufferIdx = 0
		}
	}
	discarded = total - n
	if n > 0 {
		return discarded, fmt.Errorf("insufficient bytes in reader")
	}
	return discarded, nil
}

// Peek returns the next n bytes without advancing the reader.
//
// Peek appends results to the provided res slice and returns the updated slice.
// This pattern allows re-using the storage of res if it has sufficient
// capacity.
//
// The returned subslices are views into the underlying buffers and are only
// valid until the reader is advanced past the corresponding buffer.
//
// If Peek returns fewer than n bytes, it also returns an error.
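//
// A sketch of inspecting a 4-byte prefix without consuming it (the scratch
// slice is an assumption, reused across calls to avoid allocations):
//
//	views, err := r.Peek(4, scratch[:0])
//	if err != nil {
//		// fewer than 4 bytes were available
//	}
//	// views holds one or more subslices covering exactly 4 bytes in total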
func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
	for i := 0; n > 0 && i < len(r.data); i++ {
		curData := r.data[i].ReadOnlyData()
		start := 0
		if i == 0 {
			start = r.bufferIdx
		}
		curSize := min(n, len(curData)-start)
		if curSize == 0 {
			continue
		}
		res = append(res, curData[start:start+curSize])
		n -= curSize
	}
	if n > 0 {
		return nil, fmt.Errorf("insufficient bytes in reader")
	}
	return res, nil
}