blob: f00e5ab3a5de21b84b212f1af85eec32b3ee7c74 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "encoding/base64"
4
// Uuid is a 16-byte identifier.
type Uuid [16]byte

// String renders the identifier as URL-safe base64 text without padding.
func (u Uuid) String() string {
	raw := u[:]
	return base64.RawURLEncoding.EncodeToString(raw)
}
10
// NullUUID is a slice of 16 zero bytes. MetadataRequest.encode writes it as
// the raw identifier bytes that precede each topic name in a version 10
// request.
var NullUUID = make([]byte, 16)
12
// MetadataRequest holds the fields of a metadata request.
type MetadataRequest struct {
	// Version is the protocol version used when encoding and decoding.
	Version int16
	// Topics lists the topics to fetch metadata for.
	Topics []string
	// AllowAutoTopicCreation, when true, lets the broker auto-create requested
	// topics that do not already exist, if it is configured to do so.
	// It is only put on the wire for version 4 and up.
	AllowAutoTopicCreation bool
	// IncludeClusterAuthorizedOperations is only put on the wire for
	// version 8 and up.
	IncludeClusterAuthorizedOperations bool
	// IncludeTopicAuthorizedOperations is only put on the wire for
	// version 8 and up.
	IncludeTopicAuthorizedOperations bool
}
23
24func (r *MetadataRequest) setVersion(v int16) {
25 r.Version = v
26}
27
28func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
29 m := &MetadataRequest{Topics: topics}
30 if version.IsAtLeast(V2_8_0_0) {
31 m.Version = 10
32 } else if version.IsAtLeast(V2_4_0_0) {
33 m.Version = 9
34 } else if version.IsAtLeast(V2_4_0_0) {
35 m.Version = 8
36 } else if version.IsAtLeast(V2_1_0_0) {
37 m.Version = 7
38 } else if version.IsAtLeast(V2_0_0_0) {
39 m.Version = 6
40 } else if version.IsAtLeast(V1_0_0_0) {
41 m.Version = 5
42 } else if version.IsAtLeast(V0_11_0_0) {
43 m.Version = 4
44 } else if version.IsAtLeast(V0_10_1_0) {
45 m.Version = 2
46 } else if version.IsAtLeast(V0_10_0_0) {
47 m.Version = 1
48 }
49 return m
50}
51
52func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
53 if r.Version < 0 || r.Version > 10 {
54 return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
55 }
56 if r.Version == 0 || len(r.Topics) > 0 {
57 if err := pe.putArrayLength(len(r.Topics)); err != nil {
58 return err
59 }
60 if r.Version <= 9 {
61 for _, topicName := range r.Topics {
62 if err := pe.putString(topicName); err != nil {
63 return err
64 }
65 pe.putEmptyTaggedFieldArray()
66 }
67 } else { // r.Version = 10
68 for _, topicName := range r.Topics {
69 if err := pe.putRawBytes(NullUUID); err != nil {
70 return err
71 }
72 // Avoid implicit memory aliasing in for loop
73 tn := topicName
74 if err := pe.putNullableString(&tn); err != nil {
75 return err
76 }
77 pe.putEmptyTaggedFieldArray()
78 }
79 }
80 } else {
81 if err := pe.putArrayLength(-1); err != nil {
82 return err
83 }
84 }
85
86 if r.Version > 3 {
87 pe.putBool(r.AllowAutoTopicCreation)
88 }
89 if r.Version > 7 {
90 pe.putBool(r.IncludeClusterAuthorizedOperations)
91 pe.putBool(r.IncludeTopicAuthorizedOperations)
92 }
93 pe.putEmptyTaggedFieldArray()
94 return nil
95}
96
97func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
98 r.Version = version
99 size, err := pd.getArrayLength()
100 if err != nil {
101 return err
102 }
103 if size > 0 {
104 r.Topics = make([]string, size)
105 }
106 if version <= 9 {
107 for i := range r.Topics {
108 topic, err := pd.getString()
109 if err != nil {
110 return err
111 }
112 r.Topics[i] = topic
113 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
114 return err
115 }
116 }
117 } else {
118 for i := range r.Topics {
119 if _, err = pd.getRawBytes(16); err != nil { // skip UUID
120 return err
121 }
122 topic, err := pd.getNullableString()
123 if err != nil {
124 return err
125 }
126 if topic != nil {
127 r.Topics[i] = *topic
128 }
129
130 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
131 return err
132 }
133 }
134 }
135
136 if r.Version >= 4 {
137 if r.AllowAutoTopicCreation, err = pd.getBool(); err != nil {
138 return err
139 }
140 }
141
142 if r.Version > 7 {
143 includeClusterAuthz, err := pd.getBool()
144 if err != nil {
145 return err
146 }
147 r.IncludeClusterAuthorizedOperations = includeClusterAuthz
148 includeTopicAuthz, err := pd.getBool()
149 if err != nil {
150 return err
151 }
152 r.IncludeTopicAuthorizedOperations = includeTopicAuthz
153 }
154
155 _, err = pd.getEmptyTaggedFieldArray()
156 return err
157}
158
159func (r *MetadataRequest) key() int16 {
160 return apiKeyMetadata
161}
162
163func (r *MetadataRequest) version() int16 {
164 return r.Version
165}
166
167func (r *MetadataRequest) headerVersion() int16 {
168 if r.Version >= 9 {
169 return 2
170 }
171 return 1
172}
173
174func (r *MetadataRequest) isValidVersion() bool {
175 return r.Version >= 0 && r.Version <= 10
176}
177
178func (r *MetadataRequest) isFlexible() bool {
179 return r.isFlexibleVersion(r.Version)
180}
181
182func (r *MetadataRequest) isFlexibleVersion(version int16) bool {
183 return version >= 9
184}
185
186func (r *MetadataRequest) requiredVersion() KafkaVersion {
187 switch r.Version {
188 case 10:
189 return V2_8_0_0
190 case 9:
191 return V2_4_0_0
192 case 8:
193 return V2_3_0_0
194 case 7:
195 return V2_1_0_0
196 case 6:
197 return V2_0_0_0
198 case 5:
199 return V1_0_0_0
200 case 3, 4:
201 return V0_11_0_0
202 case 2:
203 return V0_10_1_0
204 case 1:
205 return V0_10_0_0
206 case 0:
207 return V0_8_2_0
208 default:
209 return V2_8_0_0
210 }
211}