package sarama

import "fmt"

type fetchRequestBlock struct {
	Version int16
	// currentLeaderEpoch contains the current leader epoch of the partition.
	currentLeaderEpoch int32
	// fetchOffset contains the message offset.
	fetchOffset int64
	// logStartOffset contains the earliest available offset of the follower
	// replica. The field is only used when the request is sent by the
	// follower.
	logStartOffset int64
	// maxBytes contains the maximum bytes to fetch from this partition. See
	// KIP-74 for cases where this limit may not be honored.
	maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
	b.Version = version
	if b.Version >= 9 {
		pe.putInt32(b.currentLeaderEpoch)
	}
	pe.putInt64(b.fetchOffset)
	if b.Version >= 5 {
		pe.putInt64(b.logStartOffset)
	}
	pe.putInt32(b.maxBytes)
	return nil
}

func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
	b.Version = version
	if b.Version >= 9 {
		if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
			return err
		}
	}
	if b.fetchOffset, err = pd.getInt64(); err != nil {
		return err
	}
	if b.Version >= 5 {
		if b.logStartOffset, err = pd.getInt64(); err != nil {
			return err
		}
	}
	if b.maxBytes, err = pd.getInt32(); err != nil {
		return err
	}
	return nil
}
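
// newFetchRequestBlockSketch is a minimal sketch added for illustration (not
// upstream code): it shows how a per-partition block might be populated before
// encoding. The values are arbitrary; note from encode above that
// currentLeaderEpoch is only written for version >= 9 and logStartOffset only
// for version >= 5.
func newFetchRequestBlockSketch() *fetchRequestBlock {
	return &fetchRequestBlock{
		currentLeaderEpoch: 3,         // encoded only when version >= 9
		fetchOffset:        1000,      // first message offset to fetch
		logStartOffset:     0,         // follower-only field, encoded when version >= 5
		maxBytes:           32 * 1024, // per-partition byte cap (see KIP-74)
	}
}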

// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ReplicaID contains the broker ID of the follower, or -1 if this request
	// is from a consumer.
	// ReplicaID int32
	// MaxWaitTime contains the maximum time in milliseconds to wait for the response.
	MaxWaitTime int32
	// MinBytes contains the minimum bytes to accumulate in the response.
	MinBytes int32
	// MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases
	// where this limit may not be honored.
	MaxBytes int32
	// Isolation controls the visibility of transactional records. Using
	// READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
	// READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
	// transactional records are visible. To be more concrete, READ_COMMITTED
	// returns all data from offsets smaller than the current LSO (last stable
	// offset), and enables the inclusion of the list of aborted transactions
	// in the result, which allows consumers to discard ABORTED transactional
	// records.
	Isolation IsolationLevel
	// SessionID contains the fetch session ID.
	SessionID int32
	// SessionEpoch contains the epoch of the partition leader as known to the
	// follower replica or a consumer.
	SessionEpoch int32
	// blocks contains the topics to fetch.
	blocks map[string]map[int32]*fetchRequestBlock
	// forgotten contains, in an incremental fetch request, the partitions to remove.
	forgotten map[string][]int32
	// RackID contains the rack ID of the consumer making this request.
	RackID string
}

func (r *FetchRequest) setVersion(v int16) {
	r.Version = v
}

type IsolationLevel int8

const (
	ReadUncommitted IsolationLevel = iota
	ReadCommitted
)
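
// Note on wire values (illustrative, added here for clarity; not upstream code):
// the iota above means ReadUncommitted encodes as 0 and ReadCommitted as 1, which
// is exactly the int8 that FetchRequest.encode writes for request versions >= 4.
//
//	_ = int8(ReadUncommitted) // 0
//	_ = int8(ReadCommitted)   // 1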

func (r *FetchRequest) encode(pe packetEncoder) (err error) {
	metricRegistry := pe.metricRegistry()

	pe.putInt32(-1) // ReplicaID is always -1 for clients
	pe.putInt32(r.MaxWaitTime)
	pe.putInt32(r.MinBytes)
	if r.Version >= 3 {
		pe.putInt32(r.MaxBytes)
	}
	if r.Version >= 4 {
		pe.putInt8(int8(r.Isolation))
	}
	if r.Version >= 7 {
		pe.putInt32(r.SessionID)
		pe.putInt32(r.SessionEpoch)
	}
	err = pe.putArrayLength(len(r.blocks))
	if err != nil {
		return err
	}
	for topic, blocks := range r.blocks {
		err = pe.putString(topic)
		if err != nil {
			return err
		}
		err = pe.putArrayLength(len(blocks))
		if err != nil {
			return err
		}
		for partition, block := range blocks {
			pe.putInt32(partition)
			err = block.encode(pe, r.Version)
			if err != nil {
				return err
			}
		}
		getOrRegisterTopicMeter("consumer-fetch-rate", topic, metricRegistry).Mark(1)
	}
	if r.Version >= 7 {
		err = pe.putArrayLength(len(r.forgotten))
		if err != nil {
			return err
		}
		for topic, partitions := range r.forgotten {
			err = pe.putString(topic)
			if err != nil {
				return err
			}
			err = pe.putArrayLength(len(partitions))
			if err != nil {
				return err
			}
			for _, partition := range partitions {
				pe.putInt32(partition)
			}
		}
	}
	if r.Version >= 11 {
		err = pe.putString(r.RackID)
		if err != nil {
			return err
		}
	}

	return nil
}

func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if _, err = pd.getInt32(); err != nil {
		return err
	}
	if r.MaxWaitTime, err = pd.getInt32(); err != nil {
		return err
	}
	if r.MinBytes, err = pd.getInt32(); err != nil {
		return err
	}
	if r.Version >= 3 {
		if r.MaxBytes, err = pd.getInt32(); err != nil {
			return err
		}
	}
	if r.Version >= 4 {
		isolation, err := pd.getInt8()
		if err != nil {
			return err
		}
		r.Isolation = IsolationLevel(isolation)
	}
	if r.Version >= 7 {
		r.SessionID, err = pd.getInt32()
		if err != nil {
			return err
		}
		r.SessionEpoch, err = pd.getInt32()
		if err != nil {
			return err
		}
	}
	topicCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if topicCount == 0 {
		return nil
	}
	r.blocks = make(map[string]map[int32]*fetchRequestBlock)
	for i := 0; i < topicCount; i++ {
		topic, err := pd.getString()
		if err != nil {
			return err
		}
		partitionCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
		for j := 0; j < partitionCount; j++ {
			partition, err := pd.getInt32()
			if err != nil {
				return err
			}
			fetchBlock := &fetchRequestBlock{}
			if err = fetchBlock.decode(pd, r.Version); err != nil {
				return err
			}
			r.blocks[topic][partition] = fetchBlock
		}
	}

	if r.Version >= 7 {
		forgottenCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		r.forgotten = make(map[string][]int32)
		for i := 0; i < forgottenCount; i++ {
			topic, err := pd.getString()
			if err != nil {
				return err
			}
			partitionCount, err := pd.getArrayLength()
			if err != nil {
				return err
			}
			if partitionCount < 0 {
				return fmt.Errorf("partitionCount %d is invalid", partitionCount)
			}
			r.forgotten[topic] = make([]int32, partitionCount)

			for j := 0; j < partitionCount; j++ {
				partition, err := pd.getInt32()
				if err != nil {
					return err
				}
				r.forgotten[topic][j] = partition
			}
		}
	}

	if r.Version >= 11 {
		r.RackID, err = pd.getString()
		if err != nil {
			return err
		}
	}

	return nil
}

func (r *FetchRequest) key() int16 {
	return apiKeyFetch
}

func (r *FetchRequest) version() int16 {
	return r.Version
}

func (r *FetchRequest) headerVersion() int16 {
	return 1
}

func (r *FetchRequest) isValidVersion() bool {
	return r.Version >= 0 && r.Version <= 11
}

func (r *FetchRequest) requiredVersion() KafkaVersion {
	switch r.Version {
	case 11:
		return V2_3_0_0
	case 9, 10:
		return V2_1_0_0
	case 8:
		return V2_0_0_0
	case 7:
		return V1_1_0_0
	case 6:
		return V1_0_0_0
	case 4, 5:
		return V0_11_0_0
	case 3:
		return V0_10_1_0
	case 2:
		return V0_10_0_0
	case 1:
		return V0_9_0_0
	case 0:
		return V0_8_2_0
	default:
		return V2_3_0_0
	}
}
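
// supportsFetchVersionSketch is a hedged illustration added alongside this file
// (not upstream code; it assumes KafkaVersion exposes IsAtLeast, as used elsewhere
// in this package): it shows how a caller could confirm that a broker version is
// new enough for the chosen request version before sending.
func supportsFetchVersionSketch(r *FetchRequest, broker KafkaVersion) bool {
	// requiredVersion maps r.Version to the minimum KafkaVersion listed above.
	return broker.IsAtLeast(r.requiredVersion())
}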

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32, leaderEpoch int32) {
	if r.blocks == nil {
		r.blocks = make(map[string]map[int32]*fetchRequestBlock)
	}

	if r.Version >= 7 && r.forgotten == nil {
		r.forgotten = make(map[string][]int32)
	}

	if r.blocks[topic] == nil {
		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
	}

	tmp := new(fetchRequestBlock)
	tmp.Version = r.Version
	tmp.maxBytes = maxBytes
	tmp.fetchOffset = fetchOffset
	if r.Version >= 9 {
		tmp.currentLeaderEpoch = leaderEpoch
	}

	r.blocks[topic][partitionID] = tmp
}
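
// exampleFetchRequestSketch is a hedged usage sketch (not part of the upstream
// file): it shows how a FetchRequest for a single partition might be assembled.
// The topic name, partition, offsets, sizes and epoch below are illustrative
// assumptions, not values taken from this package.
func exampleFetchRequestSketch() *FetchRequest {
	req := &FetchRequest{
		Version:     11,
		MaxWaitTime: 500,           // wait at most 500ms for MinBytes to accumulate
		MinBytes:    1,             // respond as soon as any data is available
		MaxBytes:    1024 * 1024,   // overall response size cap, encoded for version >= 3
		Isolation:   ReadCommitted, // only records below the LSO, encoded for version >= 4
		RackID:      "rack-1",      // encoded for version >= 11
	}
	// Fetch topic "my-topic", partition 0, starting at offset 42, up to 32 KiB,
	// with a current leader epoch of 5 (only encoded for version >= 9).
	req.AddBlock("my-topic", 0, 42, 32*1024, 5)
	return req
}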