[VOL-5486] Fix deprecated versions

Change-Id: If0b888d6c2f33b2f415c8b03b08dc994bb3df3f4
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/create_topics_request.go b/vendor/github.com/IBM/sarama/create_topics_request.go
new file mode 100644
index 0000000..5684e27
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/create_topics_request.go
@@ -0,0 +1,258 @@
+package sarama
+
+import (
+	"time"
+)
+
+type CreateTopicsRequest struct {
+	// Version defines the protocol version to use for encode and decode
+	Version int16
+	// TopicDetails contains the topics to create.
+	TopicDetails map[string]*TopicDetail
+	// Timeout contains how long to wait before timing out the request.
+	Timeout time.Duration
+	// ValidateOnly if true, check that the topics can be created as specified,
+	// but don't create anything.
+	ValidateOnly bool
+}
+
+func (c *CreateTopicsRequest) setVersion(v int16) {
+	c.Version = v
+}
+
+func NewCreateTopicsRequest(
+	version KafkaVersion,
+	topicDetails map[string]*TopicDetail,
+	timeout time.Duration,
+	validateOnly bool,
+) *CreateTopicsRequest {
+	r := &CreateTopicsRequest{
+		TopicDetails: topicDetails,
+		Timeout:      timeout,
+		ValidateOnly: validateOnly,
+	}
+	switch {
+	case version.IsAtLeast(V2_4_0_0):
+		// Version 5 is the first flexible version
+		// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
+		r.Version = 5
+	case version.IsAtLeast(V2_0_0_0):
+		// Version 3 is the same as version 2 (brokers response before throttling)
+		r.Version = 3
+	case version.IsAtLeast(V0_11_0_0):
+		// Version 2 is the same as version 1 (response has ThrottleTime)
+		r.Version = 2
+	case version.IsAtLeast(V0_10_2_0):
+		// Version 1 adds validateOnly.
+		r.Version = 1
+	}
+	return r
+}
+
+func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
+		return err
+	}
+	for topic, detail := range c.TopicDetails {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := detail.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	pe.putInt32(int32(c.Timeout / time.Millisecond))
+
+	if c.Version >= 1 {
+		pe.putBool(c.ValidateOnly)
+	}
+
+	pe.putEmptyTaggedFieldArray()
+	return nil
+}
+
+func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	c.TopicDetails = make(map[string]*TopicDetail, n)
+
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		c.TopicDetails[topic] = new(TopicDetail)
+		if err = c.TopicDetails[topic].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	timeout, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	c.Timeout = time.Duration(timeout) * time.Millisecond
+
+	if version >= 1 {
+		c.ValidateOnly, err = pd.getBool()
+		if err != nil {
+			return err
+		}
+
+		c.Version = version
+	}
+	_, err = pd.getEmptyTaggedFieldArray()
+	return err
+}
+
+func (c *CreateTopicsRequest) key() int16 {
+	return apiKeyCreateTopics
+}
+
+func (c *CreateTopicsRequest) version() int16 {
+	return c.Version
+}
+
+func (c *CreateTopicsRequest) headerVersion() int16 {
+	if c.Version >= 5 {
+		return 2
+	}
+	return 1
+}
+
+func (c *CreateTopicsRequest) isFlexible() bool {
+	return c.isFlexibleVersion(c.Version)
+}
+
+func (c *CreateTopicsRequest) isFlexibleVersion(version int16) bool {
+	return version >= 5
+}
+
+func (c *CreateTopicsRequest) isValidVersion() bool {
+	return c.Version >= 0 && c.Version <= 5
+}
+
+func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
+	switch c.Version {
+	case 5:
+		return V2_4_0_0
+	case 4:
+		return V2_4_0_0
+	case 3:
+		return V2_0_0_0
+	case 2:
+		return V0_11_0_0
+	case 1:
+		return V0_10_2_0
+	case 0:
+		return V0_10_1_0
+	default:
+		return V2_8_0_0
+	}
+}
+
+type TopicDetail struct {
+	// NumPartitions contains the number of partitions to create in the topic, or
+	// -1 if we are either specifying a manual partition assignment or using the
+	// default partitions.
+	NumPartitions int32
+	// ReplicationFactor contains the number of replicas to create for each
+	// partition in the topic, or -1 if we are either specifying a manual
+	// partition assignment or using the default replication factor.
+	ReplicationFactor int16
+	// ReplicaAssignment contains the manual partition assignment, or the empty
+	// array if we are using automatic assignment.
+	ReplicaAssignment map[int32][]int32
+	// ConfigEntries contains the custom topic configurations to set.
+	ConfigEntries map[string]*string
+}
+
+func (t *TopicDetail) encode(pe packetEncoder) error {
+	pe.putInt32(t.NumPartitions)
+	pe.putInt16(t.ReplicationFactor)
+
+	if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
+		return err
+	}
+	for partition, assignment := range t.ReplicaAssignment {
+		pe.putInt32(partition)
+		if err := pe.putInt32Array(assignment); err != nil {
+			return err
+		}
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
+		return err
+	}
+	for configKey, configValue := range t.ConfigEntries {
+		if err := pe.putString(configKey); err != nil {
+			return err
+		}
+		if err := pe.putNullableString(configValue); err != nil {
+			return err
+		}
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	pe.putEmptyTaggedFieldArray()
+	return nil
+}
+
+func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
+	if t.NumPartitions, err = pd.getInt32(); err != nil {
+		return err
+	}
+	if t.ReplicationFactor, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.ReplicaAssignment = make(map[int32][]int32, n)
+		for i := 0; i < n; i++ {
+			replica, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+			if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
+				return err
+			}
+			if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+				return err
+			}
+		}
+	}
+
+	n, err = pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.ConfigEntries = make(map[string]*string, n)
+		for i := 0; i < n; i++ {
+			configKey, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
+				return err
+			}
+			if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+				return err
+			}
+		}
+	}
+
+	_, err = pd.getEmptyTaggedFieldArray()
+	return err
+}