| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2024 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | // Package mem provides utilities that facilitate memory reuse in byte slices |
| 20 | // that are used as buffers. |
| 21 | // |
| 22 | // # Experimental |
| 23 | // |
| 24 | // Notice: All APIs in this package are EXPERIMENTAL and may be changed or |
| 25 | // removed in a later release. |
| 26 | package mem |
| 27 | |
| 28 | import ( |
| 29 | "fmt" |
| 30 | "sync" |
| 31 | "sync/atomic" |
| 32 | ) |
| 33 | |
// Buffer is a reference-counted view over a sequence of bytes. Instances are
// obtained from NewBuffer() or Copy(). Every held reference is given up with
// Free(), and the underlying bytes are only released once all references have
// been freed.
//
// A Buffer is not safe for concurrent access. A goroutine that needs the data
// should hold its own reference, which can be acquired by calling Ref().
//
// Attempts to access the underlying data after the reference to the Buffer
// has been released will panic.
type Buffer interface {
	// ReadOnlyData returns the underlying byte slice. Modifying the contents
	// of the returned slice in any way is undefined behavior.
	ReadOnlyData() []byte
	// Len returns the number of bytes held by the Buffer.
	Len() int
	// Ref increments the reference counter for this Buffer.
	Ref()
	// Free decrements this Buffer's reference counter and releases the
	// underlying byte slice if the counter drops to 0 as a result of the call.
	Free()

	// split divides the Buffer at offset n, returning one Buffer for the
	// first n bytes (left) and one for the remaining bytes (right).
	split(n int) (left, right Buffer)
	// read copies as many bytes as fit into buf and returns the number of
	// bytes copied along with a Buffer for whatever was not consumed.
	read(buf []byte) (int, Buffer)
}
| 60 | |
| 61 | var ( |
| 62 | bufferPoolingThreshold = 1 << 10 |
| 63 | |
| 64 | bufferObjectPool = sync.Pool{New: func() any { return new(buffer) }} |
| 65 | refObjectPool = sync.Pool{New: func() any { return new(atomic.Int32) }} |
| 66 | ) |
| 67 | |
| 68 | // IsBelowBufferPoolingThreshold returns true if the given size is less than or |
| 69 | // equal to the threshold for buffer pooling. This is used to determine whether |
| 70 | // to pool buffers or allocate them directly. |
| 71 | func IsBelowBufferPoolingThreshold(size int) bool { |
| 72 | return size <= bufferPoolingThreshold |
| 73 | } |
| 74 | |
| 75 | type buffer struct { |
| 76 | origData *[]byte |
| 77 | data []byte |
| 78 | refs *atomic.Int32 |
| 79 | pool BufferPool |
| 80 | } |
| 81 | |
| 82 | func newBuffer() *buffer { |
| 83 | return bufferObjectPool.Get().(*buffer) |
| 84 | } |
| 85 | |
| 86 | // NewBuffer creates a new Buffer from the given data, initializing the reference |
| 87 | // counter to 1. The data will then be returned to the given pool when all |
| 88 | // references to the returned Buffer are released. As a special case to avoid |
| 89 | // additional allocations, if the given buffer pool is nil, the returned buffer |
| 90 | // will be a "no-op" Buffer where invoking Buffer.Free() does nothing and the |
| 91 | // underlying data is never freed. |
| 92 | // |
| 93 | // Note that the backing array of the given data is not copied. |
| 94 | func NewBuffer(data *[]byte, pool BufferPool) Buffer { |
| 95 | // Use the buffer's capacity instead of the length, otherwise buffers may |
| 96 | // not be reused under certain conditions. For example, if a large buffer |
| 97 | // is acquired from the pool, but fewer bytes than the buffering threshold |
| 98 | // are written to it, the buffer will not be returned to the pool. |
| 99 | if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) { |
| 100 | return (SliceBuffer)(*data) |
| 101 | } |
| 102 | b := newBuffer() |
| 103 | b.origData = data |
| 104 | b.data = *data |
| 105 | b.pool = pool |
| 106 | b.refs = refObjectPool.Get().(*atomic.Int32) |
| 107 | b.refs.Add(1) |
| 108 | return b |
| 109 | } |
| 110 | |
| 111 | // Copy creates a new Buffer from the given data, initializing the reference |
| 112 | // counter to 1. |
| 113 | // |
| 114 | // It acquires a []byte from the given pool and copies over the backing array |
| 115 | // of the given data. The []byte acquired from the pool is returned to the |
| 116 | // pool when all references to the returned Buffer are released. |
| 117 | func Copy(data []byte, pool BufferPool) Buffer { |
| 118 | if IsBelowBufferPoolingThreshold(len(data)) { |
| 119 | buf := make(SliceBuffer, len(data)) |
| 120 | copy(buf, data) |
| 121 | return buf |
| 122 | } |
| 123 | |
| 124 | buf := pool.Get(len(data)) |
| 125 | copy(*buf, data) |
| 126 | return NewBuffer(buf, pool) |
| 127 | } |
| 128 | |
| 129 | func (b *buffer) ReadOnlyData() []byte { |
| 130 | if b.refs == nil { |
| 131 | panic("Cannot read freed buffer") |
| 132 | } |
| 133 | return b.data |
| 134 | } |
| 135 | |
| 136 | func (b *buffer) Ref() { |
| 137 | if b.refs == nil { |
| 138 | panic("Cannot ref freed buffer") |
| 139 | } |
| 140 | b.refs.Add(1) |
| 141 | } |
| 142 | |
| 143 | func (b *buffer) Free() { |
| 144 | if b.refs == nil { |
| 145 | panic("Cannot free freed buffer") |
| 146 | } |
| 147 | |
| 148 | refs := b.refs.Add(-1) |
| 149 | switch { |
| 150 | case refs > 0: |
| 151 | return |
| 152 | case refs == 0: |
| 153 | if b.pool != nil { |
| 154 | b.pool.Put(b.origData) |
| 155 | } |
| 156 | |
| 157 | refObjectPool.Put(b.refs) |
| 158 | b.origData = nil |
| 159 | b.data = nil |
| 160 | b.refs = nil |
| 161 | b.pool = nil |
| 162 | bufferObjectPool.Put(b) |
| 163 | default: |
| 164 | panic("Cannot free freed buffer") |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | func (b *buffer) Len() int { |
| 169 | return len(b.ReadOnlyData()) |
| 170 | } |
| 171 | |
| 172 | func (b *buffer) split(n int) (Buffer, Buffer) { |
| 173 | if b.refs == nil { |
| 174 | panic("Cannot split freed buffer") |
| 175 | } |
| 176 | |
| 177 | b.refs.Add(1) |
| 178 | split := newBuffer() |
| 179 | split.origData = b.origData |
| 180 | split.data = b.data[n:] |
| 181 | split.refs = b.refs |
| 182 | split.pool = b.pool |
| 183 | |
| 184 | b.data = b.data[:n] |
| 185 | |
| 186 | return b, split |
| 187 | } |
| 188 | |
| 189 | func (b *buffer) read(buf []byte) (int, Buffer) { |
| 190 | if b.refs == nil { |
| 191 | panic("Cannot read freed buffer") |
| 192 | } |
| 193 | |
| 194 | n := copy(buf, b.data) |
| 195 | if n == len(b.data) { |
| 196 | b.Free() |
| 197 | return n, nil |
| 198 | } |
| 199 | |
| 200 | b.data = b.data[n:] |
| 201 | return n, b |
| 202 | } |
| 203 | |
| 204 | func (b *buffer) String() string { |
| 205 | return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData())) |
| 206 | } |
| 207 | |
| 208 | // ReadUnsafe reads bytes from the given Buffer into the provided slice. |
| 209 | // It does not perform safety checks. |
| 210 | func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) { |
| 211 | return buf.read(dst) |
| 212 | } |
| 213 | |
| 214 | // SplitUnsafe modifies the receiver to point to the first n bytes while it |
| 215 | // returns a new reference to the remaining bytes. The returned Buffer |
| 216 | // functions just like a normal reference acquired using Ref(). |
| 217 | func SplitUnsafe(buf Buffer, n int) (left, right Buffer) { |
| 218 | return buf.split(n) |
| 219 | } |
| 220 | |
| 221 | type emptyBuffer struct{} |
| 222 | |
| 223 | func (e emptyBuffer) ReadOnlyData() []byte { |
| 224 | return nil |
| 225 | } |
| 226 | |
| 227 | func (e emptyBuffer) Ref() {} |
| 228 | func (e emptyBuffer) Free() {} |
| 229 | |
| 230 | func (e emptyBuffer) Len() int { |
| 231 | return 0 |
| 232 | } |
| 233 | |
| 234 | func (e emptyBuffer) split(int) (left, right Buffer) { |
| 235 | return e, e |
| 236 | } |
| 237 | |
| 238 | func (e emptyBuffer) read([]byte) (int, Buffer) { |
| 239 | return 0, e |
| 240 | } |
| 241 | |
| 242 | // SliceBuffer is a Buffer implementation that wraps a byte slice. It provides |
| 243 | // methods for reading, splitting, and managing the byte slice. |
| 244 | type SliceBuffer []byte |
| 245 | |
| 246 | // ReadOnlyData returns the byte slice. |
| 247 | func (s SliceBuffer) ReadOnlyData() []byte { return s } |
| 248 | |
| 249 | // Ref is a noop implementation of Ref. |
| 250 | func (s SliceBuffer) Ref() {} |
| 251 | |
| 252 | // Free is a noop implementation of Free. |
| 253 | func (s SliceBuffer) Free() {} |
| 254 | |
| 255 | // Len is a noop implementation of Len. |
| 256 | func (s SliceBuffer) Len() int { return len(s) } |
| 257 | |
| 258 | func (s SliceBuffer) split(n int) (left, right Buffer) { |
| 259 | return s[:n], s[n:] |
| 260 | } |
| 261 | |
| 262 | func (s SliceBuffer) read(buf []byte) (int, Buffer) { |
| 263 | n := copy(buf, s) |
| 264 | if n == len(s) { |
| 265 | return n, nil |
| 266 | } |
| 267 | return n, s[n:] |
| 268 | } |