blob: dcc274dc570ca2f2e342c70a008e91fd01c1355f [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import "time"
4
5type DescribeGroupsResponse struct {
6 // Version defines the protocol version to use for encode and decode
7 Version int16
8 // ThrottleTimeMs contains the duration in milliseconds for which the
9 // request was throttled due to a quota violation, or zero if the request
10 // did not violate any quota.
11 ThrottleTimeMs int32
12 // Groups contains each described group.
13 Groups []*GroupDescription
14}
15
16func (r *DescribeGroupsResponse) setVersion(v int16) {
17 r.Version = v
18}
19
20func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
21 if r.Version >= 1 {
22 pe.putInt32(r.ThrottleTimeMs)
23 }
24 if err := pe.putArrayLength(len(r.Groups)); err != nil {
25 return err
26 }
27
28 for _, block := range r.Groups {
29 if err := block.encode(pe, r.Version); err != nil {
30 return err
31 }
32 }
33
34 pe.putEmptyTaggedFieldArray()
35 return nil
36}
37
38func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
39 r.Version = version
40 if r.Version >= 1 {
41 if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
42 return err
43 }
44 }
45 if numGroups, err := pd.getArrayLength(); err != nil {
46 return err
47 } else if numGroups > 0 {
48 r.Groups = make([]*GroupDescription, numGroups)
49 for i := 0; i < numGroups; i++ {
50 block := &GroupDescription{}
51 if err := block.decode(pd, r.Version); err != nil {
52 return err
53 }
54 r.Groups[i] = block
55 }
56 }
57
58 _, err = pd.getEmptyTaggedFieldArray()
59 return err
60}
61
62func (r *DescribeGroupsResponse) key() int16 {
63 return apiKeyDescribeGroups
64}
65
66func (r *DescribeGroupsResponse) version() int16 {
67 return r.Version
68}
69
70func (r *DescribeGroupsResponse) headerVersion() int16 {
71 if r.Version >= 5 {
72 return 1
73 }
74 return 0
75}
76
77func (r *DescribeGroupsResponse) isValidVersion() bool {
78 return r.Version >= 0 && r.Version <= 5
79}
80
81func (r *DescribeGroupsResponse) isFlexible() bool {
82 return r.isFlexibleVersion(r.Version)
83}
84
85func (r *DescribeGroupsResponse) isFlexibleVersion(version int16) bool {
86 return version >= 5
87}
88
89func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
90 switch r.Version {
91 case 5:
92 return V2_4_0_0
93 case 4:
94 return V2_4_0_0
95 case 3:
96 return V2_3_0_0
97 case 2:
98 return V2_0_0_0
99 case 1:
100 return V0_11_0_0
101 case 0:
102 return V0_9_0_0
103 default:
104 return V2_4_0_0
105 }
106}
107
108func (r *DescribeGroupsResponse) throttleTime() time.Duration {
109 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
110}
111
112// GroupDescription contains each described group.
113type GroupDescription struct {
114 // Version defines the protocol version to use for encode and decode
115 Version int16
116 // Err contains the describe error as the KError type.
117 Err KError
118 // ErrorCode contains the describe error, or 0 if there was no error.
119 ErrorCode int16
120 // GroupId contains the group ID string.
121 GroupId string
122 // State contains the group state string, or the empty string.
123 State string
124 // ProtocolType contains the group protocol type, or the empty string.
125 ProtocolType string
126 // Protocol contains the group protocol data, or the empty string.
127 Protocol string
128 // Members contains the group members.
129 Members map[string]*GroupMemberDescription
130 // AuthorizedOperations contains a 32-bit bitfield to represent authorized
131 // operations for this group.
132 AuthorizedOperations int32
133}
134
135func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) {
136 gd.Version = version
137 pe.putInt16(gd.ErrorCode)
138
139 if err := pe.putString(gd.GroupId); err != nil {
140 return err
141 }
142 if err := pe.putString(gd.State); err != nil {
143 return err
144 }
145 if err := pe.putString(gd.ProtocolType); err != nil {
146 return err
147 }
148 if err := pe.putString(gd.Protocol); err != nil {
149 return err
150 }
151
152 if err := pe.putArrayLength(len(gd.Members)); err != nil {
153 return err
154 }
155
156 for _, block := range gd.Members {
157 if err := block.encode(pe, gd.Version); err != nil {
158 return err
159 }
160 }
161
162 if gd.Version >= 3 {
163 pe.putInt32(gd.AuthorizedOperations)
164 }
165
166 pe.putEmptyTaggedFieldArray()
167 return nil
168}
169
170func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) {
171 gd.Version = version
172 if gd.ErrorCode, err = pd.getInt16(); err != nil {
173 return err
174 }
175
176 gd.Err = KError(gd.ErrorCode)
177
178 if gd.GroupId, err = pd.getString(); err != nil {
179 return err
180 }
181 if gd.State, err = pd.getString(); err != nil {
182 return err
183 }
184 if gd.ProtocolType, err = pd.getString(); err != nil {
185 return err
186 }
187 if gd.Protocol, err = pd.getString(); err != nil {
188 return err
189 }
190
191 if numMembers, err := pd.getArrayLength(); err != nil {
192 return err
193 } else if numMembers > 0 {
194 gd.Members = make(map[string]*GroupMemberDescription, numMembers)
195 for i := 0; i < numMembers; i++ {
196 block := &GroupMemberDescription{}
197 if err := block.decode(pd, gd.Version); err != nil {
198 return err
199 }
200 gd.Members[block.MemberId] = block
201 }
202 }
203
204 if gd.Version >= 3 {
205 if gd.AuthorizedOperations, err = pd.getInt32(); err != nil {
206 return err
207 }
208 }
209
210 _, err = pd.getEmptyTaggedFieldArray()
211 return err
212}
213
214// GroupMemberDescription contains the group members.
215type GroupMemberDescription struct {
216 // Version defines the protocol version to use for encode and decode
217 Version int16
218 // MemberId contains the member ID assigned by the group coordinator.
219 MemberId string
220 // GroupInstanceId contains the unique identifier of the consumer instance
221 // provided by end user.
222 GroupInstanceId *string
223 // ClientId contains the client ID used in the member's latest join group
224 // request.
225 ClientId string
226 // ClientHost contains the client host.
227 ClientHost string
228 // MemberMetadata contains the metadata corresponding to the current group
229 // protocol in use.
230 MemberMetadata []byte
231 // MemberAssignment contains the current assignment provided by the group
232 // leader.
233 MemberAssignment []byte
234}
235
236func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) {
237 gmd.Version = version
238 if err := pe.putString(gmd.MemberId); err != nil {
239 return err
240 }
241 if gmd.Version >= 4 {
242 if err := pe.putNullableString(gmd.GroupInstanceId); err != nil {
243 return err
244 }
245 }
246 if err := pe.putString(gmd.ClientId); err != nil {
247 return err
248 }
249 if err := pe.putString(gmd.ClientHost); err != nil {
250 return err
251 }
252 if err := pe.putBytes(gmd.MemberMetadata); err != nil {
253 return err
254 }
255 if err := pe.putBytes(gmd.MemberAssignment); err != nil {
256 return err
257 }
258
259 pe.putEmptyTaggedFieldArray()
260 return nil
261}
262
263func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) {
264 gmd.Version = version
265 if gmd.MemberId, err = pd.getString(); err != nil {
266 return err
267 }
268 if gmd.Version >= 4 {
269 if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil {
270 return err
271 }
272 }
273 if gmd.ClientId, err = pd.getString(); err != nil {
274 return err
275 }
276 if gmd.ClientHost, err = pd.getString(); err != nil {
277 return err
278 }
279 if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
280 return err
281 }
282 if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
283 return err
284 }
285
286 _, err = pd.getEmptyTaggedFieldArray()
287 return err
288}
289
290func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
291 if len(gmd.MemberAssignment) == 0 {
292 return nil, nil
293 }
294 assignment := new(ConsumerGroupMemberAssignment)
295 err := decode(gmd.MemberAssignment, assignment, nil)
296 return assignment, err
297}
298
299func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
300 if len(gmd.MemberMetadata) == 0 {
301 return nil, nil
302 }
303 metadata := new(ConsumerGroupMemberMetadata)
304 err := decode(gmd.MemberMetadata, metadata, nil)
305 return metadata, err
306}