blob: 196c075c5babd29983045bbe4ec7682429cd8d81 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5// PartitionMetadata contains each partition in the topic.
6type PartitionMetadata struct {
7 // Version defines the protocol version to use for encode and decode
8 Version int16
9 // Err contains the partition error, or 0 if there was no error.
10 Err KError
11 // ID contains the partition index.
12 ID int32
13 // Leader contains the ID of the leader broker.
14 Leader int32
15 // LeaderEpoch contains the leader epoch of this partition.
16 LeaderEpoch int32
17 // Replicas contains the set of all nodes that host this partition.
18 Replicas []int32
19 // Isr contains the set of nodes that are in sync with the leader for this partition.
20 Isr []int32
21 // OfflineReplicas contains the set of offline replicas of this partition.
22 OfflineReplicas []int32
23}
24
25func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
26 p.Version = version
27 p.Err, err = pd.getKError()
28 if err != nil {
29 return err
30 }
31
32 if p.ID, err = pd.getInt32(); err != nil {
33 return err
34 }
35
36 if p.Leader, err = pd.getInt32(); err != nil {
37 return err
38 }
39
40 if p.Version >= 7 {
41 if p.LeaderEpoch, err = pd.getInt32(); err != nil {
42 return err
43 }
44 }
45
46 p.Replicas, err = pd.getInt32Array()
47 if err != nil {
48 return err
49 }
50
51 p.Isr, err = pd.getInt32Array()
52 if err != nil {
53 return err
54 }
55
56 if p.Version >= 5 {
57 p.OfflineReplicas, err = pd.getInt32Array()
58 if err != nil {
59 return err
60 }
61 }
62
63 _, err = pd.getEmptyTaggedFieldArray()
64 return err
65}
66
67func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
68 p.Version = version
69 pe.putKError(p.Err)
70
71 pe.putInt32(p.ID)
72
73 pe.putInt32(p.Leader)
74
75 if p.Version >= 7 {
76 pe.putInt32(p.LeaderEpoch)
77 }
78
79 err = pe.putInt32Array(p.Replicas)
80 if err != nil {
81 return err
82 }
83
84 err = pe.putInt32Array(p.Isr)
85 if err != nil {
86 return err
87 }
88
89 if p.Version >= 5 {
90 err = pe.putInt32Array(p.OfflineReplicas)
91 if err != nil {
92 return err
93 }
94 }
95
96 pe.putEmptyTaggedFieldArray()
97 return nil
98}
99
100// TopicMetadata contains each topic in the response.
101type TopicMetadata struct {
102 // Version defines the protocol version to use for encode and decode
103 Version int16
104 // Err contains the topic error, or 0 if there was no error.
105 Err KError
106 // Name contains the topic name.
107 Name string
108 Uuid Uuid
109 // IsInternal contains a True if the topic is internal.
110 IsInternal bool
111 // Partitions contains each partition in the topic.
112 Partitions []*PartitionMetadata
113 TopicAuthorizedOperations int32 // Only valid for Version >= 8
114}
115
116func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
117 t.Version = version
118 t.Err, err = pd.getKError()
119 if err != nil {
120 return err
121 }
122
123 t.Name, err = pd.getString()
124 if err != nil {
125 return err
126 }
127
128 if t.Version >= 10 {
129 uuid, err := pd.getRawBytes(16)
130 if err != nil {
131 return err
132 }
133 t.Uuid = [16]byte{}
134 for i := 0; i < 16; i++ {
135 t.Uuid[i] = uuid[i]
136 }
137 }
138
139 if t.Version >= 1 {
140 t.IsInternal, err = pd.getBool()
141 if err != nil {
142 return err
143 }
144 }
145
146 n, err := pd.getArrayLength()
147 if err != nil {
148 return err
149 }
150 t.Partitions = make([]*PartitionMetadata, n)
151 for i := 0; i < n; i++ {
152 block := &PartitionMetadata{}
153 if err := block.decode(pd, t.Version); err != nil {
154 return err
155 }
156 t.Partitions[i] = block
157 }
158
159 if t.Version >= 8 {
160 t.TopicAuthorizedOperations, err = pd.getInt32()
161 if err != nil {
162 return err
163 }
164 }
165
166 _, err = pd.getEmptyTaggedFieldArray()
167 return err
168}
169
170func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
171 t.Version = version
172 pe.putKError(t.Err)
173
174 err = pe.putString(t.Name)
175 if err != nil {
176 return err
177 }
178
179 if t.Version >= 10 {
180 err = pe.putRawBytes(t.Uuid[:])
181 if err != nil {
182 return err
183 }
184 }
185
186 if t.Version >= 1 {
187 pe.putBool(t.IsInternal)
188 }
189
190 err = pe.putArrayLength(len(t.Partitions))
191 if err != nil {
192 return err
193 }
194 for _, block := range t.Partitions {
195 if err := block.encode(pe, t.Version); err != nil {
196 return err
197 }
198 }
199
200 if t.Version >= 8 {
201 pe.putInt32(t.TopicAuthorizedOperations)
202 }
203
204 pe.putEmptyTaggedFieldArray()
205 return nil
206}
207
208type MetadataResponse struct {
209 // Version defines the protocol version to use for encode and decode
210 Version int16
211 // ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
212 ThrottleTimeMs int32
213 // Brokers contains each broker in the response.
214 Brokers []*Broker
215 // ClusterID contains the cluster ID that responding broker belongs to.
216 ClusterID *string
217 // ControllerID contains the ID of the controller broker.
218 ControllerID int32
219 // Topics contains each topic in the response.
220 Topics []*TopicMetadata
221 ClusterAuthorizedOperations int32 // Only valid for Version >= 8
222}
223
224func (r *MetadataResponse) setVersion(v int16) {
225 r.Version = v
226}
227
228func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
229 r.Version = version
230 if r.Version >= 3 {
231 if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
232 return err
233 }
234 }
235
236 brokerArrayLen, err := pd.getArrayLength()
237 if err != nil {
238 return err
239 }
240
241 r.Brokers = make([]*Broker, brokerArrayLen)
242 for i := 0; i < brokerArrayLen; i++ {
243 r.Brokers[i] = new(Broker)
244 err = r.Brokers[i].decode(pd, version)
245 if err != nil {
246 return err
247 }
248 }
249
250 if r.Version >= 2 {
251 r.ClusterID, err = pd.getNullableString()
252 if err != nil {
253 return err
254 }
255 }
256
257 if r.Version >= 1 {
258 if r.ControllerID, err = pd.getInt32(); err != nil {
259 return err
260 }
261 }
262
263 topicArrayLen, err := pd.getArrayLength()
264 if err != nil {
265 return err
266 }
267
268 r.Topics = make([]*TopicMetadata, topicArrayLen)
269 for i := 0; i < topicArrayLen; i++ {
270 r.Topics[i] = new(TopicMetadata)
271 err = r.Topics[i].decode(pd, version)
272 if err != nil {
273 return err
274 }
275 }
276
277 if r.Version >= 8 {
278 r.ClusterAuthorizedOperations, err = pd.getInt32()
279 if err != nil {
280 return err
281 }
282 }
283
284 _, err = pd.getEmptyTaggedFieldArray()
285 return err
286}
287
288func (r *MetadataResponse) encode(pe packetEncoder) (err error) {
289 if r.Version >= 3 {
290 pe.putInt32(r.ThrottleTimeMs)
291 }
292
293 err = pe.putArrayLength(len(r.Brokers))
294 if err != nil {
295 return err
296 }
297
298 for _, broker := range r.Brokers {
299 err = broker.encode(pe, r.Version)
300 if err != nil {
301 return err
302 }
303 }
304
305 if r.Version >= 2 {
306 err = pe.putNullableString(r.ClusterID)
307 if err != nil {
308 return err
309 }
310 }
311
312 if r.Version >= 1 {
313 pe.putInt32(r.ControllerID)
314 }
315
316 err = pe.putArrayLength(len(r.Topics))
317 if err != nil {
318 return err
319 }
320 for _, block := range r.Topics {
321 if err := block.encode(pe, r.Version); err != nil {
322 return err
323 }
324 }
325
326 if r.Version >= 8 {
327 pe.putInt32(r.ClusterAuthorizedOperations)
328 }
329
330 pe.putEmptyTaggedFieldArray()
331 return nil
332}
333
334func (r *MetadataResponse) key() int16 {
335 return apiKeyMetadata
336}
337
338func (r *MetadataResponse) version() int16 {
339 return r.Version
340}
341
342func (r *MetadataResponse) headerVersion() int16 {
343 if r.Version < 9 {
344 return 0
345 } else {
346 return 1
347 }
348}
349
350func (r *MetadataResponse) isValidVersion() bool {
351 return r.Version >= 0 && r.Version <= 10
352}
353
354func (r *MetadataResponse) isFlexible() bool {
355 return r.isFlexibleVersion(r.Version)
356}
357
358func (r *MetadataResponse) isFlexibleVersion(version int16) bool {
359 return version >= 9
360}
361
362func (r *MetadataResponse) requiredVersion() KafkaVersion {
363 switch r.Version {
364 case 10:
365 return V2_8_0_0
366 case 9:
367 return V2_4_0_0
368 case 8:
369 return V2_3_0_0
370 case 7:
371 return V2_1_0_0
372 case 6:
373 return V2_0_0_0
374 case 5:
375 return V1_0_0_0
376 case 3, 4:
377 return V0_11_0_0
378 case 2:
379 return V0_10_1_0
380 case 1:
381 return V0_10_0_0
382 case 0:
383 return V0_8_2_0
384 default:
385 return V2_8_0_0
386 }
387}
388
389func (r *MetadataResponse) throttleTime() time.Duration {
390 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
391}
392
393// testing API
394
395func (r *MetadataResponse) AddBroker(addr string, id int32) {
396 r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
397}
398
399func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
400 var tmatch *TopicMetadata
401
402 for _, tm := range r.Topics {
403 if tm.Name == topic {
404 tmatch = tm
405 goto foundTopic
406 }
407 }
408
409 tmatch = new(TopicMetadata)
410 tmatch.Name = topic
411 r.Topics = append(r.Topics, tmatch)
412
413foundTopic:
414
415 tmatch.Err = err
416 return tmatch
417}
418
419func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
420 tmatch := r.AddTopic(topic, ErrNoError)
421 var pmatch *PartitionMetadata
422
423 for _, pm := range tmatch.Partitions {
424 if pm.ID == partition {
425 pmatch = pm
426 goto foundPartition
427 }
428 }
429
430 pmatch = new(PartitionMetadata)
431 pmatch.ID = partition
432 tmatch.Partitions = append(tmatch.Partitions, pmatch)
433
434foundPartition:
435 pmatch.Leader = brokerID
436 pmatch.Replicas = replicas
437 if pmatch.Replicas == nil {
438 pmatch.Replicas = []int32{}
439 }
440 pmatch.Isr = isr
441 if pmatch.Isr == nil {
442 pmatch.Isr = []int32{}
443 }
444 pmatch.OfflineReplicas = offline
445 if pmatch.OfflineReplicas == nil {
446 pmatch.OfflineReplicas = []int32{}
447 }
448 pmatch.Err = err
449}