package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

func (ce ConsumerError) Unwrap() error {
	return ce.Err
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
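//
// For example, the batch returned from PartitionConsumer.Close can be unpacked with errors.As
// (a sketch; pc is a PartitionConsumer, and handling beyond logging is elided):
//
//	if err := pc.Close(); err != nil {
//		var errs ConsumerErrors
//		if errors.As(err, &errs) {
//			for _, ce := range errs {
//				Logger.Println(ce)
//			}
//		}
//	}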
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
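	//
	// For example, a sketch that logs the current marks:
	//
	//	for topic, partitions := range consumer.HighWaterMarks() {
	//		for partition, hwm := range partitions {
	//			Logger.Printf("%s/%d high water mark: %d", topic, partition, hwm)
	//		}
	//	}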
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
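	//
	// For example (a sketch with a hypothetical topic name):
	//
	//	consumer.Pause(map[string][]int32{"my-topic": {0, 1}})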
	Pause(topicPartitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(topicPartitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}

// max time to wait for more partition subscriptions
const partitionConsumersBatchTimeout = 100 * time.Millisecond

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	metricRegistry  metrics.Registry
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
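//
// A minimal construction sketch (the broker address is a placeholder; error handling is elided):
//
//	config := NewConfig()
//	config.Consumer.Return.Errors = true
//	consumer, err := NewConsumer([]string{"localhost:9092"}, config)
//	if err != nil {
//		// handle error
//	}
//	defer consumer.Close()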
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
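//
// A sketch of sharing one Client (the address is a placeholder; error handling is elided):
//
//	client, _ := NewClient([]string{"localhost:9092"}, NewConfig())
//	consumer, _ := NewConsumerFromClient(client)
//	// ... use the consumer ...
//	_ = consumer.Close()
//	_ = client.Close() // the underlying client must still be closed separately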
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
		metricRegistry:  newCleanupRegistry(client.Config().MetricRegistry),
	}

	return c, nil
}

func (c *consumer) Close() error {
	c.metricRegistry.UnregisterAll()
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:             c,
		conf:                 c.conf,
		topic:                topic,
		partition:            partition,
		messages:             make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:               make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:               make(chan *FetchResponse, 1),
		leaderEpoch:          invalidLeaderEpoch,
		preferredReadReplica: invalidPreferredReplicaID,
		trigger:              make(chan none, 1),
		dying:                make(chan none),
		fetchSize:            c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	leader, epoch, err := c.client.LeaderAndEpoch(child.topic, child.partition)
	if err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.leaderEpoch = epoch
	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// Pause implements Consumer.
func (c *consumer) Pause(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Pause()
				}
			}
		}
	}
}

// Resume implements Consumer.
func (c *consumer) Resume(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Resume()
				}
			}
		}
	}
}

// PauseAll implements Consumer.
func (c *consumer) PauseAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Pause()
		}
	}
}

// ResumeAll implements Consumer.
func (c *consumer) ResumeAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Resume()
		}
	}
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
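//
// A minimal consumption loop, as a sketch (assumes a consumer created with NewConsumer and a
// hypothetical topic name; error handling is elided):
//
//	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetOldest)
//	if err != nil {
//		// handle error
//	}
//	defer pc.AsyncClose()
//	for msg := range pc.Messages() {
//		fmt.Printf("offset %d: %s\n", msg.Offset, string(msg.Value))
//	}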
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function (or Close) before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
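	//
	// A typical shutdown sketch: call AsyncClose, then drain both channels until they are closed.
	//
	//	pc.AsyncClose()
	//	for range pc.Messages() {
	//	}
	//	for range pc.Errors() {
	//	}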
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
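	//
	// A sketch of draining errors in a separate goroutine (assumes Consumer.Return.Errors is true):
	//
	//	go func() {
	//		for err := range pc.Errors() {
	//			Logger.Println(err)
	//		}
	//	}()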
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
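	//
	// For example, a rough lag estimate for the most recently received message (a sketch):
	//
	//	lag := pc.HighWaterMarkOffset() - (msg.Offset + 1)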
	HighWaterMarkOffset() int64

	// Pause suspends fetching from this partition. Future calls to the broker will not return
	// any records from this partition until it has been resumed using Resume().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause()

	// Resume resumes this partition, which has been paused with Pause().
	// New calls to the broker will return records from this partition if there are any to be fetched.
	// If the partition was not previously paused, this method is a no-op.
	Resume()

	// IsPaused indicates if this partition consumer is paused or not
	IsPaused() bool
}

type partitionConsumer struct {
	highWaterMarkOffset atomic.Int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	leaderEpoch          int32
	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        atomic.Int32

	paused atomic.Bool // true if fetching from this partition is currently suspended
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := child.retries.Add(1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, int32, error) {
	if child.preferredReadReplica >= 0 {
		broker, err := child.consumer.client.Broker(child.preferredReadReplica)
		if err == nil {
			return broker, child.leaderEpoch, nil
		}
		Logger.Printf(
			"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
			child.topic, child.partition, child.preferredReadReplica)

		// if we couldn't find it, discard the replica preference and trigger a
		// metadata refresh whilst falling back to consuming from the leader again
		child.preferredReadReplica = invalidPreferredReplicaID
		_ = child.consumer.client.RefreshMetadata(child.topic)
	}

	// if the preferred replica cannot be found, fall back to the leader
	return child.consumer.client.LeaderAndEpoch(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	broker, epoch, err := child.preferredBroker()
	if err != nil {
		return err
	}

	child.leaderEpoch = epoch
	child.broker = child.consumer.refBrokerConsumer(broker)
	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}

	child.highWaterMarkOffset.Store(newestOffset)

	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var consumerErrors ConsumerErrors
	for err := range child.errors {
		consumerErrors = append(consumerErrors, err)
	}

	if len(consumerErrors) > 0 {
		return consumerErrors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return child.highWaterMarkOffset.Load()
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			child.retries.Store(0)
		}

		for i, msg := range msgs {
			child.interceptors(msg)
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						child.interceptors(msg)
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var consumerBatchSizeMetric metrics.Histogram
	if child.consumer != nil && child.consumer.metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", child.consumer.metricRegistry)
	}

	// If the request was throttled and empty, we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if !errors.Is(block.Err, ErrNoError) {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	if consumerBatchSizeMetric != nil {
		consumerBatchSizeMetric.Update(int64(nRecs))
	}

	if block.PreferredReadReplica != invalidPreferredReplicaID {
		child.preferredReadReplica = block.PreferredReadReplica
	}

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check for int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		} else if block.recordsNextOffset != nil && *block.recordsNextOffset <= block.HighWaterMarkOffset {
			// check the last record's next offset to avoid getting stuck if the high watermark was not reached
			Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, next offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.recordsNextOffset)
			child.offset = *block.recordsNextOffset
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	child.highWaterMarkOffset.Store(block.HighWaterMarkOffset)

	// abortedProducerIDs contains producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer reaches the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer reaches an abort control record, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	var messages []*ConsumerMessage
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransactions up to the last offset of the current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop the aborted transaction so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// It is unclear why this continues on error to begin with.
				// The safe bet is to ignore control messages on error if ReadUncommitted,
				// and to surface the error if ReadCommitted.
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
	for _, interceptor := range child.conf.Consumer.Interceptors {
		msg.safelyApplyInterceptor(interceptor)
	}
}

// Pause implements PartitionConsumer.
func (child *partitionConsumer) Pause() {
	child.paused.Store(true)
}

// Resume implements PartitionConsumer.
func (child *partitionConsumer) Resume() {
	child.paused.Store(false)
}

// IsPaused implements PartitionConsumer.
func (child *partitionConsumer) IsPaused() bool {
	return child.paused.Load()
}

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available.
func (bc *brokerConsumer) subscriptionManager() {
	defer close(bc.newSubscriptions)

	for {
		var partitionConsumers []*partitionConsumer

		// Check for any partition consumer asking to subscribe; if there aren't
		// any, trigger the network request (to fetch Kafka messages) by sending "nil" to the
		// newSubscriptions channel
		select {
		case pc, ok := <-bc.input:
			if !ok {
				return
			}
			partitionConsumers = append(partitionConsumers, pc)
		case bc.newSubscriptions <- nil:
			continue
		}

		// drain input of any further incoming subscriptions
		timer := time.NewTimer(partitionConsumersBatchTimeout)
		for batchComplete := false; !batchComplete; {
			select {
			case pc := <-bc.input:
				partitionConsumers = append(partitionConsumers, pc)
			case <-timer.C:
				batchComplete = true
			}
		}
		timer.Stop()

		Logger.Printf(
			"consumer/broker/%d accumulated %d new subscriptions\n",
			bc.broker.ID(), len(partitionConsumers))

		bc.newSubscriptions <- partitionConsumers
	}
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available.
// This is the main loop that fetches Kafka messages.
func (bc *brokerConsumer) subscriptionConsumer() {
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Take a small nap to avoid burning the CPU.
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		// if there is no response, no fetch was made, so there is nothing to handle
		if response == nil {
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			if _, ok := response.Blocks[child.topic]; !ok {
				bc.acks.Done()
				continue
			}

			if _, ok := response.Blocks[child.topic][child.partition]; !ok {
				bc.acks.Done()
				continue
			}

			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		if result == nil {
			if preferredBroker, _, err := child.preferredBroker(); err == nil {
				if bc.broker.ID() != preferredBroker.ID() {
					// not an error but needs redispatching to consume from preferred replica
					Logger.Printf(
						"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
						bc.broker.ID(), preferredBroker.ID())
					child.trigger <- none{}
					delete(bc.subscriptions, child)
				}
			}
			continue
		}

		// Discard any replica preference.
		child.preferredReadReplica = invalidPreferredReplicaID

		if errors.Is(result, errTimedOut) {
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		} else if errors.Is(result, ErrOffsetOutOfRange) {
			// there's no point in retrying this; it will just fail the same way again
			// shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		} else if errors.Is(result, ErrUnknownTopicOrPartition) ||
			errors.Is(result, ErrNotLeaderForPartition) ||
			errors.Is(result, ErrLeaderNotAvailable) ||
			errors.Is(result, ErrReplicaNotAvailable) ||
			errors.Is(result, ErrFencedLeaderEpoch) ||
			errors.Is(result, ErrUnknownLeaderEpoch) {
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		} else {
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			// Take a small nap to avoid burning the CPU.
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

// fetchNewMessages returns a nil response if no fetch is made, which can occur when
// all partitions are paused
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	// Version 1 is the same as version 0.
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	// Starting in Version 2, the requestor must be able to handle Kafka Log
	// Message format version 1.
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	// Version 3 adds MaxBytes. Starting in version 3, the partition ordering in
	// the request is now relevant. Partitions will be processed in the order
	// they appear in the request.
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	// Version 4 adds IsolationLevel. Starting in version 4, the requestor must be
	// able to handle Kafka log message format version 2.
	// Version 5 adds LogStartOffset to indicate the earliest available offset of
	// partition data that can be consumed.
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 5
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	// Version 6 is the same as version 5.
	if bc.consumer.conf.Version.IsAtLeast(V1_0_0_0) {
		request.Version = 6
	}
	// Version 7 adds incremental fetch request support.
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID that we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	// Version 8 is the same as version 7.
	if bc.consumer.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 8
	}
	// Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
	// Version 10 indicates that we can use the ZStd compression algorithm, as
	// described in KIP-110.
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	// Version 11 adds RackID for KIP-392 fetch from closest replica
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		if !child.IsPaused() {
			request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize, child.leaderEpoch)
		}
	}

	// avoid fetching when there are no blocks
	if len(request.blocks) == 0 {
		return nil, nil
	}

	return bc.broker.Fetch(request)
}