package sarama

import (
	"errors"
	"sort"
	"time"

	"github.com/rcrowley/go-metrics"
)

const (
	invalidLeaderEpoch        = -1
	invalidPreferredReplicaID = -1
)

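// AbortedTransaction describes a single aborted transaction within a fetch
// response block (fetch versions 4 and later). Consumers reading in
// read_committed mode use this list to skip records belonging to aborted
// transactions.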
type AbortedTransaction struct {
	// ProducerID contains the producer id associated with the aborted transaction.
	ProducerID int64
	// FirstOffset contains the first offset in the aborted transaction.
	FirstOffset int64
}

func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
	if t.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if t.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	return nil
}

func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
	pe.putInt64(t.ProducerID)
	pe.putInt64(t.FirstOffset)

	return nil
}

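// FetchResponseBlock contains the fetch response data for a single topic
// partition: the error code, the relevant offsets, any aborted transactions,
// and the raw record sets.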
type FetchResponseBlock struct {
	// Err contains the error code, or 0 if there was no fetch error.
	Err KError
	// HighWaterMarkOffset contains the current high water mark.
	HighWaterMarkOffset int64
	// LastStableOffset contains the last stable offset (or LSO) of the
	// partition. This is the last offset such that the state of all
	// transactional records prior to this offset has been decided (ABORTED or
	// COMMITTED).
	LastStableOffset int64
	// LogStartOffset contains the current log start offset.
	LogStartOffset int64
	// AbortedTransactions contains the aborted transactions.
	AbortedTransactions []*AbortedTransaction
	// PreferredReadReplica contains the preferred read replica for the
	// consumer to use on its next fetch request.
	PreferredReadReplica int32
	// RecordsSet contains the record data.
	RecordsSet []*Records

	Partial bool
	Records *Records // Deprecated: use FetchResponseBlock.RecordsSet

	// recordsNextOffset contains the next consecutive offset following this response block.
	// This field is computed locally and is not part of the server's binary response.
	recordsNextOffset *int64
}

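// decode reads a single partition block from the wire. Depending on the
// protocol version it also decodes the last stable offset and aborted
// transactions (v4+), the log start offset (v5+), and the preferred read
// replica (v11+), before consuming as many record sets as the declared
// records size allows.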
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
	metricRegistry := pd.metricRegistry()
	var sizeMetric metrics.Histogram
	if metricRegistry != nil {
		sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
	}

	b.Err, err = pd.getKError()
	if err != nil {
		return err
	}

	b.HighWaterMarkOffset, err = pd.getInt64()
	if err != nil {
		return err
	}

	if version >= 4 {
		b.LastStableOffset, err = pd.getInt64()
		if err != nil {
			return err
		}

		if version >= 5 {
			b.LogStartOffset, err = pd.getInt64()
			if err != nil {
				return err
			}
		}

		numTransact, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		if numTransact >= 0 {
			b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
		}

		for i := 0; i < numTransact; i++ {
			transact := new(AbortedTransaction)
			if err = transact.decode(pd); err != nil {
				return err
			}
			b.AbortedTransactions[i] = transact
		}
	}

	if version >= 11 {
		b.PreferredReadReplica, err = pd.getInt32()
		if err != nil {
			return err
		}
	} else {
		b.PreferredReadReplica = invalidPreferredReplicaID
	}

	recordsSize, err := pd.getInt32()
	if err != nil {
		return err
	}
	if sizeMetric != nil {
		sizeMetric.Update(int64(recordsSize))
	}

	recordsDecoder, err := pd.getSubset(int(recordsSize))
	if err != nil {
		return err
	}

	b.RecordsSet = []*Records{}

	for recordsDecoder.remaining() > 0 {
		records := &Records{}
		if err := records.decode(recordsDecoder); err != nil {
			// If at least one record set has already been decoded, running
			// out of data here is not an error.
			if errors.Is(err, ErrInsufficientData) {
				if len(b.RecordsSet) == 0 {
					b.Partial = true
				}
				break
			}
			return err
		}

		b.recordsNextOffset, err = records.nextOffset()
		if err != nil {
			return err
		}

		partial, err := records.isPartial()
		if err != nil {
			return err
		}

		n, err := records.numRecords()
		if err != nil {
			return err
		}

		if n > 0 || (partial && len(b.RecordsSet) == 0) {
			b.RecordsSet = append(b.RecordsSet, records)

			if b.Records == nil {
				b.Records = records
			}
		}

		overflow, err := records.isOverflow()
		if err != nil {
			return err
		}

		if partial || overflow {
			break
		}
	}

	return nil
}

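// numRecords returns the total number of records across all record sets in
// this block.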
func (b *FetchResponseBlock) numRecords() (int, error) {
	sum := 0

	for _, records := range b.RecordsSet {
		count, err := records.numRecords()
		if err != nil {
			return 0, err
		}

		sum += count
	}

	return sum, nil
}

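// isPartial reports whether the block contains only a truncated record set,
// which happens when the broker cuts the record data off at the fetch size
// limit.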
func (b *FetchResponseBlock) isPartial() (bool, error) {
	if b.Partial {
		return true, nil
	}

	if len(b.RecordsSet) == 1 {
		return b.RecordsSet[0].isPartial()
	}

	return false, nil
}

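// encode writes the partition block back out in the given protocol version,
// mirroring decode.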
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
	pe.putKError(b.Err)

	pe.putInt64(b.HighWaterMarkOffset)

	if version >= 4 {
		pe.putInt64(b.LastStableOffset)

		if version >= 5 {
			pe.putInt64(b.LogStartOffset)
		}

		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
			return err
		}
		for _, transact := range b.AbortedTransactions {
			if err = transact.encode(pe); err != nil {
				return err
			}
		}
	}

	if version >= 11 {
		pe.putInt32(b.PreferredReadReplica)
	}

	pe.push(&lengthField{})
	for _, records := range b.RecordsSet {
		err = records.encode(pe)
		if err != nil {
			return err
		}
	}
	return pe.pop()
}

func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
	// Nothing in the protocol documentation guarantees that the
	// AbortedTransactions field arrives ordered, and the Java implementation
	// uses a PriorityQueue based on FirstOffset, so we sort it ourselves.
	at := b.AbortedTransactions
	sort.Slice(
		at,
		func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
	)
	return at
}

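// FetchResponse contains the response to a fetch request, with one block per
// requested topic partition.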
type FetchResponse struct {
	// Version defines the protocol version to use for encode and decode.
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the request
	// was throttled due to a quota violation, or zero if the request did not
	// violate any quota.
	ThrottleTime time.Duration
	// ErrorCode contains the top level response error code.
	ErrorCode int16
	// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
	SessionID int32
	// Blocks contains the response topics.
	Blocks map[string]map[int32]*FetchResponseBlock

	LogAppendTime bool
	Timestamp     time.Time
}

func (r *FetchResponse) setVersion(v int16) {
	r.Version = v
}

func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if r.Version >= 1 {
		if r.ThrottleTime, err = pd.getDurationMs(); err != nil {
			return err
		}
	}

	if r.Version >= 7 {
		r.ErrorCode, err = pd.getInt16()
		if err != nil {
			return err
		}
		r.SessionID, err = pd.getInt32()
		if err != nil {
			return err
		}
	}

	numTopics, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
	for i := 0; i < numTopics; i++ {
		name, err := pd.getString()
		if err != nil {
			return err
		}

		numBlocks, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)

		for j := 0; j < numBlocks; j++ {
			id, err := pd.getInt32()
			if err != nil {
				return err
			}

			block := new(FetchResponseBlock)
			err = block.decode(pd, version)
			if err != nil {
				return err
			}
			r.Blocks[name][id] = block
		}
	}

	return nil
}

func (r *FetchResponse) encode(pe packetEncoder) (err error) {
	if r.Version >= 1 {
		pe.putDurationMs(r.ThrottleTime)
	}

	if r.Version >= 7 {
		pe.putInt16(r.ErrorCode)
		pe.putInt32(r.SessionID)
	}

	err = pe.putArrayLength(len(r.Blocks))
	if err != nil {
		return err
	}

	for topic, partitions := range r.Blocks {
		err = pe.putString(topic)
		if err != nil {
			return err
		}

		err = pe.putArrayLength(len(partitions))
		if err != nil {
			return err
		}

		for id, block := range partitions {
			pe.putInt32(id)
			err = block.encode(pe, r.Version)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (r *FetchResponse) key() int16 {
	return apiKeyFetch
}

func (r *FetchResponse) version() int16 {
	return r.Version
}

func (r *FetchResponse) headerVersion() int16 {
	return 0
}

func (r *FetchResponse) isValidVersion() bool {
	return r.Version >= 0 && r.Version <= 11
}

func (r *FetchResponse) requiredVersion() KafkaVersion {
	switch r.Version {
	case 11:
		return V2_3_0_0
	case 9, 10:
		return V2_1_0_0
	case 8:
		return V2_0_0_0
	case 7:
		return V1_1_0_0
	case 6:
		return V1_0_0_0
	case 4, 5:
		return V0_11_0_0
	case 3:
		return V0_10_1_0
	case 2:
		return V0_10_0_0
	case 1:
		return V0_9_0_0
	case 0:
		return V0_8_2_0
	default:
		return V2_3_0_0
	}
}

func (r *FetchResponse) throttleTime() time.Duration {
	return r.ThrottleTime
}

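// GetBlock returns the response block for the given topic and partition, or
// nil if no such block is present.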
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		return nil
	}

	if r.Blocks[topic] == nil {
		return nil
	}

	return r.Blocks[topic][partition]
}

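// AddError sets the error code on the response block for the given topic and
// partition, creating the block if necessary. Like the Add* helpers below, it
// is intended for building canned fetch responses in tests and mock brokers.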
func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}
	partitions, ok := r.Blocks[topic]
	if !ok {
		partitions = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = partitions
	}
	frb, ok := partitions[partition]
	if !ok {
		frb = new(FetchResponseBlock)
		partitions[partition] = frb
	}
	frb.Err = err
}

func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}
	partitions, ok := r.Blocks[topic]
	if !ok {
		partitions = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = partitions
	}
	frb, ok := partitions[partition]
	if !ok {
		frb = new(FetchResponseBlock)
		partitions[partition] = frb
	}

	return frb
}

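// encodeKV encodes an optional key and value, ignoring encoding errors and
// returning nil for an absent side.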
func encodeKV(key, value Encoder) ([]byte, []byte) {
	var kb []byte
	var vb []byte
	if key != nil {
		kb, _ = key.Encode()
	}
	if value != nil {
		vb, _ = value.Encode()
	}

	return kb, vb
}

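// AddMessageWithTimestamp appends a single legacy message (message set
// encoding) to the block for the given topic/partition. The timestamp is only
// meaningful for message version 1 and later; if LogAppendTime is set on the
// response, r.Timestamp overrides the supplied timestamp.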
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if r.LogAppendTime {
		timestamp = r.Timestamp
	}
	msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
	if len(frb.RecordsSet) == 0 {
		records := newLegacyRecords(&MessageSet{})
		frb.RecordsSet = []*Records{&records}
	}
	set := frb.RecordsSet[0].MsgSet
	set.Messages = append(set.Messages, msgBlock)
}

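// AddRecordWithTimestamp appends a single record to the block's first record
// batch (default record encoding, v2), creating the batch with the given
// timestamp as its FirstTimestamp if it does not exist yet.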
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
}

// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but
// instead of appending one record to an existing batch, it appends a new
// batch containing a single record to the response. Since transactions are
// handled at the batch level (the whole batch is either committed or
// aborted), use this helper to test transactions.
func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)

	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: isTransactional,
	}
	rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
	records := newDefaultRecords(batch)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}

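// AddControlRecordWithTimestamp appends a new transactional control batch
// containing a single control record (e.g. a transaction commit or abort
// marker) for the given producer ID.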
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)

	// control batch
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: true,
		Control:         true,
	}

	// records
	records := newDefaultRecords(nil)
	records.RecordBatch = batch

	// record: a single control record of the requested type
	cr := ControlRecord{
		Version: 0,
		Type:    recordType,
	}
	crKey := &realEncoder{raw: make([]byte, 4)}
	crValue := &realEncoder{raw: make([]byte, 6)}
	cr.encode(crKey, crValue)
	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}

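// AddMessage appends a single legacy message with a zero timestamp to the
// block for the given topic/partition. A minimal sketch of building a canned
// response for a test (topic, payload, and offset are illustrative):
//
//	resp := new(FetchResponse)
//	resp.AddMessage("my-topic", 0, nil, StringEncoder("hello"), 123)
//	block := resp.GetBlock("my-topic", 0) // non-nil, holds one message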
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}

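// AddRecord appends a single default record with a zero timestamp to the
// block for the given topic/partition.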
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}

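// AddRecordBatch appends a new single-record batch with a zero timestamp to
// the block for the given topic/partition.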
func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
}

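// AddControlRecord appends a control batch with a zero timestamp containing a
// single marker of the given type.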
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
}

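// SetLastOffsetDelta sets LastOffsetDelta on the block's first record batch,
// creating an empty v2 batch if none exists.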
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
	frb := r.getOrCreateBlock(topic, partition)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	batch.LastOffsetDelta = offset
}

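// SetLastStableOffset sets the last stable offset on the block for the given
// topic and partition.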
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
	frb := r.getOrCreateBlock(topic, partition)
	frb.LastStableOffset = offset
}