blob: bdabe0a8a310385b86e9831afb7b374e29d5dfdf [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5type JoinGroupResponse struct {
6 // Version defines the protocol version to use for encode and decode
7 Version int16
8 // ThrottleTime contains the duration for which the request was throttled due
9 // to a quota violation, or zero if the request did not violate any quota.
10 ThrottleTime int32
11 // Err contains the error code, or 0 if there was no error.
12 Err KError
13 // GenerationId contains the generation ID of the group.
14 GenerationId int32
15 // GroupProtocol contains the group protocol selected by the coordinator.
16 GroupProtocol string
17 // LeaderId contains the leader of the group.
18 LeaderId string
19 // MemberId contains the member ID assigned by the group coordinator.
20 MemberId string
21 // Members contains the per-group-member information.
22 Members []GroupMember
23}
24
25func (r *JoinGroupResponse) setVersion(v int16) {
26 r.Version = v
27}
28
// GroupMember describes one entry of JoinGroupResponse.Members.
type GroupMember struct {
	// MemberId is the ID of the group member.
	MemberId string
	// GroupInstanceId is the unique identifier of the consumer instance as
	// supplied by the end user. It is a pointer so that it can be absent.
	GroupInstanceId *string
	// Metadata is the raw metadata of the group member.
	Metadata []byte
}
38
39func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
40 members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
41 for _, member := range r.Members {
42 meta := new(ConsumerGroupMemberMetadata)
43 if err := decode(member.Metadata, meta, nil); err != nil {
44 return nil, err
45 }
46 members[member.MemberId] = *meta
47 }
48 return members, nil
49}
50
51func (r *JoinGroupResponse) encode(pe packetEncoder) error {
52 if r.Version >= 2 {
53 pe.putInt32(r.ThrottleTime)
54 }
55 pe.putKError(r.Err)
56 pe.putInt32(r.GenerationId)
57
58 if err := pe.putString(r.GroupProtocol); err != nil {
59 return err
60 }
61 if err := pe.putString(r.LeaderId); err != nil {
62 return err
63 }
64 if err := pe.putString(r.MemberId); err != nil {
65 return err
66 }
67
68 if err := pe.putArrayLength(len(r.Members)); err != nil {
69 return err
70 }
71
72 for _, member := range r.Members {
73 if err := pe.putString(member.MemberId); err != nil {
74 return err
75 }
76 if r.Version >= 5 {
77 if err := pe.putNullableString(member.GroupInstanceId); err != nil {
78 return err
79 }
80 }
81 if err := pe.putBytes(member.Metadata); err != nil {
82 return err
83 }
84 pe.putEmptyTaggedFieldArray()
85 }
86
87 pe.putEmptyTaggedFieldArray()
88 return nil
89}
90
91func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
92 r.Version = version
93
94 if version >= 2 {
95 if r.ThrottleTime, err = pd.getInt32(); err != nil {
96 return
97 }
98 }
99
100 r.Err, err = pd.getKError()
101 if err != nil {
102 return err
103 }
104
105 if r.GenerationId, err = pd.getInt32(); err != nil {
106 return
107 }
108
109 if r.GroupProtocol, err = pd.getString(); err != nil {
110 return
111 }
112
113 if r.LeaderId, err = pd.getString(); err != nil {
114 return
115 }
116
117 if r.MemberId, err = pd.getString(); err != nil {
118 return
119 }
120
121 n, err := pd.getArrayLength()
122 if err != nil {
123 return err
124 }
125 if n == 0 {
126 _, err = pd.getEmptyTaggedFieldArray()
127 return err
128 }
129
130 r.Members = make([]GroupMember, n)
131 for i := 0; i < n; i++ {
132 memberId, err := pd.getString()
133 if err != nil {
134 return err
135 }
136
137 var groupInstanceId *string = nil
138 if r.Version >= 5 {
139 groupInstanceId, err = pd.getNullableString()
140 if err != nil {
141 return err
142 }
143 }
144
145 memberMetadata, err := pd.getBytes()
146 if err != nil {
147 return err
148 }
149
150 r.Members[i] = GroupMember{MemberId: memberId, GroupInstanceId: groupInstanceId, Metadata: memberMetadata}
151
152 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
153 return err
154 }
155 }
156
157 _, err = pd.getEmptyTaggedFieldArray()
158 return err
159}
160
161func (r *JoinGroupResponse) key() int16 {
162 return apiKeyJoinGroup
163}
164
165func (r *JoinGroupResponse) version() int16 {
166 return r.Version
167}
168
169func (r *JoinGroupResponse) headerVersion() int16 {
170 if r.Version >= 6 {
171 return 1
172 }
173 return 0
174}
175
176func (r *JoinGroupResponse) isValidVersion() bool {
177 return r.Version >= 0 && r.Version <= 6
178}
179
180func (r *JoinGroupResponse) isFlexible() bool {
181 return r.isFlexibleVersion(r.Version)
182}
183
184func (r *JoinGroupResponse) isFlexibleVersion(version int16) bool {
185 return version >= 6
186}
187
188func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
189 switch r.Version {
190 case 6:
191 return V2_4_0_0
192 case 5:
193 return V2_3_0_0
194 case 4:
195 return V2_2_0_0
196 case 3:
197 return V2_0_0_0
198 case 2:
199 return V0_11_0_0
200 case 1:
201 return V0_10_1_0
202 case 0:
203 return V0_10_0_0
204 default:
205 return V2_3_0_0
206 }
207}
208
209func (r *JoinGroupResponse) throttleTime() time.Duration {
210 return time.Duration(r.ThrottleTime) * time.Millisecond
211}