blob: 196c075c5babd29983045bbe4ec7682429cd8d81 [file] [log] [blame]
package sarama
import "time"
// PartitionMetadata contains each partition in the topic.
type PartitionMetadata struct {
// Version defines the protocol version to use for encode and decode
Version int16
// Err contains the partition error, or 0 if there was no error.
Err KError
// ID contains the partition index.
ID int32
// Leader contains the ID of the leader broker.
Leader int32
// LeaderEpoch contains the leader epoch of this partition.
LeaderEpoch int32
// Replicas contains the set of all nodes that host this partition.
Replicas []int32
// Isr contains the set of nodes that are in sync with the leader for this partition.
Isr []int32
// OfflineReplicas contains the set of offline replicas of this partition.
OfflineReplicas []int32
}
func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
p.Version = version
p.Err, err = pd.getKError()
if err != nil {
return err
}
if p.ID, err = pd.getInt32(); err != nil {
return err
}
if p.Leader, err = pd.getInt32(); err != nil {
return err
}
if p.Version >= 7 {
if p.LeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}
p.Replicas, err = pd.getInt32Array()
if err != nil {
return err
}
p.Isr, err = pd.getInt32Array()
if err != nil {
return err
}
if p.Version >= 5 {
p.OfflineReplicas, err = pd.getInt32Array()
if err != nil {
return err
}
}
_, err = pd.getEmptyTaggedFieldArray()
return err
}
func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
p.Version = version
pe.putKError(p.Err)
pe.putInt32(p.ID)
pe.putInt32(p.Leader)
if p.Version >= 7 {
pe.putInt32(p.LeaderEpoch)
}
err = pe.putInt32Array(p.Replicas)
if err != nil {
return err
}
err = pe.putInt32Array(p.Isr)
if err != nil {
return err
}
if p.Version >= 5 {
err = pe.putInt32Array(p.OfflineReplicas)
if err != nil {
return err
}
}
pe.putEmptyTaggedFieldArray()
return nil
}
// TopicMetadata contains each topic in the response.
type TopicMetadata struct {
// Version defines the protocol version to use for encode and decode
Version int16
// Err contains the topic error, or 0 if there was no error.
Err KError
// Name contains the topic name.
Name string
Uuid Uuid
// IsInternal contains a True if the topic is internal.
IsInternal bool
// Partitions contains each partition in the topic.
Partitions []*PartitionMetadata
TopicAuthorizedOperations int32 // Only valid for Version >= 8
}
func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
t.Version = version
t.Err, err = pd.getKError()
if err != nil {
return err
}
t.Name, err = pd.getString()
if err != nil {
return err
}
if t.Version >= 10 {
uuid, err := pd.getRawBytes(16)
if err != nil {
return err
}
t.Uuid = [16]byte{}
for i := 0; i < 16; i++ {
t.Uuid[i] = uuid[i]
}
}
if t.Version >= 1 {
t.IsInternal, err = pd.getBool()
if err != nil {
return err
}
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
t.Partitions = make([]*PartitionMetadata, n)
for i := 0; i < n; i++ {
block := &PartitionMetadata{}
if err := block.decode(pd, t.Version); err != nil {
return err
}
t.Partitions[i] = block
}
if t.Version >= 8 {
t.TopicAuthorizedOperations, err = pd.getInt32()
if err != nil {
return err
}
}
_, err = pd.getEmptyTaggedFieldArray()
return err
}
func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
t.Version = version
pe.putKError(t.Err)
err = pe.putString(t.Name)
if err != nil {
return err
}
if t.Version >= 10 {
err = pe.putRawBytes(t.Uuid[:])
if err != nil {
return err
}
}
if t.Version >= 1 {
pe.putBool(t.IsInternal)
}
err = pe.putArrayLength(len(t.Partitions))
if err != nil {
return err
}
for _, block := range t.Partitions {
if err := block.encode(pe, t.Version); err != nil {
return err
}
}
if t.Version >= 8 {
pe.putInt32(t.TopicAuthorizedOperations)
}
pe.putEmptyTaggedFieldArray()
return nil
}
type MetadataResponse struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
ThrottleTimeMs int32
// Brokers contains each broker in the response.
Brokers []*Broker
// ClusterID contains the cluster ID that responding broker belongs to.
ClusterID *string
// ControllerID contains the ID of the controller broker.
ControllerID int32
// Topics contains each topic in the response.
Topics []*TopicMetadata
ClusterAuthorizedOperations int32 // Only valid for Version >= 8
}
func (r *MetadataResponse) setVersion(v int16) {
r.Version = v
}
func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.Version >= 3 {
if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
return err
}
}
brokerArrayLen, err := pd.getArrayLength()
if err != nil {
return err
}
r.Brokers = make([]*Broker, brokerArrayLen)
for i := 0; i < brokerArrayLen; i++ {
r.Brokers[i] = new(Broker)
err = r.Brokers[i].decode(pd, version)
if err != nil {
return err
}
}
if r.Version >= 2 {
r.ClusterID, err = pd.getNullableString()
if err != nil {
return err
}
}
if r.Version >= 1 {
if r.ControllerID, err = pd.getInt32(); err != nil {
return err
}
}
topicArrayLen, err := pd.getArrayLength()
if err != nil {
return err
}
r.Topics = make([]*TopicMetadata, topicArrayLen)
for i := 0; i < topicArrayLen; i++ {
r.Topics[i] = new(TopicMetadata)
err = r.Topics[i].decode(pd, version)
if err != nil {
return err
}
}
if r.Version >= 8 {
r.ClusterAuthorizedOperations, err = pd.getInt32()
if err != nil {
return err
}
}
_, err = pd.getEmptyTaggedFieldArray()
return err
}
func (r *MetadataResponse) encode(pe packetEncoder) (err error) {
if r.Version >= 3 {
pe.putInt32(r.ThrottleTimeMs)
}
err = pe.putArrayLength(len(r.Brokers))
if err != nil {
return err
}
for _, broker := range r.Brokers {
err = broker.encode(pe, r.Version)
if err != nil {
return err
}
}
if r.Version >= 2 {
err = pe.putNullableString(r.ClusterID)
if err != nil {
return err
}
}
if r.Version >= 1 {
pe.putInt32(r.ControllerID)
}
err = pe.putArrayLength(len(r.Topics))
if err != nil {
return err
}
for _, block := range r.Topics {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}
if r.Version >= 8 {
pe.putInt32(r.ClusterAuthorizedOperations)
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (r *MetadataResponse) key() int16 {
return apiKeyMetadata
}
func (r *MetadataResponse) version() int16 {
return r.Version
}
func (r *MetadataResponse) headerVersion() int16 {
if r.Version < 9 {
return 0
} else {
return 1
}
}
func (r *MetadataResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 10
}
func (r *MetadataResponse) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}
func (r *MetadataResponse) isFlexibleVersion(version int16) bool {
return version >= 9
}
func (r *MetadataResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 10:
return V2_8_0_0
case 9:
return V2_4_0_0
case 8:
return V2_3_0_0
case 7:
return V2_1_0_0
case 6:
return V2_0_0_0
case 5:
return V1_0_0_0
case 3, 4:
return V0_11_0_0
case 2:
return V0_10_1_0
case 1:
return V0_10_0_0
case 0:
return V0_8_2_0
default:
return V2_8_0_0
}
}
func (r *MetadataResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}
// testing API
func (r *MetadataResponse) AddBroker(addr string, id int32) {
r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
}
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
var tmatch *TopicMetadata
for _, tm := range r.Topics {
if tm.Name == topic {
tmatch = tm
goto foundTopic
}
}
tmatch = new(TopicMetadata)
tmatch.Name = topic
r.Topics = append(r.Topics, tmatch)
foundTopic:
tmatch.Err = err
return tmatch
}
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
tmatch := r.AddTopic(topic, ErrNoError)
var pmatch *PartitionMetadata
for _, pm := range tmatch.Partitions {
if pm.ID == partition {
pmatch = pm
goto foundPartition
}
}
pmatch = new(PartitionMetadata)
pmatch.ID = partition
tmatch.Partitions = append(tmatch.Partitions, pmatch)
foundPartition:
pmatch.Leader = brokerID
pmatch.Replicas = replicas
if pmatch.Replicas == nil {
pmatch.Replicas = []int32{}
}
pmatch.Isr = isr
if pmatch.Isr == nil {
pmatch.Isr = []int32{}
}
pmatch.OfflineReplicas = offline
if pmatch.OfflineReplicas == nil {
pmatch.OfflineReplicas = []int32{}
}
pmatch.Err = err
}