blob: 0cfbe7f4d7a3239553f91e5510d3917a060046b0 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3type OffsetFetchRequest struct {
4 Version int16
5 ConsumerGroup string
6 RequireStable bool // requires v7+
7 partitions map[string][]int32
8}
9
10func (r *OffsetFetchRequest) setVersion(v int16) {
11 r.Version = v
12}
13
14func NewOffsetFetchRequest(
15 version KafkaVersion,
16 group string,
17 partitions map[string][]int32,
18) *OffsetFetchRequest {
19 request := &OffsetFetchRequest{
20 ConsumerGroup: group,
21 partitions: partitions,
22 }
23 if version.IsAtLeast(V2_5_0_0) {
24 // Version 7 is adding the require stable flag.
25 request.Version = 7
26 } else if version.IsAtLeast(V2_4_0_0) {
27 // Version 6 is the first flexible version.
28 request.Version = 6
29 } else if version.IsAtLeast(V2_1_0_0) {
30 // Version 3, 4, and 5 are the same as version 2.
31 request.Version = 5
32 } else if version.IsAtLeast(V2_0_0_0) {
33 request.Version = 4
34 } else if version.IsAtLeast(V0_11_0_0) {
35 request.Version = 3
36 } else if version.IsAtLeast(V0_10_2_0) {
37 // Starting in version 2, the request can contain a null topics array to indicate that offsets
38 // for all topics should be fetched. It also returns a top level error code
39 // for group or coordinator level errors.
40 request.Version = 2
41 } else if version.IsAtLeast(V0_8_2_0) {
42 // In version 0, the request read offsets from ZK.
43 //
44 // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
45 request.Version = 1
46 }
47
48 return request
49}
50
51func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
52 if r.Version < 0 || r.Version > 7 {
53 return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
54 }
55
56 err = pe.putString(r.ConsumerGroup)
57 if err != nil {
58 return err
59 }
60
61 if r.partitions == nil && r.Version >= 2 {
62 if err := pe.putArrayLength(-1); err != nil {
63 return err
64 }
65 } else {
66 if err = pe.putArrayLength(len(r.partitions)); err != nil {
67 return err
68 }
69 }
70
71 for topic, partitions := range r.partitions {
72 err = pe.putString(topic)
73 if err != nil {
74 return err
75 }
76
77 err = pe.putInt32Array(partitions)
78 if err != nil {
79 return err
80 }
81
82 pe.putEmptyTaggedFieldArray()
83 }
84
85 if r.RequireStable && r.Version < 7 {
86 return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
87 }
88
89 if r.Version >= 7 {
90 pe.putBool(r.RequireStable)
91 }
92
93 pe.putEmptyTaggedFieldArray()
94 return nil
95}
96
97func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
98 r.Version = version
99 r.ConsumerGroup, err = pd.getString()
100 if err != nil {
101 return err
102 }
103
104 partitionCount, err := pd.getArrayLength()
105 if err != nil {
106 return err
107 }
108
109 if (partitionCount == 0 && version < 2) || partitionCount < 0 {
110 return nil
111 }
112
113 r.partitions = make(map[string][]int32, partitionCount)
114 for i := 0; i < partitionCount; i++ {
115 topic, err := pd.getString()
116 if err != nil {
117 return err
118 }
119
120 partitions, err := pd.getInt32Array()
121 if err != nil {
122 return err
123 }
124 _, err = pd.getEmptyTaggedFieldArray()
125 if err != nil {
126 return err
127 }
128
129 r.partitions[topic] = partitions
130 }
131
132 if r.Version >= 7 {
133 r.RequireStable, err = pd.getBool()
134 if err != nil {
135 return err
136 }
137 }
138
139 _, err = pd.getEmptyTaggedFieldArray()
140 return err
141}
142
143func (r *OffsetFetchRequest) key() int16 {
144 return apiKeyOffsetFetch
145}
146
147func (r *OffsetFetchRequest) version() int16 {
148 return r.Version
149}
150
151func (r *OffsetFetchRequest) headerVersion() int16 {
152 if r.Version >= 6 {
153 return 2
154 }
155
156 return 1
157}
158
159func (r *OffsetFetchRequest) isValidVersion() bool {
160 return r.Version >= 0 && r.Version <= 7
161}
162
163func (r *OffsetFetchRequest) isFlexible() bool {
164 return r.isFlexibleVersion(r.Version)
165}
166
167func (r *OffsetFetchRequest) isFlexibleVersion(version int16) bool {
168 return version >= 6
169}
170
171func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
172 switch r.Version {
173 case 7:
174 return V2_5_0_0
175 case 6:
176 return V2_4_0_0
177 case 5:
178 return V2_1_0_0
179 case 4:
180 return V2_0_0_0
181 case 3:
182 return V0_11_0_0
183 case 2:
184 return V0_10_2_0
185 case 1:
186 return V0_8_2_0
187 case 0:
188 return V0_8_2_0
189 default:
190 return V2_5_0_0
191 }
192}
193
194func (r *OffsetFetchRequest) ZeroPartitions() {
195 if r.partitions == nil && r.Version >= 2 {
196 r.partitions = make(map[string][]int32)
197 }
198}
199
200func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
201 if r.partitions == nil {
202 r.partitions = make(map[string][]int32)
203 }
204
205 r.partitions[topic] = append(r.partitions[topic], partitionID)
206}