blob: c702f0253022d517c8f32d65e08066324029f39f [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// ConfigSource is the one-byte code that records where a configuration
// value (or one of its synonyms) comes from.
type ConfigSource int8

// String returns the symbolic name of s. Any value outside the declared
// Source* constants is rendered as "Source Invalid: <n>".
func (s ConfigSource) String() string {
	// Keyed by the constants themselves so the table cannot drift from the
	// const block below.
	names := [...]string{
		SourceUnknown:              "Unknown",
		SourceTopic:                "Topic",
		SourceDynamicBroker:        "DynamicBroker",
		SourceDynamicDefaultBroker: "DynamicDefaultBroker",
		SourceStaticBroker:         "StaticBroker",
		SourceDefault:              "Default",
	}
	if s >= 0 && int(s) < len(names) {
		return names[s]
	}
	return fmt.Sprintf("Source Invalid: %d", int(s))
}

// Known ConfigSource values, numbered consecutively from zero.
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)
36
37type DescribeConfigError struct {
38 Err KError
39 ErrMsg string
40}
41
42func (c *DescribeConfigError) Error() string {
43 text := c.Err.Error()
44 if c.ErrMsg != "" {
45 text = fmt.Sprintf("%s - %s", text, c.ErrMsg)
46 }
47 return text
48}
49
50type DescribeConfigsResponse struct {
51 Version int16
52 ThrottleTime time.Duration
53 Resources []*ResourceResponse
54}
55
56func (r *DescribeConfigsResponse) setVersion(v int16) {
57 r.Version = v
58}
59
60type ResourceResponse struct {
61 ErrorCode int16
62 ErrorMsg string
63 Type ConfigResourceType
64 Name string
65 Configs []*ConfigEntry
66}
67
68type ConfigEntry struct {
69 Name string
70 Value string
71 ReadOnly bool
72 Default bool
73 Source ConfigSource
74 Sensitive bool
75 Synonyms []*ConfigSynonym
76}
77
78type ConfigSynonym struct {
79 ConfigName string
80 ConfigValue string
81 Source ConfigSource
82}
83
84func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
85 pe.putDurationMs(r.ThrottleTime)
86 if err = pe.putArrayLength(len(r.Resources)); err != nil {
87 return err
88 }
89
90 for _, c := range r.Resources {
91 if err = c.encode(pe, r.Version); err != nil {
92 return err
93 }
94 }
95
96 return nil
97}
98
99func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
100 r.Version = version
101 if r.ThrottleTime, err = pd.getDurationMs(); err != nil {
102 return err
103 }
104
105 n, err := pd.getArrayLength()
106 if err != nil {
107 return err
108 }
109
110 r.Resources = make([]*ResourceResponse, n)
111 for i := 0; i < n; i++ {
112 rr := &ResourceResponse{}
113 if err := rr.decode(pd, version); err != nil {
114 return err
115 }
116 r.Resources[i] = rr
117 }
118
119 return nil
120}
121
122func (r *DescribeConfigsResponse) key() int16 {
123 return apiKeyDescribeConfigs
124}
125
126func (r *DescribeConfigsResponse) version() int16 {
127 return r.Version
128}
129
130func (r *DescribeConfigsResponse) headerVersion() int16 {
131 return 0
132}
133
134func (r *DescribeConfigsResponse) isValidVersion() bool {
135 return r.Version >= 0 && r.Version <= 2
136}
137
138func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
139 switch r.Version {
140 case 2:
141 return V2_0_0_0
142 case 1:
143 return V1_1_0_0
144 case 0:
145 return V0_11_0_0
146 default:
147 return V2_0_0_0
148 }
149}
150
151func (r *DescribeConfigsResponse) throttleTime() time.Duration {
152 return r.ThrottleTime
153}
154
155func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
156 pe.putInt16(r.ErrorCode)
157
158 if err = pe.putString(r.ErrorMsg); err != nil {
159 return err
160 }
161
162 pe.putInt8(int8(r.Type))
163
164 if err = pe.putString(r.Name); err != nil {
165 return err
166 }
167
168 if err = pe.putArrayLength(len(r.Configs)); err != nil {
169 return err
170 }
171
172 for _, c := range r.Configs {
173 if err = c.encode(pe, version); err != nil {
174 return err
175 }
176 }
177 return nil
178}
179
180func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
181 ec, err := pd.getInt16()
182 if err != nil {
183 return err
184 }
185 r.ErrorCode = ec
186
187 em, err := pd.getString()
188 if err != nil {
189 return err
190 }
191 r.ErrorMsg = em
192
193 t, err := pd.getInt8()
194 if err != nil {
195 return err
196 }
197 r.Type = ConfigResourceType(t)
198
199 name, err := pd.getString()
200 if err != nil {
201 return err
202 }
203 r.Name = name
204
205 n, err := pd.getArrayLength()
206 if err != nil {
207 return err
208 }
209
210 r.Configs = make([]*ConfigEntry, n)
211 for i := 0; i < n; i++ {
212 c := &ConfigEntry{}
213 if err := c.decode(pd, version); err != nil {
214 return err
215 }
216 r.Configs[i] = c
217 }
218 return nil
219}
220
221func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
222 if err = pe.putString(r.Name); err != nil {
223 return err
224 }
225
226 if err = pe.putString(r.Value); err != nil {
227 return err
228 }
229
230 pe.putBool(r.ReadOnly)
231
232 if version <= 0 {
233 pe.putBool(r.Default)
234 pe.putBool(r.Sensitive)
235 } else {
236 pe.putInt8(int8(r.Source))
237 pe.putBool(r.Sensitive)
238
239 if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
240 return err
241 }
242 for _, c := range r.Synonyms {
243 if err = c.encode(pe, version); err != nil {
244 return err
245 }
246 }
247 }
248
249 return nil
250}
251
252// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
253func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
254 if version == 0 {
255 r.Source = SourceUnknown
256 }
257 name, err := pd.getString()
258 if err != nil {
259 return err
260 }
261 r.Name = name
262
263 value, err := pd.getString()
264 if err != nil {
265 return err
266 }
267 r.Value = value
268
269 read, err := pd.getBool()
270 if err != nil {
271 return err
272 }
273 r.ReadOnly = read
274
275 if version == 0 {
276 defaultB, err := pd.getBool()
277 if err != nil {
278 return err
279 }
280 r.Default = defaultB
281 if defaultB {
282 r.Source = SourceDefault
283 }
284 } else {
285 source, err := pd.getInt8()
286 if err != nil {
287 return err
288 }
289 r.Source = ConfigSource(source)
290 r.Default = r.Source == SourceDefault
291 }
292
293 sensitive, err := pd.getBool()
294 if err != nil {
295 return err
296 }
297 r.Sensitive = sensitive
298
299 if version > 0 {
300 n, err := pd.getArrayLength()
301 if err != nil {
302 return err
303 }
304 r.Synonyms = make([]*ConfigSynonym, n)
305
306 for i := 0; i < n; i++ {
307 s := &ConfigSynonym{}
308 if err := s.decode(pd, version); err != nil {
309 return err
310 }
311 r.Synonyms[i] = s
312 }
313 }
314 return nil
315}
316
317func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
318 err = pe.putString(c.ConfigName)
319 if err != nil {
320 return err
321 }
322
323 err = pe.putString(c.ConfigValue)
324 if err != nil {
325 return err
326 }
327
328 pe.putInt8(int8(c.Source))
329
330 return nil
331}
332
333func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
334 name, err := pd.getString()
335 if err != nil {
336 return err
337 }
338 c.ConfigName = name
339
340 value, err := pd.getString()
341 if err != nil {
342 return err
343 }
344 c.ConfigValue = value
345
346 source, err := pd.getInt8()
347 if err != nil {
348 return err
349 }
350 c.Source = ConfigSource(source)
351 return nil
352}