blob: 81c2f5fd761b8a9be51c2b4a15191bf87e9af840 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18// Package buffer provides an implementation of an unbounded buffer.
19package buffer
20
21import "sync"
22
23// Unbounded is an implementation of an unbounded buffer which does not use
24// extra goroutines. This is typically used for passing updates from one entity
25// to another within gRPC.
26//
27// All methods on this type are thread-safe and don't block on anything except
28// the underlying mutex used for synchronization.
29//
30// Unbounded supports values of any type to be stored in it by using a channel
31// of `interface{}`. This means that a call to Put() incurs an extra memory
32// allocation, and also that users need a type assertion while reading. For
33// performance critical code paths, using Unbounded is strongly discouraged and
34// defining a new type specific implementation of this buffer is preferred. See
35// internal/transport/transport.go for an example of this.
36type Unbounded struct {
37 c chan interface{}
Akash Kankanala761955c2024-02-21 19:32:20 +053038 closed bool
khenaidoo5fc5cea2021-08-11 17:39:16 -040039 mu sync.Mutex
40 backlog []interface{}
41}
42
43// NewUnbounded returns a new instance of Unbounded.
44func NewUnbounded() *Unbounded {
45 return &Unbounded{c: make(chan interface{}, 1)}
46}
47
48// Put adds t to the unbounded buffer.
49func (b *Unbounded) Put(t interface{}) {
50 b.mu.Lock()
Akash Kankanala761955c2024-02-21 19:32:20 +053051 defer b.mu.Unlock()
52 if b.closed {
53 return
54 }
khenaidoo5fc5cea2021-08-11 17:39:16 -040055 if len(b.backlog) == 0 {
56 select {
57 case b.c <- t:
khenaidoo5fc5cea2021-08-11 17:39:16 -040058 return
59 default:
60 }
61 }
62 b.backlog = append(b.backlog, t)
khenaidoo5fc5cea2021-08-11 17:39:16 -040063}
64
65// Load sends the earliest buffered data, if any, onto the read channel
66// returned by Get(). Users are expected to call this every time they read a
67// value from the read channel.
68func (b *Unbounded) Load() {
69 b.mu.Lock()
Akash Kankanala761955c2024-02-21 19:32:20 +053070 defer b.mu.Unlock()
71 if b.closed {
72 return
73 }
khenaidoo5fc5cea2021-08-11 17:39:16 -040074 if len(b.backlog) > 0 {
75 select {
76 case b.c <- b.backlog[0]:
77 b.backlog[0] = nil
78 b.backlog = b.backlog[1:]
79 default:
80 }
81 }
khenaidoo5fc5cea2021-08-11 17:39:16 -040082}
83
84// Get returns a read channel on which values added to the buffer, via Put(),
85// are sent on.
86//
87// Upon reading a value from this channel, users are expected to call Load() to
88// send the next buffered value onto the channel if there is any.
Akash Kankanala761955c2024-02-21 19:32:20 +053089//
90// If the unbounded buffer is closed, the read channel returned by this method
91// is closed.
khenaidoo5fc5cea2021-08-11 17:39:16 -040092func (b *Unbounded) Get() <-chan interface{} {
93 return b.c
94}
Akash Kankanala761955c2024-02-21 19:32:20 +053095
96// Close closes the unbounded buffer.
97func (b *Unbounded) Close() {
98 b.mu.Lock()
99 defer b.mu.Unlock()
100 if b.closed {
101 return
102 }
103 b.closed = true
104 close(b.c)
105}