blob: 0c2756305a24082c1c82f00ca1be92a6fc749b7b [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import "time"
4
5type OffsetFetchResponseBlock struct {
6 Offset int64
7 LeaderEpoch int32
8 Metadata string
9 Err KError
10}
11
12func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
13 b.Offset, err = pd.getInt64()
14 if err != nil {
15 return err
16 }
17
18 if version >= 5 {
19 b.LeaderEpoch, err = pd.getInt32()
20 if err != nil {
21 return err
22 }
23 } else {
24 b.LeaderEpoch = -1
25 }
26
27 b.Metadata, err = pd.getString()
28 if err != nil {
29 return err
30 }
31
32 b.Err, err = pd.getKError()
33 if err != nil {
34 return err
35 }
36
37 _, err = pd.getEmptyTaggedFieldArray()
38 return err
39}
40
41func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
42 pe.putInt64(b.Offset)
43
44 if version >= 5 {
45 pe.putInt32(b.LeaderEpoch)
46 }
47 err = pe.putString(b.Metadata)
48 if err != nil {
49 return err
50 }
51
52 pe.putKError(b.Err)
53
54 pe.putEmptyTaggedFieldArray()
55 return nil
56}
57
58type OffsetFetchResponse struct {
59 Version int16
60 ThrottleTimeMs int32
61 Blocks map[string]map[int32]*OffsetFetchResponseBlock
62 Err KError
63}
64
65func (r *OffsetFetchResponse) setVersion(v int16) {
66 r.Version = v
67}
68
69func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
70 if r.Version >= 3 {
71 pe.putInt32(r.ThrottleTimeMs)
72 }
73 err = pe.putArrayLength(len(r.Blocks))
74 if err != nil {
75 return err
76 }
77
78 for topic, partitions := range r.Blocks {
79 err = pe.putString(topic)
80 if err != nil {
81 return err
82 }
83
84 err = pe.putArrayLength(len(partitions))
85 if err != nil {
86 return err
87 }
88 for partition, block := range partitions {
89 pe.putInt32(partition)
90 if err := block.encode(pe, r.Version); err != nil {
91 return err
92 }
93 }
94 pe.putEmptyTaggedFieldArray()
95 }
96 if r.Version >= 2 {
97 pe.putKError(r.Err)
98 }
99 pe.putEmptyTaggedFieldArray()
100 return nil
101}
102
103func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
104 r.Version = version
105
106 if version >= 3 {
107 r.ThrottleTimeMs, err = pd.getInt32()
108 if err != nil {
109 return err
110 }
111 }
112
113 numTopics, err := pd.getArrayLength()
114 if err != nil {
115 return err
116 }
117
118 if numTopics > 0 {
119 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
120 for i := 0; i < numTopics; i++ {
121 name, err := pd.getString()
122 if err != nil {
123 return err
124 }
125
126 numBlocks, err := pd.getArrayLength()
127 if err != nil {
128 return err
129 }
130
131 r.Blocks[name] = nil
132 if numBlocks > 0 {
133 r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
134 }
135 for j := 0; j < numBlocks; j++ {
136 id, err := pd.getInt32()
137 if err != nil {
138 return err
139 }
140
141 block := new(OffsetFetchResponseBlock)
142 err = block.decode(pd, version)
143 if err != nil {
144 return err
145 }
146
147 r.Blocks[name][id] = block
148 }
149
150 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
151 return err
152 }
153 }
154 }
155
156 if version >= 2 {
157 r.Err, err = pd.getKError()
158 if err != nil {
159 return err
160 }
161 }
162
163 _, err = pd.getEmptyTaggedFieldArray()
164 return err
165}
166
167func (r *OffsetFetchResponse) key() int16 {
168 return apiKeyOffsetFetch
169}
170
171func (r *OffsetFetchResponse) version() int16 {
172 return r.Version
173}
174
175func (r *OffsetFetchResponse) headerVersion() int16 {
176 if r.Version >= 6 {
177 return 1
178 }
179
180 return 0
181}
182
183func (r *OffsetFetchResponse) isValidVersion() bool {
184 return r.Version >= 0 && r.Version <= 7
185}
186
187func (r *OffsetFetchResponse) isFlexible() bool {
188 return r.isFlexibleVersion(r.Version)
189}
190
191func (r *OffsetFetchResponse) isFlexibleVersion(version int16) bool {
192 return version >= 6
193}
194
195func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
196 switch r.Version {
197 case 7:
198 return V2_5_0_0
199 case 6:
200 return V2_4_0_0
201 case 5:
202 return V2_1_0_0
203 case 4:
204 return V2_0_0_0
205 case 3:
206 return V0_11_0_0
207 case 2:
208 return V0_10_2_0
209 case 1:
210 return V0_8_2_0
211 case 0:
212 return V0_8_2_0
213 default:
214 return V2_5_0_0
215 }
216}
217
218func (r *OffsetFetchResponse) throttleTime() time.Duration {
219 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
220}
221
222func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
223 if r.Blocks == nil {
224 return nil
225 }
226
227 if r.Blocks[topic] == nil {
228 return nil
229 }
230
231 return r.Blocks[topic][partition]
232}
233
234func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
235 if r.Blocks == nil {
236 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
237 }
238 partitions := r.Blocks[topic]
239 if partitions == nil {
240 partitions = make(map[int32]*OffsetFetchResponseBlock)
241 r.Blocks[topic] = partitions
242 }
243 partitions[partition] = block
244}