blob: 75e2decefe6f5f9e472e7ceef1fb22092c092e91 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5type CreatePartitionsRequest struct {
6 Version int16
7 TopicPartitions map[string]*TopicPartition
8 Timeout time.Duration
9 ValidateOnly bool
10}
11
12func (c *CreatePartitionsRequest) setVersion(v int16) {
13 c.Version = v
14}
15
16func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
17 if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
18 return err
19 }
20
21 for topic, partition := range c.TopicPartitions {
22 if err := pe.putString(topic); err != nil {
23 return err
24 }
25 if err := partition.encode(pe); err != nil {
26 return err
27 }
28 }
29
30 pe.putInt32(int32(c.Timeout / time.Millisecond))
31
32 pe.putBool(c.ValidateOnly)
33
34 return nil
35}
36
37func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
38 n, err := pd.getArrayLength()
39 if err != nil {
40 return err
41 }
42 c.TopicPartitions = make(map[string]*TopicPartition, n)
43 for i := 0; i < n; i++ {
44 topic, err := pd.getString()
45 if err != nil {
46 return err
47 }
48 c.TopicPartitions[topic] = new(TopicPartition)
49 if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
50 return err
51 }
52 }
53
54 timeout, err := pd.getInt32()
55 if err != nil {
56 return err
57 }
58 c.Timeout = time.Duration(timeout) * time.Millisecond
59
60 if c.ValidateOnly, err = pd.getBool(); err != nil {
61 return err
62 }
63
64 return nil
65}
66
67func (r *CreatePartitionsRequest) key() int16 {
68 return apiKeyCreatePartitions
69}
70
71func (r *CreatePartitionsRequest) version() int16 {
72 return r.Version
73}
74
75func (r *CreatePartitionsRequest) headerVersion() int16 {
76 return 1
77}
78
79func (r *CreatePartitionsRequest) isValidVersion() bool {
80 return r.Version >= 0 && r.Version <= 1
81}
82
83func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
84 switch r.Version {
85 case 1:
86 return V2_0_0_0
87 case 0:
88 return V1_0_0_0
89 default:
90 return V2_0_0_0
91 }
92}
93
// TopicPartition holds the per-topic settings carried inside a
// CreatePartitionsRequest.
type TopicPartition struct {
	// Count is written first on the wire as an int32.
	// NOTE(review): presumably the desired total partition count for the
	// topic — confirm against the Kafka protocol documentation.
	Count int32
	// Assignment is a list of int32 arrays. A nil or empty list is encoded
	// as a null array (length -1).
	// NOTE(review): presumably one list of replica broker IDs per new
	// partition — confirm against the Kafka protocol documentation.
	Assignment [][]int32
}
98
99func (t *TopicPartition) encode(pe packetEncoder) error {
100 pe.putInt32(t.Count)
101
102 if len(t.Assignment) == 0 {
103 pe.putInt32(-1)
104 return nil
105 }
106
107 if err := pe.putArrayLength(len(t.Assignment)); err != nil {
108 return err
109 }
110
111 for _, assign := range t.Assignment {
112 if err := pe.putInt32Array(assign); err != nil {
113 return err
114 }
115 }
116
117 return nil
118}
119
120func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
121 if t.Count, err = pd.getInt32(); err != nil {
122 return err
123 }
124
125 n, err := pd.getInt32()
126 if err != nil {
127 return err
128 }
129 if n <= 0 {
130 return nil
131 }
132 t.Assignment = make([][]int32, n)
133
134 for i := 0; i < int(n); i++ {
135 if t.Assignment[i], err = pd.getInt32Array(); err != nil {
136 return err
137 }
138 }
139
140 return nil
141}