| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 1 | package sarama |
| 2 | |
| 3 | import "time" |
| 4 | |
| 5 | // PartitionMetadata contains each partition in the topic. |
| 6 | type PartitionMetadata struct { |
| 7 | // Version defines the protocol version to use for encode and decode |
| 8 | Version int16 |
| 9 | // Err contains the partition error, or 0 if there was no error. |
| 10 | Err KError |
| 11 | // ID contains the partition index. |
| 12 | ID int32 |
| 13 | // Leader contains the ID of the leader broker. |
| 14 | Leader int32 |
| 15 | // LeaderEpoch contains the leader epoch of this partition. |
| 16 | LeaderEpoch int32 |
| 17 | // Replicas contains the set of all nodes that host this partition. |
| 18 | Replicas []int32 |
| 19 | // Isr contains the set of nodes that are in sync with the leader for this partition. |
| 20 | Isr []int32 |
| 21 | // OfflineReplicas contains the set of offline replicas of this partition. |
| 22 | OfflineReplicas []int32 |
| 23 | } |
| 24 | |
| 25 | func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) { |
| 26 | p.Version = version |
| 27 | p.Err, err = pd.getKError() |
| 28 | if err != nil { |
| 29 | return err |
| 30 | } |
| 31 | |
| 32 | if p.ID, err = pd.getInt32(); err != nil { |
| 33 | return err |
| 34 | } |
| 35 | |
| 36 | if p.Leader, err = pd.getInt32(); err != nil { |
| 37 | return err |
| 38 | } |
| 39 | |
| 40 | if p.Version >= 7 { |
| 41 | if p.LeaderEpoch, err = pd.getInt32(); err != nil { |
| 42 | return err |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | p.Replicas, err = pd.getInt32Array() |
| 47 | if err != nil { |
| 48 | return err |
| 49 | } |
| 50 | |
| 51 | p.Isr, err = pd.getInt32Array() |
| 52 | if err != nil { |
| 53 | return err |
| 54 | } |
| 55 | |
| 56 | if p.Version >= 5 { |
| 57 | p.OfflineReplicas, err = pd.getInt32Array() |
| 58 | if err != nil { |
| 59 | return err |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | _, err = pd.getEmptyTaggedFieldArray() |
| 64 | return err |
| 65 | } |
| 66 | |
| 67 | func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) { |
| 68 | p.Version = version |
| 69 | pe.putKError(p.Err) |
| 70 | |
| 71 | pe.putInt32(p.ID) |
| 72 | |
| 73 | pe.putInt32(p.Leader) |
| 74 | |
| 75 | if p.Version >= 7 { |
| 76 | pe.putInt32(p.LeaderEpoch) |
| 77 | } |
| 78 | |
| 79 | err = pe.putInt32Array(p.Replicas) |
| 80 | if err != nil { |
| 81 | return err |
| 82 | } |
| 83 | |
| 84 | err = pe.putInt32Array(p.Isr) |
| 85 | if err != nil { |
| 86 | return err |
| 87 | } |
| 88 | |
| 89 | if p.Version >= 5 { |
| 90 | err = pe.putInt32Array(p.OfflineReplicas) |
| 91 | if err != nil { |
| 92 | return err |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | pe.putEmptyTaggedFieldArray() |
| 97 | return nil |
| 98 | } |
| 99 | |
| 100 | // TopicMetadata contains each topic in the response. |
| 101 | type TopicMetadata struct { |
| 102 | // Version defines the protocol version to use for encode and decode |
| 103 | Version int16 |
| 104 | // Err contains the topic error, or 0 if there was no error. |
| 105 | Err KError |
| 106 | // Name contains the topic name. |
| 107 | Name string |
| 108 | Uuid Uuid |
| 109 | // IsInternal contains a True if the topic is internal. |
| 110 | IsInternal bool |
| 111 | // Partitions contains each partition in the topic. |
| 112 | Partitions []*PartitionMetadata |
| 113 | TopicAuthorizedOperations int32 // Only valid for Version >= 8 |
| 114 | } |
| 115 | |
| 116 | func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { |
| 117 | t.Version = version |
| 118 | t.Err, err = pd.getKError() |
| 119 | if err != nil { |
| 120 | return err |
| 121 | } |
| 122 | |
| 123 | t.Name, err = pd.getString() |
| 124 | if err != nil { |
| 125 | return err |
| 126 | } |
| 127 | |
| 128 | if t.Version >= 10 { |
| 129 | uuid, err := pd.getRawBytes(16) |
| 130 | if err != nil { |
| 131 | return err |
| 132 | } |
| 133 | t.Uuid = [16]byte{} |
| 134 | for i := 0; i < 16; i++ { |
| 135 | t.Uuid[i] = uuid[i] |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | if t.Version >= 1 { |
| 140 | t.IsInternal, err = pd.getBool() |
| 141 | if err != nil { |
| 142 | return err |
| 143 | } |
| 144 | } |
| 145 | |
| 146 | n, err := pd.getArrayLength() |
| 147 | if err != nil { |
| 148 | return err |
| 149 | } |
| 150 | t.Partitions = make([]*PartitionMetadata, n) |
| 151 | for i := 0; i < n; i++ { |
| 152 | block := &PartitionMetadata{} |
| 153 | if err := block.decode(pd, t.Version); err != nil { |
| 154 | return err |
| 155 | } |
| 156 | t.Partitions[i] = block |
| 157 | } |
| 158 | |
| 159 | if t.Version >= 8 { |
| 160 | t.TopicAuthorizedOperations, err = pd.getInt32() |
| 161 | if err != nil { |
| 162 | return err |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | _, err = pd.getEmptyTaggedFieldArray() |
| 167 | return err |
| 168 | } |
| 169 | |
| 170 | func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { |
| 171 | t.Version = version |
| 172 | pe.putKError(t.Err) |
| 173 | |
| 174 | err = pe.putString(t.Name) |
| 175 | if err != nil { |
| 176 | return err |
| 177 | } |
| 178 | |
| 179 | if t.Version >= 10 { |
| 180 | err = pe.putRawBytes(t.Uuid[:]) |
| 181 | if err != nil { |
| 182 | return err |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | if t.Version >= 1 { |
| 187 | pe.putBool(t.IsInternal) |
| 188 | } |
| 189 | |
| 190 | err = pe.putArrayLength(len(t.Partitions)) |
| 191 | if err != nil { |
| 192 | return err |
| 193 | } |
| 194 | for _, block := range t.Partitions { |
| 195 | if err := block.encode(pe, t.Version); err != nil { |
| 196 | return err |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | if t.Version >= 8 { |
| 201 | pe.putInt32(t.TopicAuthorizedOperations) |
| 202 | } |
| 203 | |
| 204 | pe.putEmptyTaggedFieldArray() |
| 205 | return nil |
| 206 | } |
| 207 | |
| 208 | type MetadataResponse struct { |
| 209 | // Version defines the protocol version to use for encode and decode |
| 210 | Version int16 |
| 211 | // ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
| 212 | ThrottleTimeMs int32 |
| 213 | // Brokers contains each broker in the response. |
| 214 | Brokers []*Broker |
| 215 | // ClusterID contains the cluster ID that responding broker belongs to. |
| 216 | ClusterID *string |
| 217 | // ControllerID contains the ID of the controller broker. |
| 218 | ControllerID int32 |
| 219 | // Topics contains each topic in the response. |
| 220 | Topics []*TopicMetadata |
| 221 | ClusterAuthorizedOperations int32 // Only valid for Version >= 8 |
| 222 | } |
| 223 | |
| 224 | func (r *MetadataResponse) setVersion(v int16) { |
| 225 | r.Version = v |
| 226 | } |
| 227 | |
| 228 | func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { |
| 229 | r.Version = version |
| 230 | if r.Version >= 3 { |
| 231 | if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { |
| 232 | return err |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | brokerArrayLen, err := pd.getArrayLength() |
| 237 | if err != nil { |
| 238 | return err |
| 239 | } |
| 240 | |
| 241 | r.Brokers = make([]*Broker, brokerArrayLen) |
| 242 | for i := 0; i < brokerArrayLen; i++ { |
| 243 | r.Brokers[i] = new(Broker) |
| 244 | err = r.Brokers[i].decode(pd, version) |
| 245 | if err != nil { |
| 246 | return err |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | if r.Version >= 2 { |
| 251 | r.ClusterID, err = pd.getNullableString() |
| 252 | if err != nil { |
| 253 | return err |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | if r.Version >= 1 { |
| 258 | if r.ControllerID, err = pd.getInt32(); err != nil { |
| 259 | return err |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | topicArrayLen, err := pd.getArrayLength() |
| 264 | if err != nil { |
| 265 | return err |
| 266 | } |
| 267 | |
| 268 | r.Topics = make([]*TopicMetadata, topicArrayLen) |
| 269 | for i := 0; i < topicArrayLen; i++ { |
| 270 | r.Topics[i] = new(TopicMetadata) |
| 271 | err = r.Topics[i].decode(pd, version) |
| 272 | if err != nil { |
| 273 | return err |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | if r.Version >= 8 { |
| 278 | r.ClusterAuthorizedOperations, err = pd.getInt32() |
| 279 | if err != nil { |
| 280 | return err |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | _, err = pd.getEmptyTaggedFieldArray() |
| 285 | return err |
| 286 | } |
| 287 | |
| 288 | func (r *MetadataResponse) encode(pe packetEncoder) (err error) { |
| 289 | if r.Version >= 3 { |
| 290 | pe.putInt32(r.ThrottleTimeMs) |
| 291 | } |
| 292 | |
| 293 | err = pe.putArrayLength(len(r.Brokers)) |
| 294 | if err != nil { |
| 295 | return err |
| 296 | } |
| 297 | |
| 298 | for _, broker := range r.Brokers { |
| 299 | err = broker.encode(pe, r.Version) |
| 300 | if err != nil { |
| 301 | return err |
| 302 | } |
| 303 | } |
| 304 | |
| 305 | if r.Version >= 2 { |
| 306 | err = pe.putNullableString(r.ClusterID) |
| 307 | if err != nil { |
| 308 | return err |
| 309 | } |
| 310 | } |
| 311 | |
| 312 | if r.Version >= 1 { |
| 313 | pe.putInt32(r.ControllerID) |
| 314 | } |
| 315 | |
| 316 | err = pe.putArrayLength(len(r.Topics)) |
| 317 | if err != nil { |
| 318 | return err |
| 319 | } |
| 320 | for _, block := range r.Topics { |
| 321 | if err := block.encode(pe, r.Version); err != nil { |
| 322 | return err |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | if r.Version >= 8 { |
| 327 | pe.putInt32(r.ClusterAuthorizedOperations) |
| 328 | } |
| 329 | |
| 330 | pe.putEmptyTaggedFieldArray() |
| 331 | return nil |
| 332 | } |
| 333 | |
| 334 | func (r *MetadataResponse) key() int16 { |
| 335 | return apiKeyMetadata |
| 336 | } |
| 337 | |
| 338 | func (r *MetadataResponse) version() int16 { |
| 339 | return r.Version |
| 340 | } |
| 341 | |
| 342 | func (r *MetadataResponse) headerVersion() int16 { |
| 343 | if r.Version < 9 { |
| 344 | return 0 |
| 345 | } else { |
| 346 | return 1 |
| 347 | } |
| 348 | } |
| 349 | |
| 350 | func (r *MetadataResponse) isValidVersion() bool { |
| 351 | return r.Version >= 0 && r.Version <= 10 |
| 352 | } |
| 353 | |
| 354 | func (r *MetadataResponse) isFlexible() bool { |
| 355 | return r.isFlexibleVersion(r.Version) |
| 356 | } |
| 357 | |
| 358 | func (r *MetadataResponse) isFlexibleVersion(version int16) bool { |
| 359 | return version >= 9 |
| 360 | } |
| 361 | |
| 362 | func (r *MetadataResponse) requiredVersion() KafkaVersion { |
| 363 | switch r.Version { |
| 364 | case 10: |
| 365 | return V2_8_0_0 |
| 366 | case 9: |
| 367 | return V2_4_0_0 |
| 368 | case 8: |
| 369 | return V2_3_0_0 |
| 370 | case 7: |
| 371 | return V2_1_0_0 |
| 372 | case 6: |
| 373 | return V2_0_0_0 |
| 374 | case 5: |
| 375 | return V1_0_0_0 |
| 376 | case 3, 4: |
| 377 | return V0_11_0_0 |
| 378 | case 2: |
| 379 | return V0_10_1_0 |
| 380 | case 1: |
| 381 | return V0_10_0_0 |
| 382 | case 0: |
| 383 | return V0_8_2_0 |
| 384 | default: |
| 385 | return V2_8_0_0 |
| 386 | } |
| 387 | } |
| 388 | |
| 389 | func (r *MetadataResponse) throttleTime() time.Duration { |
| 390 | return time.Duration(r.ThrottleTimeMs) * time.Millisecond |
| 391 | } |
| 392 | |
| 393 | // testing API |
| 394 | |
| 395 | func (r *MetadataResponse) AddBroker(addr string, id int32) { |
| 396 | r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr}) |
| 397 | } |
| 398 | |
| 399 | func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { |
| 400 | var tmatch *TopicMetadata |
| 401 | |
| 402 | for _, tm := range r.Topics { |
| 403 | if tm.Name == topic { |
| 404 | tmatch = tm |
| 405 | goto foundTopic |
| 406 | } |
| 407 | } |
| 408 | |
| 409 | tmatch = new(TopicMetadata) |
| 410 | tmatch.Name = topic |
| 411 | r.Topics = append(r.Topics, tmatch) |
| 412 | |
| 413 | foundTopic: |
| 414 | |
| 415 | tmatch.Err = err |
| 416 | return tmatch |
| 417 | } |
| 418 | |
| 419 | func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) { |
| 420 | tmatch := r.AddTopic(topic, ErrNoError) |
| 421 | var pmatch *PartitionMetadata |
| 422 | |
| 423 | for _, pm := range tmatch.Partitions { |
| 424 | if pm.ID == partition { |
| 425 | pmatch = pm |
| 426 | goto foundPartition |
| 427 | } |
| 428 | } |
| 429 | |
| 430 | pmatch = new(PartitionMetadata) |
| 431 | pmatch.ID = partition |
| 432 | tmatch.Partitions = append(tmatch.Partitions, pmatch) |
| 433 | |
| 434 | foundPartition: |
| 435 | pmatch.Leader = brokerID |
| 436 | pmatch.Replicas = replicas |
| 437 | if pmatch.Replicas == nil { |
| 438 | pmatch.Replicas = []int32{} |
| 439 | } |
| 440 | pmatch.Isr = isr |
| 441 | if pmatch.Isr == nil { |
| 442 | pmatch.Isr = []int32{} |
| 443 | } |
| 444 | pmatch.OfflineReplicas = offline |
| 445 | if pmatch.OfflineReplicas == nil { |
| 446 | pmatch.OfflineReplicas = []int32{} |
| 447 | } |
| 448 | pmatch.Err = err |
| 449 | } |