blob: 0785c35473ba86cbf41c6155a2e754a7aaf8d9eb [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
8type CreatePartitionsResponse struct {
9 Version int16
10 ThrottleTime time.Duration
11 TopicPartitionErrors map[string]*TopicPartitionError
12}
13
14func (c *CreatePartitionsResponse) setVersion(v int16) {
15 c.Version = v
16}
17
18func (c *CreatePartitionsResponse) encode(pe packetEncoder) error {
19 pe.putDurationMs(c.ThrottleTime)
20 if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil {
21 return err
22 }
23
24 for topic, partitionError := range c.TopicPartitionErrors {
25 if err := pe.putString(topic); err != nil {
26 return err
27 }
28 if err := partitionError.encode(pe); err != nil {
29 return err
30 }
31 }
32
33 return nil
34}
35
36func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) {
37 if c.ThrottleTime, err = pd.getDurationMs(); err != nil {
38 return err
39 }
40
41 n, err := pd.getArrayLength()
42 if err != nil {
43 return err
44 }
45
46 c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n)
47 for i := 0; i < n; i++ {
48 topic, err := pd.getString()
49 if err != nil {
50 return err
51 }
52 c.TopicPartitionErrors[topic] = new(TopicPartitionError)
53 if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil {
54 return err
55 }
56 }
57
58 return nil
59}
60
61func (r *CreatePartitionsResponse) key() int16 {
62 return apiKeyCreatePartitions
63}
64
65func (r *CreatePartitionsResponse) version() int16 {
66 return r.Version
67}
68
69func (r *CreatePartitionsResponse) headerVersion() int16 {
70 return 0
71}
72
73func (r *CreatePartitionsResponse) isValidVersion() bool {
74 return r.Version >= 0 && r.Version <= 1
75}
76
77func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
78 switch r.Version {
79 case 1:
80 return V2_0_0_0
81 case 0:
82 return V1_0_0_0
83 default:
84 return V2_0_0_0
85 }
86}
87
88func (r *CreatePartitionsResponse) throttleTime() time.Duration {
89 return r.ThrottleTime
90}
91
92type TopicPartitionError struct {
93 Err KError
94 ErrMsg *string
95}
96
97func (t *TopicPartitionError) Error() string {
98 text := t.Err.Error()
99 if t.ErrMsg != nil {
100 text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
101 }
102 return text
103}
104
105func (t *TopicPartitionError) Unwrap() error {
106 return t.Err
107}
108
109func (t *TopicPartitionError) encode(pe packetEncoder) error {
110 pe.putKError(t.Err)
111
112 if err := pe.putNullableString(t.ErrMsg); err != nil {
113 return err
114 }
115
116 return nil
117}
118
119func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) {
120 t.Err, err = pd.getKError()
121 if err != nil {
122 return err
123 }
124
125 if t.ErrMsg, err = pd.getNullableString(); err != nil {
126 return err
127 }
128
129 return nil
130}