blob: 5684e2718a5ce664e01171ccfcaf6e858e242a42 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "time"
5)
6
7type CreateTopicsRequest struct {
8 // Version defines the protocol version to use for encode and decode
9 Version int16
10 // TopicDetails contains the topics to create.
11 TopicDetails map[string]*TopicDetail
12 // Timeout contains how long to wait before timing out the request.
13 Timeout time.Duration
14 // ValidateOnly if true, check that the topics can be created as specified,
15 // but don't create anything.
16 ValidateOnly bool
17}
18
19func (c *CreateTopicsRequest) setVersion(v int16) {
20 c.Version = v
21}
22
23func NewCreateTopicsRequest(
24 version KafkaVersion,
25 topicDetails map[string]*TopicDetail,
26 timeout time.Duration,
27 validateOnly bool,
28) *CreateTopicsRequest {
29 r := &CreateTopicsRequest{
30 TopicDetails: topicDetails,
31 Timeout: timeout,
32 ValidateOnly: validateOnly,
33 }
34 switch {
35 case version.IsAtLeast(V2_4_0_0):
36 // Version 5 is the first flexible version
37 // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
38 r.Version = 5
39 case version.IsAtLeast(V2_0_0_0):
40 // Version 3 is the same as version 2 (brokers response before throttling)
41 r.Version = 3
42 case version.IsAtLeast(V0_11_0_0):
43 // Version 2 is the same as version 1 (response has ThrottleTime)
44 r.Version = 2
45 case version.IsAtLeast(V0_10_2_0):
46 // Version 1 adds validateOnly.
47 r.Version = 1
48 }
49 return r
50}
51
52func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
53 if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
54 return err
55 }
56 for topic, detail := range c.TopicDetails {
57 if err := pe.putString(topic); err != nil {
58 return err
59 }
60 if err := detail.encode(pe); err != nil {
61 return err
62 }
63 }
64
65 pe.putInt32(int32(c.Timeout / time.Millisecond))
66
67 if c.Version >= 1 {
68 pe.putBool(c.ValidateOnly)
69 }
70
71 pe.putEmptyTaggedFieldArray()
72 return nil
73}
74
75func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
76 n, err := pd.getArrayLength()
77 if err != nil {
78 return err
79 }
80
81 c.TopicDetails = make(map[string]*TopicDetail, n)
82
83 for i := 0; i < n; i++ {
84 topic, err := pd.getString()
85 if err != nil {
86 return err
87 }
88 c.TopicDetails[topic] = new(TopicDetail)
89 if err = c.TopicDetails[topic].decode(pd, version); err != nil {
90 return err
91 }
92 }
93
94 timeout, err := pd.getInt32()
95 if err != nil {
96 return err
97 }
98 c.Timeout = time.Duration(timeout) * time.Millisecond
99
100 if version >= 1 {
101 c.ValidateOnly, err = pd.getBool()
102 if err != nil {
103 return err
104 }
105
106 c.Version = version
107 }
108 _, err = pd.getEmptyTaggedFieldArray()
109 return err
110}
111
112func (c *CreateTopicsRequest) key() int16 {
113 return apiKeyCreateTopics
114}
115
116func (c *CreateTopicsRequest) version() int16 {
117 return c.Version
118}
119
120func (c *CreateTopicsRequest) headerVersion() int16 {
121 if c.Version >= 5 {
122 return 2
123 }
124 return 1
125}
126
127func (c *CreateTopicsRequest) isFlexible() bool {
128 return c.isFlexibleVersion(c.Version)
129}
130
131func (c *CreateTopicsRequest) isFlexibleVersion(version int16) bool {
132 return version >= 5
133}
134
135func (c *CreateTopicsRequest) isValidVersion() bool {
136 return c.Version >= 0 && c.Version <= 5
137}
138
139func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
140 switch c.Version {
141 case 5:
142 return V2_4_0_0
143 case 4:
144 return V2_4_0_0
145 case 3:
146 return V2_0_0_0
147 case 2:
148 return V0_11_0_0
149 case 1:
150 return V0_10_2_0
151 case 0:
152 return V0_10_1_0
153 default:
154 return V2_8_0_0
155 }
156}
157
158type TopicDetail struct {
159 // NumPartitions contains the number of partitions to create in the topic, or
160 // -1 if we are either specifying a manual partition assignment or using the
161 // default partitions.
162 NumPartitions int32
163 // ReplicationFactor contains the number of replicas to create for each
164 // partition in the topic, or -1 if we are either specifying a manual
165 // partition assignment or using the default replication factor.
166 ReplicationFactor int16
167 // ReplicaAssignment contains the manual partition assignment, or the empty
168 // array if we are using automatic assignment.
169 ReplicaAssignment map[int32][]int32
170 // ConfigEntries contains the custom topic configurations to set.
171 ConfigEntries map[string]*string
172}
173
174func (t *TopicDetail) encode(pe packetEncoder) error {
175 pe.putInt32(t.NumPartitions)
176 pe.putInt16(t.ReplicationFactor)
177
178 if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
179 return err
180 }
181 for partition, assignment := range t.ReplicaAssignment {
182 pe.putInt32(partition)
183 if err := pe.putInt32Array(assignment); err != nil {
184 return err
185 }
186 pe.putEmptyTaggedFieldArray()
187 }
188
189 if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
190 return err
191 }
192 for configKey, configValue := range t.ConfigEntries {
193 if err := pe.putString(configKey); err != nil {
194 return err
195 }
196 if err := pe.putNullableString(configValue); err != nil {
197 return err
198 }
199 pe.putEmptyTaggedFieldArray()
200 }
201
202 pe.putEmptyTaggedFieldArray()
203 return nil
204}
205
206func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
207 if t.NumPartitions, err = pd.getInt32(); err != nil {
208 return err
209 }
210 if t.ReplicationFactor, err = pd.getInt16(); err != nil {
211 return err
212 }
213
214 n, err := pd.getArrayLength()
215 if err != nil {
216 return err
217 }
218
219 if n > 0 {
220 t.ReplicaAssignment = make(map[int32][]int32, n)
221 for i := 0; i < n; i++ {
222 replica, err := pd.getInt32()
223 if err != nil {
224 return err
225 }
226 if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
227 return err
228 }
229 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
230 return err
231 }
232 }
233 }
234
235 n, err = pd.getArrayLength()
236 if err != nil {
237 return err
238 }
239
240 if n > 0 {
241 t.ConfigEntries = make(map[string]*string, n)
242 for i := 0; i < n; i++ {
243 configKey, err := pd.getString()
244 if err != nil {
245 return err
246 }
247 if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
248 return err
249 }
250 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
251 return err
252 }
253 }
254 }
255
256 _, err = pd.getEmptyTaggedFieldArray()
257 return err
258}