blob: 90a3fabbff7b072620e140dd35db889938f827ee [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5type PartitionResult struct {
6 ErrorCode KError
7 ErrorMessage *string
8}
9
10func (b *PartitionResult) encode(pe packetEncoder, version int16) error {
11 pe.putKError(b.ErrorCode)
12 if err := pe.putNullableString(b.ErrorMessage); err != nil {
13 return err
14 }
15 pe.putEmptyTaggedFieldArray()
16 return nil
17}
18
19func (b *PartitionResult) decode(pd packetDecoder, version int16) (err error) {
20 b.ErrorCode, err = pd.getKError()
21 if err != nil {
22 return err
23 }
24 b.ErrorMessage, err = pd.getNullableString()
25 if err != nil {
26 return err
27 }
28 _, err = pd.getEmptyTaggedFieldArray()
29 return err
30}
31
32type ElectLeadersResponse struct {
33 Version int16
34 ThrottleTimeMs int32
35 ErrorCode KError
36 ReplicaElectionResults map[string]map[int32]*PartitionResult
37}
38
39func (r *ElectLeadersResponse) setVersion(v int16) {
40 r.Version = v
41}
42
43func (r *ElectLeadersResponse) encode(pe packetEncoder) error {
44 pe.putInt32(r.ThrottleTimeMs)
45
46 if r.Version > 0 {
47 pe.putKError(r.ErrorCode)
48 }
49
50 if err := pe.putArrayLength(len(r.ReplicaElectionResults)); err != nil {
51 return err
52 }
53 for topic, partitions := range r.ReplicaElectionResults {
54 if err := pe.putString(topic); err != nil {
55 return err
56 }
57 if err := pe.putArrayLength(len(partitions)); err != nil {
58 return err
59 }
60 for partition, result := range partitions {
61 pe.putInt32(partition)
62 if err := result.encode(pe, r.Version); err != nil {
63 return err
64 }
65 }
66 pe.putEmptyTaggedFieldArray()
67 }
68
69 pe.putEmptyTaggedFieldArray()
70 return nil
71}
72
73func (r *ElectLeadersResponse) decode(pd packetDecoder, version int16) (err error) {
74 r.Version = version
75 if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
76 return err
77 }
78 if r.Version > 0 {
79 r.ErrorCode, err = pd.getKError()
80 if err != nil {
81 return err
82 }
83 }
84
85 numTopics, err := pd.getArrayLength()
86 if err != nil {
87 return err
88 }
89
90 r.ReplicaElectionResults = make(map[string]map[int32]*PartitionResult, numTopics)
91 for i := 0; i < numTopics; i++ {
92 topic, err := pd.getString()
93 if err != nil {
94 return err
95 }
96
97 numPartitions, err := pd.getArrayLength()
98 if err != nil {
99 return err
100 }
101 r.ReplicaElectionResults[topic] = make(map[int32]*PartitionResult, numPartitions)
102 for j := 0; j < numPartitions; j++ {
103 partition, err := pd.getInt32()
104 if err != nil {
105 return err
106 }
107 result := new(PartitionResult)
108 if err := result.decode(pd, r.Version); err != nil {
109 return err
110 }
111 r.ReplicaElectionResults[topic][partition] = result
112 }
113 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
114 return err
115 }
116 }
117
118 _, err = pd.getEmptyTaggedFieldArray()
119 return err
120}
121
122func (r *ElectLeadersResponse) key() int16 {
123 return apiKeyElectLeaders
124}
125
126func (r *ElectLeadersResponse) version() int16 {
127 return r.Version
128}
129
130func (r *ElectLeadersResponse) headerVersion() int16 {
131 return 1
132}
133
134func (r *ElectLeadersResponse) isValidVersion() bool {
135 return r.Version >= 0 && r.Version <= 2
136}
137
138func (r *ElectLeadersResponse) isFlexible() bool {
139 return r.isFlexibleVersion(r.Version)
140}
141
142func (r *ElectLeadersResponse) isFlexibleVersion(version int16) bool {
143 return version >= 2
144}
145
146func (r *ElectLeadersResponse) requiredVersion() KafkaVersion {
147 switch r.Version {
148 case 2:
149 return V2_4_0_0
150 case 1:
151 return V0_11_0_0
152 case 0:
153 return V0_10_0_0
154 default:
155 return V2_4_0_0
156 }
157}
158
159func (r *ElectLeadersResponse) throttleTime() time.Duration {
160 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
161}