blob: f8be692496a604c5615303c97b880c035eb66db4 [file] [log] [blame]
package sarama
import (
"fmt"
"time"
)
type CreateTopicsResponse struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration for which the request was throttled due
// to a quota violation, or zero if the request did not violate any quota.
ThrottleTime time.Duration
// TopicErrors contains a map of any errors for the topics we tried to create.
TopicErrors map[string]*TopicError
// TopicResults contains a map of the results for the topics we tried to create.
TopicResults map[string]*CreatableTopicResult
}
func (c *CreateTopicsResponse) setVersion(v int16) {
c.Version = v
}
func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
if c.Version >= 2 {
pe.putDurationMs(c.ThrottleTime)
}
if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
return err
}
for topic, topicError := range c.TopicErrors {
if err := pe.putString(topic); err != nil {
return err
}
if err := topicError.encode(pe, c.Version); err != nil {
return err
}
if c.Version >= 5 {
result, ok := c.TopicResults[topic]
if !ok {
return fmt.Errorf("expected TopicResult for topic, %s, for V5 protocol", topic)
}
if err := result.encode(pe, c.Version); err != nil {
return err
}
}
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
c.Version = version
if version >= 2 {
if c.ThrottleTime, err = pd.getDurationMs(); err != nil {
return err
}
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
c.TopicErrors = make(map[string]*TopicError, n)
if version >= 5 {
c.TopicResults = make(map[string]*CreatableTopicResult, n)
}
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicErrors[topic] = new(TopicError)
if err := c.TopicErrors[topic].decode(pd, version); err != nil {
return err
}
if version >= 5 {
c.TopicResults[topic] = &CreatableTopicResult{}
if err := c.TopicResults[topic].decode(pd, version); err != nil {
return err
}
}
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}
func (c *CreateTopicsResponse) key() int16 {
return apiKeyCreateTopics
}
func (c *CreateTopicsResponse) version() int16 {
return c.Version
}
func (c *CreateTopicsResponse) headerVersion() int16 {
if c.Version >= 5 {
return 1
}
return 0
}
func (c *CreateTopicsResponse) isFlexible() bool {
return c.isFlexibleVersion(c.Version)
}
func (c *CreateTopicsResponse) isFlexibleVersion(version int16) bool {
return version >= 5
}
func (c *CreateTopicsResponse) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 5
}
func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
switch c.Version {
case 5:
return V2_4_0_0
case 4:
return V2_4_0_0
case 3:
return V2_0_0_0
case 2:
return V0_11_0_0
case 1:
return V0_10_2_0
case 0:
return V0_10_1_0
default:
return V2_8_0_0
}
}
func (r *CreateTopicsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
type TopicError struct {
Err KError
ErrMsg *string
}
func (t *TopicError) Error() string {
text := t.Err.Error()
if t.ErrMsg != nil {
text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
}
return text
}
func (t *TopicError) Unwrap() error {
return t.Err
}
func (t *TopicError) encode(pe packetEncoder, version int16) error {
pe.putKError(t.Err)
if version >= 1 {
if err := pe.putNullableString(t.ErrMsg); err != nil {
return err
}
}
return nil
}
func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
t.Err, err = pd.getKError()
if err != nil {
return err
}
if version >= 1 {
if t.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
}
return nil
}
// CreatableTopicResult struct {
type CreatableTopicResult struct {
// TopicConfigErrorCode contains a Optional topic config error returned if configs are not returned in the response.
TopicConfigErrorCode KError
// NumPartitions contains a Number of partitions of the topic.
NumPartitions int32
// ReplicationFactor contains a Replication factor of the topic.
ReplicationFactor int16
// Configs contains a Configuration of the topic.
Configs map[string]*CreatableTopicConfigs
}
func (r *CreatableTopicResult) encode(pe packetEncoder, version int16) error {
pe.putInt32(r.NumPartitions)
pe.putInt16(r.ReplicationFactor)
if err := pe.putArrayLength(len(r.Configs)); err != nil {
return err
}
for name, config := range r.Configs {
if err := pe.putString(name); err != nil {
return err
}
if err := config.encode(pe, version); err != nil {
return err
}
}
if r.TopicConfigErrorCode == ErrNoError {
pe.putEmptyTaggedFieldArray()
return nil
}
// TODO: refactor to helper for tagged fields
pe.putUVarint(1) // number of tagged fields
pe.putUVarint(0) // tag
pe.putUVarint(2) // value length
pe.putKError(r.TopicConfigErrorCode) // tag value
return nil
}
func (r *CreatableTopicResult) decode(pd packetDecoder, version int16) (err error) {
r.NumPartitions, err = pd.getInt32()
if err != nil {
return err
}
r.ReplicationFactor, err = pd.getInt16()
if err != nil {
return err
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Configs = make(map[string]*CreatableTopicConfigs, n)
for i := 0; i < n; i++ {
name, err := pd.getString()
if err != nil {
return err
}
r.Configs[name] = &CreatableTopicConfigs{}
if err := r.Configs[name].decode(pd, version); err != nil {
return err
}
}
err = pd.getTaggedFieldArray(taggedFieldDecoders{
0: func(pd packetDecoder) error {
r.TopicConfigErrorCode, err = pd.getKError()
if err != nil {
return err
}
return nil
},
})
if err != nil {
return err
}
return nil
}
// CreatableTopicConfigs contains a Configuration of the topic.
type CreatableTopicConfigs struct {
// Value contains the configuration value.
Value *string
// ReadOnly contains a True if the configuration is read-only.
ReadOnly bool
// ConfigSource contains the configuration source.
ConfigSource ConfigSource
// IsSensitive contains a True if this configuration is sensitive.
IsSensitive bool
}
func (c *CreatableTopicConfigs) encode(pe packetEncoder, version int16) (err error) {
if err = pe.putNullableString(c.Value); err != nil {
return err
}
pe.putBool(c.ReadOnly)
pe.putInt8(int8(c.ConfigSource))
pe.putBool(c.IsSensitive)
pe.putEmptyTaggedFieldArray()
return nil
}
func (c *CreatableTopicConfigs) decode(pd packetDecoder, version int16) (err error) {
c.Value, err = pd.getNullableString()
if err != nil {
return err
}
c.ReadOnly, err = pd.getBool()
if err != nil {
return err
}
source, err := pd.getInt8()
if err != nil {
return err
}
c.ConfigSource = ConfigSource(source)
c.IsSensitive, err = pd.getBool()
if err != nil {
return err
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}