| package sarama |
| |
| import "time" |
| |
| // PartitionMetadata contains each partition in the topic. |
| type PartitionMetadata struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // Err contains the partition error, or 0 if there was no error. |
| Err KError |
| // ID contains the partition index. |
| ID int32 |
| // Leader contains the ID of the leader broker. |
| Leader int32 |
| // LeaderEpoch contains the leader epoch of this partition. |
| LeaderEpoch int32 |
| // Replicas contains the set of all nodes that host this partition. |
| Replicas []int32 |
| // Isr contains the set of nodes that are in sync with the leader for this partition. |
| Isr []int32 |
| // OfflineReplicas contains the set of offline replicas of this partition. |
| OfflineReplicas []int32 |
| } |
| |
| func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) { |
| p.Version = version |
| p.Err, err = pd.getKError() |
| if err != nil { |
| return err |
| } |
| |
| if p.ID, err = pd.getInt32(); err != nil { |
| return err |
| } |
| |
| if p.Leader, err = pd.getInt32(); err != nil { |
| return err |
| } |
| |
| if p.Version >= 7 { |
| if p.LeaderEpoch, err = pd.getInt32(); err != nil { |
| return err |
| } |
| } |
| |
| p.Replicas, err = pd.getInt32Array() |
| if err != nil { |
| return err |
| } |
| |
| p.Isr, err = pd.getInt32Array() |
| if err != nil { |
| return err |
| } |
| |
| if p.Version >= 5 { |
| p.OfflineReplicas, err = pd.getInt32Array() |
| if err != nil { |
| return err |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) { |
| p.Version = version |
| pe.putKError(p.Err) |
| |
| pe.putInt32(p.ID) |
| |
| pe.putInt32(p.Leader) |
| |
| if p.Version >= 7 { |
| pe.putInt32(p.LeaderEpoch) |
| } |
| |
| err = pe.putInt32Array(p.Replicas) |
| if err != nil { |
| return err |
| } |
| |
| err = pe.putInt32Array(p.Isr) |
| if err != nil { |
| return err |
| } |
| |
| if p.Version >= 5 { |
| err = pe.putInt32Array(p.OfflineReplicas) |
| if err != nil { |
| return err |
| } |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| // TopicMetadata contains each topic in the response. |
| type TopicMetadata struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // Err contains the topic error, or 0 if there was no error. |
| Err KError |
| // Name contains the topic name. |
| Name string |
| Uuid Uuid |
| // IsInternal contains a True if the topic is internal. |
| IsInternal bool |
| // Partitions contains each partition in the topic. |
| Partitions []*PartitionMetadata |
| TopicAuthorizedOperations int32 // Only valid for Version >= 8 |
| } |
| |
| func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { |
| t.Version = version |
| t.Err, err = pd.getKError() |
| if err != nil { |
| return err |
| } |
| |
| t.Name, err = pd.getString() |
| if err != nil { |
| return err |
| } |
| |
| if t.Version >= 10 { |
| uuid, err := pd.getRawBytes(16) |
| if err != nil { |
| return err |
| } |
| t.Uuid = [16]byte{} |
| for i := 0; i < 16; i++ { |
| t.Uuid[i] = uuid[i] |
| } |
| } |
| |
| if t.Version >= 1 { |
| t.IsInternal, err = pd.getBool() |
| if err != nil { |
| return err |
| } |
| } |
| |
| n, err := pd.getArrayLength() |
| if err != nil { |
| return err |
| } |
| t.Partitions = make([]*PartitionMetadata, n) |
| for i := 0; i < n; i++ { |
| block := &PartitionMetadata{} |
| if err := block.decode(pd, t.Version); err != nil { |
| return err |
| } |
| t.Partitions[i] = block |
| } |
| |
| if t.Version >= 8 { |
| t.TopicAuthorizedOperations, err = pd.getInt32() |
| if err != nil { |
| return err |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { |
| t.Version = version |
| pe.putKError(t.Err) |
| |
| err = pe.putString(t.Name) |
| if err != nil { |
| return err |
| } |
| |
| if t.Version >= 10 { |
| err = pe.putRawBytes(t.Uuid[:]) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if t.Version >= 1 { |
| pe.putBool(t.IsInternal) |
| } |
| |
| err = pe.putArrayLength(len(t.Partitions)) |
| if err != nil { |
| return err |
| } |
| for _, block := range t.Partitions { |
| if err := block.encode(pe, t.Version); err != nil { |
| return err |
| } |
| } |
| |
| if t.Version >= 8 { |
| pe.putInt32(t.TopicAuthorizedOperations) |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| type MetadataResponse struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
| ThrottleTimeMs int32 |
| // Brokers contains each broker in the response. |
| Brokers []*Broker |
| // ClusterID contains the cluster ID that responding broker belongs to. |
| ClusterID *string |
| // ControllerID contains the ID of the controller broker. |
| ControllerID int32 |
| // Topics contains each topic in the response. |
| Topics []*TopicMetadata |
| ClusterAuthorizedOperations int32 // Only valid for Version >= 8 |
| } |
| |
| func (r *MetadataResponse) setVersion(v int16) { |
| r.Version = v |
| } |
| |
| func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { |
| r.Version = version |
| if r.Version >= 3 { |
| if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { |
| return err |
| } |
| } |
| |
| brokerArrayLen, err := pd.getArrayLength() |
| if err != nil { |
| return err |
| } |
| |
| r.Brokers = make([]*Broker, brokerArrayLen) |
| for i := 0; i < brokerArrayLen; i++ { |
| r.Brokers[i] = new(Broker) |
| err = r.Brokers[i].decode(pd, version) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 2 { |
| r.ClusterID, err = pd.getNullableString() |
| if err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 1 { |
| if r.ControllerID, err = pd.getInt32(); err != nil { |
| return err |
| } |
| } |
| |
| topicArrayLen, err := pd.getArrayLength() |
| if err != nil { |
| return err |
| } |
| |
| r.Topics = make([]*TopicMetadata, topicArrayLen) |
| for i := 0; i < topicArrayLen; i++ { |
| r.Topics[i] = new(TopicMetadata) |
| err = r.Topics[i].decode(pd, version) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 8 { |
| r.ClusterAuthorizedOperations, err = pd.getInt32() |
| if err != nil { |
| return err |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (r *MetadataResponse) encode(pe packetEncoder) (err error) { |
| if r.Version >= 3 { |
| pe.putInt32(r.ThrottleTimeMs) |
| } |
| |
| err = pe.putArrayLength(len(r.Brokers)) |
| if err != nil { |
| return err |
| } |
| |
| for _, broker := range r.Brokers { |
| err = broker.encode(pe, r.Version) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 2 { |
| err = pe.putNullableString(r.ClusterID) |
| if err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 1 { |
| pe.putInt32(r.ControllerID) |
| } |
| |
| err = pe.putArrayLength(len(r.Topics)) |
| if err != nil { |
| return err |
| } |
| for _, block := range r.Topics { |
| if err := block.encode(pe, r.Version); err != nil { |
| return err |
| } |
| } |
| |
| if r.Version >= 8 { |
| pe.putInt32(r.ClusterAuthorizedOperations) |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (r *MetadataResponse) key() int16 { |
| return apiKeyMetadata |
| } |
| |
| func (r *MetadataResponse) version() int16 { |
| return r.Version |
| } |
| |
| func (r *MetadataResponse) headerVersion() int16 { |
| if r.Version < 9 { |
| return 0 |
| } else { |
| return 1 |
| } |
| } |
| |
| func (r *MetadataResponse) isValidVersion() bool { |
| return r.Version >= 0 && r.Version <= 10 |
| } |
| |
| func (r *MetadataResponse) isFlexible() bool { |
| return r.isFlexibleVersion(r.Version) |
| } |
| |
| func (r *MetadataResponse) isFlexibleVersion(version int16) bool { |
| return version >= 9 |
| } |
| |
| func (r *MetadataResponse) requiredVersion() KafkaVersion { |
| switch r.Version { |
| case 10: |
| return V2_8_0_0 |
| case 9: |
| return V2_4_0_0 |
| case 8: |
| return V2_3_0_0 |
| case 7: |
| return V2_1_0_0 |
| case 6: |
| return V2_0_0_0 |
| case 5: |
| return V1_0_0_0 |
| case 3, 4: |
| return V0_11_0_0 |
| case 2: |
| return V0_10_1_0 |
| case 1: |
| return V0_10_0_0 |
| case 0: |
| return V0_8_2_0 |
| default: |
| return V2_8_0_0 |
| } |
| } |
| |
| func (r *MetadataResponse) throttleTime() time.Duration { |
| return time.Duration(r.ThrottleTimeMs) * time.Millisecond |
| } |
| |
| // testing API |
| |
| func (r *MetadataResponse) AddBroker(addr string, id int32) { |
| r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr}) |
| } |
| |
| func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { |
| var tmatch *TopicMetadata |
| |
| for _, tm := range r.Topics { |
| if tm.Name == topic { |
| tmatch = tm |
| goto foundTopic |
| } |
| } |
| |
| tmatch = new(TopicMetadata) |
| tmatch.Name = topic |
| r.Topics = append(r.Topics, tmatch) |
| |
| foundTopic: |
| |
| tmatch.Err = err |
| return tmatch |
| } |
| |
| func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) { |
| tmatch := r.AddTopic(topic, ErrNoError) |
| var pmatch *PartitionMetadata |
| |
| for _, pm := range tmatch.Partitions { |
| if pm.ID == partition { |
| pmatch = pm |
| goto foundPartition |
| } |
| } |
| |
| pmatch = new(PartitionMetadata) |
| pmatch.ID = partition |
| tmatch.Partitions = append(tmatch.Partitions, pmatch) |
| |
| foundPartition: |
| pmatch.Leader = brokerID |
| pmatch.Replicas = replicas |
| if pmatch.Replicas == nil { |
| pmatch.Replicas = []int32{} |
| } |
| pmatch.Isr = isr |
| if pmatch.Isr == nil { |
| pmatch.Isr = []int32{} |
| } |
| pmatch.OfflineReplicas = offline |
| if pmatch.OfflineReplicas == nil { |
| pmatch.OfflineReplicas = []int32{} |
| } |
| pmatch.Err = err |
| } |