blob: b109cd91830a06e38559b96da7865566948e9cb2 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3type SyncGroupRequestAssignment struct {
4 // MemberId contains the ID of the member to assign.
5 MemberId string
6 // Assignment contains the member assignment.
7 Assignment []byte
8}
9
10func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
11 if err := pe.putString(a.MemberId); err != nil {
12 return err
13 }
14
15 if err := pe.putBytes(a.Assignment); err != nil {
16 return err
17 }
18
19 pe.putEmptyTaggedFieldArray()
20 return nil
21}
22
23func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
24 if a.MemberId, err = pd.getString(); err != nil {
25 return err
26 }
27
28 if a.Assignment, err = pd.getBytes(); err != nil {
29 return err
30 }
31
32 _, err = pd.getEmptyTaggedFieldArray()
33 return err
34}
35
36type SyncGroupRequest struct {
37 // Version defines the protocol version to use for encode and decode
38 Version int16
39 // GroupId contains the unique group identifier.
40 GroupId string
41 // GenerationId contains the generation of the group.
42 GenerationId int32
43 // MemberId contains the member ID assigned by the group.
44 MemberId string
45 // GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
46 GroupInstanceId *string
47 // GroupAssignments contains each assignment.
48 GroupAssignments []SyncGroupRequestAssignment
49}
50
51func (s *SyncGroupRequest) setVersion(v int16) {
52 s.Version = v
53}
54
55func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
56 if err := pe.putString(s.GroupId); err != nil {
57 return err
58 }
59
60 pe.putInt32(s.GenerationId)
61
62 if err := pe.putString(s.MemberId); err != nil {
63 return err
64 }
65
66 if s.Version >= 3 {
67 if err := pe.putNullableString(s.GroupInstanceId); err != nil {
68 return err
69 }
70 }
71
72 if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
73 return err
74 }
75 for _, block := range s.GroupAssignments {
76 if err := block.encode(pe, s.Version); err != nil {
77 return err
78 }
79 }
80
81 pe.putEmptyTaggedFieldArray()
82 return nil
83}
84
85func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
86 s.Version = version
87 if s.GroupId, err = pd.getString(); err != nil {
88 return err
89 }
90
91 if s.GenerationId, err = pd.getInt32(); err != nil {
92 return err
93 }
94
95 if s.MemberId, err = pd.getString(); err != nil {
96 return err
97 }
98
99 if s.Version >= 3 {
100 if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
101 return err
102 }
103 }
104
105 if numAssignments, err := pd.getArrayLength(); err != nil {
106 return err
107 } else if numAssignments > 0 {
108 s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
109 for i := 0; i < numAssignments; i++ {
110 var block SyncGroupRequestAssignment
111 if err := block.decode(pd, s.Version); err != nil {
112 return err
113 }
114 s.GroupAssignments[i] = block
115 }
116 }
117
118 _, err = pd.getEmptyTaggedFieldArray()
119 return err
120}
121
122func (r *SyncGroupRequest) key() int16 {
123 return apiKeySyncGroup
124}
125
126func (r *SyncGroupRequest) version() int16 {
127 return r.Version
128}
129
130func (r *SyncGroupRequest) headerVersion() int16 {
131 if r.Version >= 4 {
132 return 2
133 }
134 return 1
135}
136
137func (r *SyncGroupRequest) isValidVersion() bool {
138 return r.Version >= 0 && r.Version <= 4
139}
140
141func (r *SyncGroupRequest) isFlexible() bool {
142 return r.isFlexibleVersion(r.Version)
143}
144
145func (r *SyncGroupRequest) isFlexibleVersion(version int16) bool {
146 return version >= 4
147}
148
149func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
150 switch r.Version {
151 case 4:
152 return V2_4_0_0
153 case 3:
154 return V2_3_0_0
155 case 2:
156 return V2_0_0_0
157 case 1:
158 return V0_11_0_0
159 case 0:
160 return V0_9_0_0
161 default:
162 return V2_3_0_0
163 }
164}
165
166func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
167 r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
168 MemberId: memberId,
169 Assignment: memberAssignment,
170 })
171}
172
173func (r *SyncGroupRequest) AddGroupAssignmentMember(
174 memberId string,
175 memberAssignment *ConsumerGroupMemberAssignment,
176) error {
177 bin, err := encode(memberAssignment, nil)
178 if err != nil {
179 return err
180 }
181
182 r.AddGroupAssignment(memberId, bin)
183 return nil
184}