blob: e8cbc46cd0badc59ee7eea339b3a657e97effb74 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
// GroupProtocol is a single named protocol entry together with its raw
// metadata, as listed in JoinGroupRequest.OrderedGroupProtocols.
type GroupProtocol struct {
	// Name is the name of the protocol.
	Name string
	// Metadata holds the protocol metadata as raw bytes.
	Metadata []byte
}
9
10func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
11 p.Name, err = pd.getString()
12 if err != nil {
13 return err
14 }
15 p.Metadata, err = pd.getBytes()
16 if err != nil {
17 return err
18 }
19 _, err = pd.getEmptyTaggedFieldArray()
20 return err
21}
22
23func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
24 if err := pe.putString(p.Name); err != nil {
25 return err
26 }
27 if err := pe.putBytes(p.Metadata); err != nil {
28 return err
29 }
30 pe.putEmptyTaggedFieldArray()
31 return nil
32}
33
34type JoinGroupRequest struct {
35 // Version defines the protocol version to use for encode and decode
36 Version int16
37 // GroupId contains the group identifier.
38 GroupId string
39 // SessionTimeout specifies that the coordinator should consider the consumer
40 // dead if it receives no heartbeat after this timeout in milliseconds.
41 SessionTimeout int32
42 // RebalanceTimeout contains the maximum time in milliseconds that the
43 // coordinator will wait for each member to rejoin when rebalancing the
44 // group.
45 RebalanceTimeout int32
46 // MemberId contains the member id assigned by the group coordinator.
47 MemberId string
48 // GroupInstanceId contains the unique identifier of the consumer instance
49 // provided by end user.
50 GroupInstanceId *string
51 // ProtocolType contains the unique name the for class of protocols
52 // implemented by the group we want to join.
53 ProtocolType string
54 // GroupProtocols contains the list of protocols that the member supports.
55 // deprecated; use OrderedGroupProtocols
56 GroupProtocols map[string][]byte
57 // OrderedGroupProtocols contains an ordered list of protocols that the member
58 // supports.
59 OrderedGroupProtocols []*GroupProtocol
60}
61
62func (r *JoinGroupRequest) setVersion(v int16) {
63 r.Version = v
64}
65
66func (r *JoinGroupRequest) encode(pe packetEncoder) error {
67 if err := pe.putString(r.GroupId); err != nil {
68 return err
69 }
70 pe.putInt32(r.SessionTimeout)
71 if r.Version >= 1 {
72 pe.putInt32(r.RebalanceTimeout)
73 }
74 if err := pe.putString(r.MemberId); err != nil {
75 return err
76 }
77 if r.Version >= 5 {
78 if err := pe.putNullableString(r.GroupInstanceId); err != nil {
79 return err
80 }
81 }
82 if err := pe.putString(r.ProtocolType); err != nil {
83 return err
84 }
85
86 if len(r.GroupProtocols) > 0 {
87 if len(r.OrderedGroupProtocols) > 0 {
88 return PacketEncodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
89 }
90
91 if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
92 return err
93 }
94 for name, metadata := range r.GroupProtocols {
95 if err := pe.putString(name); err != nil {
96 return err
97 }
98 if err := pe.putBytes(metadata); err != nil {
99 return err
100 }
101 pe.putEmptyTaggedFieldArray()
102 }
103 } else {
104 if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
105 return err
106 }
107 for _, protocol := range r.OrderedGroupProtocols {
108 if err := protocol.encode(pe); err != nil {
109 return err
110 }
111 }
112 }
113
114 pe.putEmptyTaggedFieldArray()
115 return nil
116}
117
118func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
119 r.Version = version
120
121 if r.GroupId, err = pd.getString(); err != nil {
122 return
123 }
124
125 if r.SessionTimeout, err = pd.getInt32(); err != nil {
126 return
127 }
128
129 if version >= 1 {
130 if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
131 return err
132 }
133 }
134
135 if r.MemberId, err = pd.getString(); err != nil {
136 return
137 }
138
139 if version >= 5 {
140 if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
141 return
142 }
143 }
144
145 if r.ProtocolType, err = pd.getString(); err != nil {
146 return
147 }
148
149 n, err := pd.getArrayLength()
150 if err != nil {
151 return err
152 }
153 if n == 0 {
154 return nil
155 }
156
157 r.GroupProtocols = make(map[string][]byte)
158 for i := 0; i < n; i++ {
159 protocol := &GroupProtocol{}
160 if err := protocol.decode(pd); err != nil {
161 return err
162 }
163 r.GroupProtocols[protocol.Name] = protocol.Metadata
164 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
165 }
166
167 _, err = pd.getEmptyTaggedFieldArray()
168 return err
169}
170
171func (r *JoinGroupRequest) key() int16 {
172 return apiKeyJoinGroup
173}
174
175func (r *JoinGroupRequest) version() int16 {
176 return r.Version
177}
178
179func (r *JoinGroupRequest) headerVersion() int16 {
180 if r.Version >= 6 {
181 return 2
182 }
183 return 1
184}
185
186func (r *JoinGroupRequest) isValidVersion() bool {
187 return r.Version >= 0 && r.Version <= 6
188}
189
190func (r *JoinGroupRequest) isFlexible() bool {
191 return r.isFlexibleVersion(r.Version)
192}
193
194func (r *JoinGroupRequest) isFlexibleVersion(version int16) bool {
195 return version >= 6
196}
197
198func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
199 switch r.Version {
200 case 6:
201 return V2_4_0_0
202 case 5:
203 return V2_3_0_0
204 case 4:
205 return V2_2_0_0
206 case 3:
207 return V2_0_0_0
208 case 2:
209 return V0_11_0_0
210 case 1:
211 return V0_10_1_0
212 case 0:
213 return V0_10_0_0
214 default:
215 return V2_3_0_0
216 }
217}
218
219func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
220 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
221 Name: name,
222 Metadata: metadata,
223 })
224}
225
226func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
227 bin, err := encode(metadata, nil)
228 if err != nil {
229 return err
230 }
231
232 r.AddGroupProtocol(name, bin)
233 return nil
234}