blob: 2d38960919940d6dd72a37331ba8da87b2f604c2 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
import (
	"errors"
	"sort"
)
4
// ConsumerGroupMemberMetadata holds the metadata for consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
//
// Version is always written first and decides which of the trailing fields
// are present on the wire; see the per-field notes.
type ConsumerGroupMemberMetadata struct {
	// Version is the format version of this metadata; it gates the optional
	// fields below.
	Version int16
	// Topics is a list of topic names, encoded for every version.
	Topics []string
	// UserData is an opaque byte payload, encoded for every version.
	UserData []byte
	// OwnedPartitions is only encoded and decoded when Version >= 1.
	OwnedPartitions []*OwnedPartition
	// GenerationID is only encoded and decoded when Version >= 2.
	GenerationID int32
	// RackID is a nullable string that is only encoded and decoded when
	// Version >= 3.
	RackID *string
}
15
16func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
17 pe.putInt16(m.Version)
18
19 if err := pe.putStringArray(m.Topics); err != nil {
20 return err
21 }
22
23 if err := pe.putBytes(m.UserData); err != nil {
24 return err
25 }
26
27 if m.Version >= 1 {
28 if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil {
29 return err
30 }
31 for _, op := range m.OwnedPartitions {
32 if err := op.encode(pe); err != nil {
33 return err
34 }
35 }
36 }
37
38 if m.Version >= 2 {
39 pe.putInt32(m.GenerationID)
40 }
41
42 if m.Version >= 3 {
43 if err := pe.putNullableString(m.RackID); err != nil {
44 return err
45 }
46 }
47
48 return nil
49}
50
51func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
52 if m.Version, err = pd.getInt16(); err != nil {
53 return
54 }
55
56 if m.Topics, err = pd.getStringArray(); err != nil {
57 return
58 }
59
60 if m.UserData, err = pd.getBytes(); err != nil {
61 return
62 }
63 if m.Version >= 1 {
64 n, err := pd.getArrayLength()
65 if err != nil {
66 // permit missing data here in case of misbehaving 3rd party
67 // clients who incorrectly marked the member metadata as V1 in
68 // their JoinGroup request
69 if errors.Is(err, ErrInsufficientData) {
70 return nil
71 }
72 return err
73 }
74 if n > 0 {
75 m.OwnedPartitions = make([]*OwnedPartition, n)
76 for i := 0; i < n; i++ {
77 m.OwnedPartitions[i] = &OwnedPartition{}
78 if err := m.OwnedPartitions[i].decode(pd); err != nil {
79 return err
80 }
81 }
82 }
83 }
84
85 if m.Version >= 2 {
86 if m.GenerationID, err = pd.getInt32(); err != nil {
87 return err
88 }
89 }
90
91 if m.Version >= 3 {
92 if m.RackID, err = pd.getNullableString(); err != nil {
93 return err
94 }
95 }
96
97 return nil
98}
99
// OwnedPartition pairs a topic name with a list of partition numbers. It is
// the element type of ConsumerGroupMemberMetadata.OwnedPartitions.
type OwnedPartition struct {
	// Topic is the topic name, encoded as a string.
	Topic string
	// Partitions is encoded as an int32 array.
	Partitions []int32
}
104
105func (m *OwnedPartition) encode(pe packetEncoder) error {
106 if err := pe.putString(m.Topic); err != nil {
107 return err
108 }
109 if err := pe.putInt32Array(m.Partitions); err != nil {
110 return err
111 }
112 return nil
113}
114
115func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
116 if m.Topic, err = pd.getString(); err != nil {
117 return err
118 }
119 if m.Partitions, err = pd.getInt32Array(); err != nil {
120 return err
121 }
122
123 return nil
124}
125
// ConsumerGroupMemberAssignment holds the member assignment for a consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberAssignment struct {
	// Version is the format version; it is the first value written and read.
	Version int16
	// Topics maps a topic name to its list of partition numbers. It is
	// encoded as an array of (topic, partitions) entries.
	Topics map[string][]int32
	// UserData is an opaque byte payload written after the topics.
	UserData []byte
}
133
134func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
135 pe.putInt16(m.Version)
136
137 if err := pe.putArrayLength(len(m.Topics)); err != nil {
138 return err
139 }
140
141 for topic, partitions := range m.Topics {
142 if err := pe.putString(topic); err != nil {
143 return err
144 }
145 if err := pe.putInt32Array(partitions); err != nil {
146 return err
147 }
148 }
149
150 if err := pe.putBytes(m.UserData); err != nil {
151 return err
152 }
153
154 return nil
155}
156
157func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
158 if m.Version, err = pd.getInt16(); err != nil {
159 return
160 }
161
162 var topicLen int
163 if topicLen, err = pd.getArrayLength(); err != nil {
164 return
165 }
166
167 m.Topics = make(map[string][]int32, topicLen)
168 for i := 0; i < topicLen; i++ {
169 var topic string
170 if topic, err = pd.getString(); err != nil {
171 return
172 }
173 if m.Topics[topic], err = pd.getInt32Array(); err != nil {
174 return
175 }
176 }
177
178 if m.UserData, err = pd.getBytes(); err != nil {
179 return
180 }
181
182 return nil
183}