[VOL-5486] Fix deprecated versions

Change-Id: I3e03ea246020547ae75fa92ce8cf5cbba7e8f3bb
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/metadata.go b/vendor/github.com/IBM/sarama/metadata.go
new file mode 100644
index 0000000..20620b3
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/metadata.go
@@ -0,0 +1,239 @@
+package sarama
+
+import (
+	"sync"
+)
+
+type metadataRefresh func(topics []string) error
+
+// currentRefresh makes sure sarama does not issue metadata requests
+// in parallel. If we need to refresh the metadata for a list of topics,
+// this struct will check if a refresh is already ongoing, and if so, it will
+// accumulate the list of topics to refresh in the next refresh.
+// When the current refresh is over, it will queue a new metadata refresh call
+// with the accumulated list of topics.
+type currentRefresh struct {
+	// This is the function that gets called when to refresh the metadata.
+	// It is called with the list of all topics that need to be refreshed
+	// or with nil if all topics need to be refreshed.
+	refresh func(topics []string) error
+
+	mu        sync.Mutex
+	ongoing   bool
+	topicsMap map[string]struct{}
+	topics    []string
+	allTopics bool
+	chans     []chan error
+}
+
+// addTopicsFrom adds topics from the next refresh to the current refresh.
+// You need to hold the lock to call this method.
+func (r *currentRefresh) addTopicsFrom(next *nextRefresh) {
+	if next.allTopics {
+		r.allTopics = true
+		return
+	}
+	if len(next.topics) > 0 {
+		r.addTopics(next.topics)
+	}
+}
+
+// nextRefresh holds the list of topics we will need
+// to refresh in the next refresh.
+// When a refresh is ongoing, calls to RefreshMetadata() are
+// accumulated in this struct, so that we can immediately issue another
+// refresh when the current refresh is over.
+type nextRefresh struct {
+	mu        sync.Mutex
+	topics    []string
+	allTopics bool
+}
+
+// addTopics adds topics to the refresh.
+// You need to hold the lock to call this method.
+func (r *currentRefresh) addTopics(topics []string) {
+	if len(topics) == 0 {
+		r.allTopics = true
+		return
+	}
+	for _, topic := range topics {
+		if _, ok := r.topicsMap[topic]; ok {
+			continue
+		}
+		r.topicsMap[topic] = struct{}{}
+		r.topics = append(r.topics, topic)
+	}
+}
+
+func (r *nextRefresh) addTopics(topics []string) {
+	if len(topics) == 0 {
+		r.allTopics = true
+		// All topics are requested, so we can clear the topics
+		// that were previously accumulated.
+		r.topics = r.topics[:0]
+		return
+	}
+	r.topics = append(r.topics, topics...)
+}
+
+func (r *nextRefresh) clear() {
+	r.topics = r.topics[:0]
+	r.allTopics = false
+}
+
+func (r *currentRefresh) hasTopics(topics []string) bool {
+	if len(topics) == 0 {
+		// This means that the caller wants to know if the refresh is for all topics.
+		// In this case, we return true if the refresh is for all topics, or false if it is not.
+		return r.allTopics
+	}
+	if r.allTopics {
+		return true
+	}
+	for _, topic := range topics {
+		if _, ok := r.topicsMap[topic]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
+// start starts a new refresh.
+// The refresh is started in a new goroutine, and this function
+// returns a channel on which the caller can wait for the refresh
+// to complete.
+// You need to hold the lock to call this method.
+func (r *currentRefresh) start() chan error {
+	r.ongoing = true
+	ch := r.wait()
+	topics := r.topics
+	if r.allTopics {
+		topics = nil
+	}
+	go func() {
+		err := r.refresh(topics)
+		r.mu.Lock()
+		defer r.mu.Unlock()
+
+		r.ongoing = false
+		for _, ch := range r.chans {
+			ch <- err
+			close(ch)
+		}
+		r.clear()
+	}()
+	return ch
+}
+
+// clear clears the refresh state.
+// You need to hold the lock to call this method.
+func (r *currentRefresh) clear() {
+	r.topics = r.topics[:0]
+	for key := range r.topicsMap {
+		delete(r.topicsMap, key)
+	}
+	r.allTopics = false
+	r.chans = r.chans[:0]
+}
+
+// wait returns the channel on which you can wait for the refresh
+// to complete.
+// You need to hold the lock to call this method.
+func (r *currentRefresh) wait() chan error {
+	if !r.ongoing {
+		panic("waiting for a refresh that is not ongoing")
+	}
+	ch := make(chan error, 1)
+	r.chans = append(r.chans, ch)
+	return ch
+}
+
+// singleFlightMetadataRefresher helps managing metadata refreshes.
+// It makes sure a sarama client never issues more than one metadata refresh
+// in parallel.
+type singleFlightMetadataRefresher struct {
+	current *currentRefresh
+	next    *nextRefresh
+}
+
+func newSingleFlightRefresher(f func(topics []string) error) metadataRefresh {
+	return newMetadataRefresh(f).Refresh
+}
+
+func newMetadataRefresh(f func(topics []string) error) *singleFlightMetadataRefresher {
+	return &singleFlightMetadataRefresher{
+		current: &currentRefresh{
+			topicsMap: make(map[string]struct{}),
+			refresh:   f,
+		},
+		next: &nextRefresh{},
+	}
+}
+
+// Refresh is the function that clients call when they want to refresh
+// the metadata. This function blocks until a refresh is issued, and its
+// result is received, for the list of topics the caller provided.
+// If a refresh was already ongoing for this list of topics, the function
+// waits on that refresh to complete, and returns its result.
+// If a refresh was already ongoing for a different list of topics, the function
+// accumulates the list of topics to refresh in the next refresh, and queues that refresh.
+// If no refresh is ongoing, it will start a new refresh, and return its result.
+func (m *singleFlightMetadataRefresher) Refresh(topics []string) error {
+	for {
+		ch, queued := m.refreshOrQueue(topics)
+		if !queued {
+			return <-ch
+		}
+		<-ch
+	}
+}
+
+// refreshOrQueue returns a channel the refresh needs to wait on, and a boolean
+// that indicates whether waiting on the channel will return the result of that refresh
+// or whether the refresh was "queued" and the caller needs to wait for the channel to
+// return, and then call refreshOrQueue again.
+// When calling refreshOrQueue, three things can happen:
+//  1. either no refresh is ongoing.
+//     In this case, a new refresh is started, and the channel that's returned will
+//     contain the result of that refresh, so it returns "false" as the second return value.
+//  2. a refresh is ongoing, and it contains the topics we need.
+//     In this case, the channel that's returned will contain the result of that refresh,
+//     so it returns "false" as the second return value.
+//     In this case, the channel that's returned will contain the result of that refresh,
+//     so it returns "false" as the second return value.
+//  3. a refresh is already ongoing, but doesn't contain the topics we need. In this case,
+//     the caller needs to wait for the refresh to finish, and then call refreshOrQueue again.
+//     The channel that's returned is for the current refresh (not the one the caller is
+//     interested in), so it returns "true" as the second return value. The caller needs to
+//     wait on the channel, disregard the value, and call refreshOrQueue again.
+func (m *singleFlightMetadataRefresher) refreshOrQueue(topics []string) (chan error, bool) {
+	m.current.mu.Lock()
+	defer m.current.mu.Unlock()
+	if !m.current.ongoing {
+		// If no refresh is ongoing, we can start a new one, in which
+		// we add the topics that have been accumulated in the next refresh
+		// and the topics that have been provided by the caller.
+		m.next.mu.Lock()
+		m.current.addTopicsFrom(m.next)
+		m.next.clear()
+		m.next.mu.Unlock()
+		m.current.addTopics(topics)
+		ch := m.current.start()
+		return ch, false
+	}
+	if m.current.hasTopics(topics) {
+		// A refresh is ongoing, and we were lucky: it is refreshing the topics we need already:
+		// we just have to wait for it to finish and return its results.
+		ch := m.current.wait()
+		return ch, false
+	}
+	// There is a refresh ongoing, but it is not refreshing the topics we need.
+	// We need to wait for it to finish, and then start a new refresh.
+	ch := m.current.wait()
+	m.next.mu.Lock()
+	m.next.addTopics(topics)
+	m.next.mu.Unlock()
+	// This is where we wait for that refresh to finish, and the loop will take care
+	// of starting the new one.
+	return ch, true
+}