blob: 20620b33e7bc5a6bbe031d7486899e3eb257c6b8 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "sync"
5)
6
// metadataRefresh is the signature of a metadata refresh operation: it
// refreshes the metadata of the given topic names and reports any failure.
// Throughout this file a nil or empty topic list stands for "all topics".
type metadataRefresh func([]string) error
8
// currentRefresh ensures sarama never has more than one metadata request in
// flight. When a refresh is requested while another one is running, the
// request is either served by the running refresh (if that refresh already
// covers the requested topics) or its topics are accumulated so they are
// included in the refresh issued right after the running one completes.
type currentRefresh struct {
	// refresh performs the metadata request. It receives the topics that
	// need refreshing, or nil when every topic must be refreshed.
	refresh func(topics []string) error

	// The fields below must only be accessed while holding mu.
	mu        sync.Mutex
	ongoing   bool                // a refresh goroutine is running
	topicsMap map[string]struct{} // set view of topics, for membership tests
	topics    []string            // topics to refresh, in insertion order
	allTopics bool                // the refresh covers every topic
	chans     []chan error        // one channel per waiter on the refresh
}
28
29// addTopicsFrom adds topics from the next refresh to the current refresh.
30// You need to hold the lock to call this method.
31func (r *currentRefresh) addTopicsFrom(next *nextRefresh) {
32 if next.allTopics {
33 r.allTopics = true
34 return
35 }
36 if len(next.topics) > 0 {
37 r.addTopics(next.topics)
38 }
39}
40
// nextRefresh accumulates the topics for the refresh that will follow the one
// currently in flight. While a refresh is ongoing, RefreshMetadata() calls it
// does not cover record their topics here, so that another refresh including
// them can be issued as soon as the current one is over.
type nextRefresh struct {
	mu        sync.Mutex
	topics    []string // requested topics; may contain duplicates
	allTopics bool     // some caller asked for every topic
}
51
52// addTopics adds topics to the refresh.
53// You need to hold the lock to call this method.
54func (r *currentRefresh) addTopics(topics []string) {
55 if len(topics) == 0 {
56 r.allTopics = true
57 return
58 }
59 for _, topic := range topics {
60 if _, ok := r.topicsMap[topic]; ok {
61 continue
62 }
63 r.topicsMap[topic] = struct{}{}
64 r.topics = append(r.topics, topic)
65 }
66}
67
68func (r *nextRefresh) addTopics(topics []string) {
69 if len(topics) == 0 {
70 r.allTopics = true
71 // All topics are requested, so we can clear the topics
72 // that were previously accumulated.
73 r.topics = r.topics[:0]
74 return
75 }
76 r.topics = append(r.topics, topics...)
77}
78
79func (r *nextRefresh) clear() {
80 r.topics = r.topics[:0]
81 r.allTopics = false
82}
83
84func (r *currentRefresh) hasTopics(topics []string) bool {
85 if len(topics) == 0 {
86 // This means that the caller wants to know if the refresh is for all topics.
87 // In this case, we return true if the refresh is for all topics, or false if it is not.
88 return r.allTopics
89 }
90 if r.allTopics {
91 return true
92 }
93 for _, topic := range topics {
94 if _, ok := r.topicsMap[topic]; !ok {
95 return false
96 }
97 }
98 return true
99}
100
101// start starts a new refresh.
102// The refresh is started in a new goroutine, and this function
103// returns a channel on which the caller can wait for the refresh
104// to complete.
105// You need to hold the lock to call this method.
106func (r *currentRefresh) start() chan error {
107 r.ongoing = true
108 ch := r.wait()
109 topics := r.topics
110 if r.allTopics {
111 topics = nil
112 }
113 go func() {
114 err := r.refresh(topics)
115 r.mu.Lock()
116 defer r.mu.Unlock()
117
118 r.ongoing = false
119 for _, ch := range r.chans {
120 ch <- err
121 close(ch)
122 }
123 r.clear()
124 }()
125 return ch
126}
127
128// clear clears the refresh state.
129// You need to hold the lock to call this method.
130func (r *currentRefresh) clear() {
131 r.topics = r.topics[:0]
132 for key := range r.topicsMap {
133 delete(r.topicsMap, key)
134 }
135 r.allTopics = false
136 r.chans = r.chans[:0]
137}
138
139// wait returns the channel on which you can wait for the refresh
140// to complete.
141// You need to hold the lock to call this method.
142func (r *currentRefresh) wait() chan error {
143 if !r.ongoing {
144 panic("waiting for a refresh that is not ongoing")
145 }
146 ch := make(chan error, 1)
147 r.chans = append(r.chans, ch)
148 return ch
149}
150
151// singleFlightMetadataRefresher helps managing metadata refreshes.
152// It makes sure a sarama client never issues more than one metadata refresh
153// in parallel.
154type singleFlightMetadataRefresher struct {
155 current *currentRefresh
156 next *nextRefresh
157}
158
159func newSingleFlightRefresher(f func(topics []string) error) metadataRefresh {
160 return newMetadataRefresh(f).Refresh
161}
162
163func newMetadataRefresh(f func(topics []string) error) *singleFlightMetadataRefresher {
164 return &singleFlightMetadataRefresher{
165 current: &currentRefresh{
166 topicsMap: make(map[string]struct{}),
167 refresh: f,
168 },
169 next: &nextRefresh{},
170 }
171}
172
173// Refresh is the function that clients call when they want to refresh
174// the metadata. This function blocks until a refresh is issued, and its
175// result is received, for the list of topics the caller provided.
176// If a refresh was already ongoing for this list of topics, the function
177// waits on that refresh to complete, and returns its result.
178// If a refresh was already ongoing for a different list of topics, the function
179// accumulates the list of topics to refresh in the next refresh, and queues that refresh.
180// If no refresh is ongoing, it will start a new refresh, and return its result.
181func (m *singleFlightMetadataRefresher) Refresh(topics []string) error {
182 for {
183 ch, queued := m.refreshOrQueue(topics)
184 if !queued {
185 return <-ch
186 }
187 <-ch
188 }
189}
190
191// refreshOrQueue returns a channel the refresh needs to wait on, and a boolean
192// that indicates whether waiting on the channel will return the result of that refresh
193// or whether the refresh was "queued" and the caller needs to wait for the channel to
194// return, and then call refreshOrQueue again.
195// When calling refreshOrQueue, three things can happen:
196// 1. either no refresh is ongoing.
197// In this case, a new refresh is started, and the channel that's returned will
198// contain the result of that refresh, so it returns "false" as the second return value.
199// 2. a refresh is ongoing, and it contains the topics we need.
200// In this case, the channel that's returned will contain the result of that refresh,
201// so it returns "false" as the second return value.
202// In this case, the channel that's returned will contain the result of that refresh,
203// so it returns "false" as the second return value.
204// 3. a refresh is already ongoing, but doesn't contain the topics we need. In this case,
205// the caller needs to wait for the refresh to finish, and then call refreshOrQueue again.
206// The channel that's returned is for the current refresh (not the one the caller is
207// interested in), so it returns "true" as the second return value. The caller needs to
208// wait on the channel, disregard the value, and call refreshOrQueue again.
209func (m *singleFlightMetadataRefresher) refreshOrQueue(topics []string) (chan error, bool) {
210 m.current.mu.Lock()
211 defer m.current.mu.Unlock()
212 if !m.current.ongoing {
213 // If no refresh is ongoing, we can start a new one, in which
214 // we add the topics that have been accumulated in the next refresh
215 // and the topics that have been provided by the caller.
216 m.next.mu.Lock()
217 m.current.addTopicsFrom(m.next)
218 m.next.clear()
219 m.next.mu.Unlock()
220 m.current.addTopics(topics)
221 ch := m.current.start()
222 return ch, false
223 }
224 if m.current.hasTopics(topics) {
225 // A refresh is ongoing, and we were lucky: it is refreshing the topics we need already:
226 // we just have to wait for it to finish and return its results.
227 ch := m.current.wait()
228 return ch, false
229 }
230 // There is a refresh ongoing, but it is not refreshing the topics we need.
231 // We need to wait for it to finish, and then start a new refresh.
232 ch := m.current.wait()
233 m.next.mu.Lock()
234 m.next.addTopics(topics)
235 m.next.mu.Unlock()
236 // This is where we wait for that refresh to finish, and the loop will take care
237 // of starting the new one.
238 return ch, true
239}