[VOL-5486] Fix deprecated versions
Change-Id: If0b888d6c2f33b2f415c8b03b08dc994bb3df3f4
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/offset_request.go b/vendor/github.com/IBM/sarama/offset_request.go
new file mode 100644
index 0000000..01fbb33
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/offset_request.go
@@ -0,0 +1,242 @@
+package sarama
+
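+// offsetRequestBlock describes a single partition entry in a ListOffsets request.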
+type offsetRequestBlock struct {
+	// currentLeaderEpoch contains the current leader epoch (used in version 4+).
+	currentLeaderEpoch int32
+	// timestamp contains the current timestamp.
+	timestamp int64
+	// maxNumOffsets contains the maximum number of offsets to report.
+	maxNumOffsets int32 // Only used in version 0
+}
+
+func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
+	if version >= 4 {
+		pe.putInt32(b.currentLeaderEpoch)
+	}
+
+	pe.putInt64(b.timestamp)
+
+	if version == 0 {
+		pe.putInt32(b.maxNumOffsets)
+	}
+
+	return nil
+}
+
+func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+	b.currentLeaderEpoch = -1
+	if version >= 4 {
+		if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
+
+	if b.timestamp, err = pd.getInt64(); err != nil {
+		return err
+	}
+
+	if version == 0 {
+		if b.maxNumOffsets, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
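+// OffsetRequest implements the ListOffsets request, used to query a broker for partition offsets (for example the earliest or latest available offset).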
+type OffsetRequest struct {
+	Version        int16
+	IsolationLevel IsolationLevel
+	replicaID      int32
+	isReplicaIDSet bool
+	blocks         map[string]map[int32]*offsetRequestBlock
+}
+
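+// NewOffsetRequest creates an OffsetRequest using the highest protocol version supported by the given Kafka version.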
+func NewOffsetRequest(version KafkaVersion) *OffsetRequest {
+	request := &OffsetRequest{}
+	if version.IsAtLeast(V2_2_0_0) {
+		// Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
+		request.Version = 5
+	} else if version.IsAtLeast(V2_1_0_0) {
+		// Version 4 adds the current leader epoch, which is used for fencing.
+		request.Version = 4
+	} else if version.IsAtLeast(V2_0_0_0) {
+		// Version 3 is the same as version 2.
+		request.Version = 3
+	} else if version.IsAtLeast(V0_11_0_0) {
+		// Version 2 adds the isolation level, which is used for transactional reads.
+		request.Version = 2
+	} else if version.IsAtLeast(V0_10_1_0) {
+		// Version 1 removes MaxNumOffsets. From this version forward, only a single
+		// offset can be returned.
+		request.Version = 1
+	}
+	return request
+}
+
+func (r *OffsetRequest) setVersion(v int16) {
+	r.Version = v
+}
+
+func (r *OffsetRequest) encode(pe packetEncoder) error {
+	if r.isReplicaIDSet {
+		pe.putInt32(r.replicaID)
+	} else {
+		// default replica ID is always -1 for clients
+		pe.putInt32(-1)
+	}
+
+	if r.Version >= 2 {
+		pe.putBool(r.IsolationLevel == ReadCommitted)
+	}
+
+	err := pe.putArrayLength(len(r.blocks))
+	if err != nil {
+		return err
+	}
+	for topic, partitions := range r.blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			if err = block.encode(pe, r.Version); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
+	r.Version = version
+
+	replicaID, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	if replicaID >= 0 {
+		r.SetReplicaID(replicaID)
+	}
+
+	if r.Version >= 2 {
+		tmp, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+
+		r.IsolationLevel = ReadUncommitted
+		if tmp {
+			r.IsolationLevel = ReadCommitted
+		}
+	}
+
+	blockCount, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if blockCount == 0 {
+		return nil
+	}
+	r.blocks = make(map[string]map[int32]*offsetRequestBlock)
+	for i := 0; i < blockCount; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		partitionCount, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
+		for j := 0; j < partitionCount; j++ {
+			partition, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+			block := &offsetRequestBlock{}
+			if err := block.decode(pd, version); err != nil {
+				return err
+			}
+			r.blocks[topic][partition] = block
+		}
+	}
+	return nil
+}
+
+func (r *OffsetRequest) key() int16 {
+	return apiKeyListOffsets
+}
+
+func (r *OffsetRequest) version() int16 {
+	return r.Version
+}
+
+func (r *OffsetRequest) headerVersion() int16 {
+	return 1
+}
+
+func (r *OffsetRequest) isValidVersion() bool {
+	return r.Version >= 0 && r.Version <= 5
+}
+
+func (r *OffsetRequest) requiredVersion() KafkaVersion {
+	switch r.Version {
+	case 5:
+		return V2_2_0_0
+	case 4:
+		return V2_1_0_0
+	case 3:
+		return V2_0_0_0
+	case 2:
+		return V0_11_0_0
+	case 1:
+		return V0_10_1_0
+	case 0:
+		return V0_8_2_0
+	default:
+		return V2_0_0_0
+	}
+}
+
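+// SetReplicaID sets the replica ID reported in the request; clients normally leave it unset, in which case -1 is encoded.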
+func (r *OffsetRequest) SetReplicaID(id int32) {
+	r.replicaID = id
+	r.isReplicaIDSet = true
+}
+
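+// ReplicaID returns the configured replica ID, or -1 if none was set.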
+func (r *OffsetRequest) ReplicaID() int32 {
+	if r.isReplicaIDSet {
+		return r.replicaID
+	}
+	return -1
+}
+
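+// AddBlock adds a topic/partition entry with the given target timestamp to the request; maxOffsets is only encoded by version 0 requests.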
+func (r *OffsetRequest) AddBlock(topic string, partitionID int32, timestamp int64, maxOffsets int32) {
+	if r.blocks == nil {
+		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
+	}
+
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
+	}
+
+	tmp := new(offsetRequestBlock)
+	tmp.currentLeaderEpoch = -1
+	tmp.timestamp = timestamp
+	if r.Version == 0 {
+		tmp.maxNumOffsets = maxOffsets
+	}
+
+	r.blocks[topic][partitionID] = tmp
+}