1package sarama
2
3import (
4 "encoding/binary"
5 "errors"
6 "fmt"
7 "math"
8 "sync"
9 "time"
10
11 "github.com/eapache/go-resiliency/breaker"
12 "github.com/eapache/queue"
13 "github.com/rcrowley/go-metrics"
14)
15
16// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
17var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")
18
19const (
20 // minFunctionalRetryBufferLength defines the minimum number of messages the retry buffer must support.
21 // If Producer.Retry.MaxBufferLength is set to a non-zero value below this limit, it will be adjusted to this value.
22 // This ensures the retry buffer remains functional under typical workloads.
23 minFunctionalRetryBufferLength = 4 * 1024
24 // minFunctionalRetryBufferBytes defines the minimum total byte size the retry buffer must support.
25 // If Producer.Retry.MaxBufferBytes is set to a non-zero value below this limit, it will be adjusted to this value.
26 // A 32 MB lower limit ensures sufficient capacity for retrying larger messages without exhausting resources.
27 minFunctionalRetryBufferBytes = 32 * 1024 * 1024
28)
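// A minimal configuration sketch for bounding the retry buffer (the numeric
// values are illustrative only and sit above the functional minimums defined
// above):
//
//	conf := NewConfig()
//	conf.Producer.Retry.MaxBufferLength = 8 * 1024        // >= minFunctionalRetryBufferLength
//	conf.Producer.Retry.MaxBufferBytes = 64 * 1024 * 1024 // >= minFunctionalRetryBufferBytes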
29
30// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
31// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
32// and parses responses for errors. You must read from the Errors() channel or the
33// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
34// leaks and message loss: it will not be garbage-collected automatically when it passes
35// out of scope, and buffered messages may not be flushed.
36type AsyncProducer interface {
37 // AsyncClose triggers a shutdown of the producer. The shutdown has completed
38 // when both the Errors and Successes channels have been closed. When calling
39 // AsyncClose, you *must* continue to read from those channels in order to
40 // drain the results of any messages in flight.
41 AsyncClose()
42
43 // Close shuts down the producer and waits for any buffered messages to be
44 // flushed. You must call this function before a producer object passes out of
45// scope, as it may otherwise leak memory. You must call this before the process
46// shuts down, or you may lose messages. You must call this before calling
47 // Close on the underlying client.
48 Close() error
49
50 // Input is the input channel for the user to write messages to that they
51 // wish to send.
52 Input() chan<- *ProducerMessage
53
54 // Successes is the success output channel back to the user when Return.Successes is
55 // enabled. If Return.Successes is true, you MUST read from this channel or the
56 // Producer will deadlock. It is suggested that you send and read messages
57 // together in a single select statement.
58 Successes() <-chan *ProducerMessage
59
60 // Errors is the error output channel back to the user. You MUST read from this
61 // channel or the Producer will deadlock when the channel is full. Alternatively,
62 // you can set Producer.Return.Errors in your config to false, which prevents
63// errors from being returned.
64 Errors() <-chan *ProducerError
65
66// IsTransactional returns true when the current producer is transactional.
67 IsTransactional() bool
68
69// TxnStatus returns the current producer transaction status.
70 TxnStatus() ProducerTxnStatusFlag
71
72// BeginTxn marks the current transaction as ready.
73 BeginTxn() error
74
75// CommitTxn commits the current transaction.
76 CommitTxn() error
77
78// AbortTxn aborts the current transaction.
79 AbortTxn() error
80
81// AddOffsetsToTxn adds the given offsets to the current transaction.
82 AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
83
84// AddMessageToTxn adds the given message's offset to the current transaction.
85 AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
86}
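// A hedged usage sketch of the interface above (broker address, topic, and the
// message count are placeholders). Sending and reading results in one select
// statement avoids the deadlock described on Successes and Errors:
//
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer producer.AsyncClose()
//	for i := 0; i < 10; i++ {
//		select {
//		case producer.Input() <- &ProducerMessage{Topic: "example", Value: StringEncoder("hello")}:
//		case err := <-producer.Errors():
//			Logger.Println("failed to produce:", err)
//		}
//	}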
87
88type asyncProducer struct {
89 client Client
90 conf *Config
91
92 errors chan *ProducerError
93 input, successes, retries chan *ProducerMessage
94 inFlight sync.WaitGroup
95
96 brokers map[*Broker]*brokerProducer
97 brokerRefs map[*brokerProducer]int
98 brokerLock sync.Mutex
99
100 txnmgr *transactionManager
101 txLock sync.Mutex
102
103 metricsRegistry metrics.Registry
104}
105
106// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
107func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
108 client, err := NewClient(addrs, conf)
109 if err != nil {
110 return nil, err
111 }
112 return newAsyncProducer(client)
113}
114
115// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
116// necessary to call Close() on the underlying client when shutting down this producer.
117func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
118 // For a client passed in by the caller, ensure we don't
119 // call Close() on it.
120 cli := &nopCloserClient{client}
121 return newAsyncProducer(cli)
122}
123
124func newAsyncProducer(client Client) (AsyncProducer, error) {
125 // Check that we are not dealing with a closed Client before processing any other arguments
126 if client.Closed() {
127 return nil, ErrClosedClient
128 }
129
130 txnmgr, err := newTransactionManager(client.Config(), client)
131 if err != nil {
132 return nil, err
133 }
134
135 p := &asyncProducer{
136 client: client,
137 conf: client.Config(),
138 errors: make(chan *ProducerError),
139 input: make(chan *ProducerMessage),
140 successes: make(chan *ProducerMessage),
141 retries: make(chan *ProducerMessage),
142 brokers: make(map[*Broker]*brokerProducer),
143 brokerRefs: make(map[*brokerProducer]int),
144 txnmgr: txnmgr,
145 metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry),
146 }
147
148 // launch our singleton dispatchers
149 go withRecover(p.dispatcher)
150 go withRecover(p.retryHandler)
151
152 return p, nil
153}
154
155type flagSet int8
156
157const (
158 syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
159 fin // final message from partitionProducer to brokerProducer and back
160 shutdown // start the shutdown process
161 endtxn // control message that ends the current transaction
162 committxn // commit the transaction when it ends
163 aborttxn // abort the transaction when it ends
164)
165
166// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
167type ProducerMessage struct {
168 Topic string // The Kafka topic for this message.
169 // The partitioning key for this message. Pre-existing Encoders include
170 // StringEncoder and ByteEncoder.
171 Key Encoder
172 // The actual message to store in Kafka. Pre-existing Encoders include
173 // StringEncoder and ByteEncoder.
174 Value Encoder
175
176 // The headers are key-value pairs that are transparently passed
177 // by Kafka between producers and consumers.
178 Headers []RecordHeader
179
180 // This field is used to hold arbitrary data you wish to include so it
181 // will be available when receiving on the Successes and Errors channels.
182 // Sarama completely ignores this field; it is only to be used for
183 // pass-through data.
184 Metadata interface{}
185
186 // The fields below this point are filled in by the producer as the message is processed
187
188 // Offset is the offset of the message stored on the broker. This is only
189 // guaranteed to be defined if the message was successfully delivered and
190 // RequiredAcks is not NoResponse.
191 Offset int64
192 // Partition is the partition that the message was sent to. This is only
193 // guaranteed to be defined if the message was successfully delivered.
194 Partition int32
195 // Timestamp can vary in behavior depending on broker configuration, being
196 // in either one of the CreateTime or LogAppendTime modes (default CreateTime),
197 // and requiring Kafka version at least 0.10.0.
198 //
199 // When configured to CreateTime, the timestamp is specified by the producer
200 // either by explicitly setting this field, or when the message is added
201 // to a produce set.
202 //
203 // When configured to LogAppendTime, the timestamp is assigned to the message
204 // by the broker. This is only guaranteed to be defined if the message was
205 // successfully delivered and RequiredAcks is not NoResponse.
206 Timestamp time.Time
207
208 retries int
209 flags flagSet
210 expectation chan *ProducerError
211 sequenceNumber int32
212 producerEpoch int16
213 hasSequence bool
214}
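// A short illustrative sketch of populating the user-facing fields above
// (topic, key, header, and metadata values are placeholders):
//
//	msg := &ProducerMessage{
//		Topic:    "audit",
//		Key:      StringEncoder("user-42"),
//		Value:    ByteEncoder([]byte(`{"event":"login"}`)),
//		Headers:  []RecordHeader{{Key: []byte("origin"), Value: []byte("web")}},
//		Metadata: "request-123", // echoed back untouched on the Successes/Errors channels
//	}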
215
216const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
217
218func (m *ProducerMessage) ByteSize(version int) int {
219 var size int
220 if version >= 2 {
221 size = maximumRecordOverhead
222 for _, h := range m.Headers {
223 size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
224 }
225 } else {
226 size = producerMessageOverhead
227 }
228 if m.Key != nil {
229 size += m.Key.Length()
230 }
231 if m.Value != nil {
232 size += m.Value.Length()
233 }
234 return size
235}
236
237func (m *ProducerMessage) clear() {
238 m.flags = 0
239 m.retries = 0
240 m.sequenceNumber = 0
241 m.producerEpoch = 0
242 m.hasSequence = false
243}
244
245// ProducerError is the type of error generated when the producer fails to deliver a message.
246// It contains the original ProducerMessage as well as the actual error value.
247type ProducerError struct {
248 Msg *ProducerMessage
249 Err error
250}
251
252func (pe ProducerError) Error() string {
253 return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
254}
255
256func (pe ProducerError) Unwrap() error {
257 return pe.Err
258}
259
260// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the error interface.
261// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
262// when closing a producer.
263type ProducerErrors []*ProducerError
264
265func (pe ProducerErrors) Error() string {
266 return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
267}
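// A hedged sketch of draining the batched errors that Close may return
// (the logging is placeholder handling):
//
//	if err := producer.Close(); err != nil {
//		var pErrs ProducerErrors
//		if errors.As(err, &pErrs) {
//			for _, pErr := range pErrs {
//				Logger.Printf("delivery to %s failed: %v\n", pErr.Msg.Topic, pErr.Err)
//			}
//		}
//	}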
268
269func (p *asyncProducer) IsTransactional() bool {
270 return p.txnmgr.isTransactional()
271}
272
273func (p *asyncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
274 offsets := make(map[string][]*PartitionOffsetMetadata)
275 offsets[msg.Topic] = []*PartitionOffsetMetadata{
276 {
277 Partition: msg.Partition,
278 Offset: msg.Offset + 1,
279 Metadata: metadata,
280 },
281 }
282 return p.AddOffsetsToTxn(offsets, groupId)
283}
284
285func (p *asyncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
286 p.txLock.Lock()
287 defer p.txLock.Unlock()
288
289 if !p.IsTransactional() {
290 DebugLogger.Printf("producer/txnmgr [%s] attempt to call AddOffsetsToTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
291 return ErrNonTransactedProducer
292 }
293
294 DebugLogger.Printf("producer/txnmgr [%s] add offsets to transaction\n", p.txnmgr.transactionalID)
295 return p.txnmgr.addOffsetsToTxn(offsets, groupId)
296}
297
298func (p *asyncProducer) TxnStatus() ProducerTxnStatusFlag {
299 return p.txnmgr.currentTxnStatus()
300}
301
302func (p *asyncProducer) BeginTxn() error {
303 p.txLock.Lock()
304 defer p.txLock.Unlock()
305
306 if !p.IsTransactional() {
307 DebugLogger.Println("producer/txnmgr attempt to call BeginTxn on a non-transactional producer")
308 return ErrNonTransactedProducer
309 }
310
311 return p.txnmgr.transitionTo(ProducerTxnFlagInTransaction, nil)
312}
313
314func (p *asyncProducer) CommitTxn() error {
315 p.txLock.Lock()
316 defer p.txLock.Unlock()
317
318 if !p.IsTransactional() {
319 DebugLogger.Printf("producer/txnmgr [%s] attempt to call CommitTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
320 return ErrNonTransactedProducer
321 }
322
323 DebugLogger.Printf("producer/txnmgr [%s] committing transaction\n", p.txnmgr.transactionalID)
324 err := p.finishTransaction(true)
325 if err != nil {
326 return err
327 }
328 DebugLogger.Printf("producer/txnmgr [%s] transaction committed\n", p.txnmgr.transactionalID)
329 return nil
330}
331
332func (p *asyncProducer) AbortTxn() error {
333 p.txLock.Lock()
334 defer p.txLock.Unlock()
335
336 if !p.IsTransactional() {
337 DebugLogger.Printf("producer/txnmgr [%s] attempt to call AbortTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
338 return ErrNonTransactedProducer
339 }
340 DebugLogger.Printf("producer/txnmgr [%s] aborting transaction\n", p.txnmgr.transactionalID)
341 err := p.finishTransaction(false)
342 if err != nil {
343 return err
344 }
345 DebugLogger.Printf("producer/txnmgr [%s] transaction aborted\n", p.txnmgr.transactionalID)
346 return nil
347}
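// A rough sketch of the transactional call sequence implemented above, assuming
// the producer was configured as transactional; real code would additionally
// inspect TxnStatus before deciding whether to abort:
//
//	if err := producer.BeginTxn(); err != nil {
//		return err
//	}
//	producer.Input() <- &ProducerMessage{Topic: "example", Value: StringEncoder("in-txn")}
//	if err := producer.CommitTxn(); err != nil {
//		_ = producer.AbortTxn()
//		return err
//	}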
348
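// finishTransaction routes an endtxn control message through the regular input
// flow, waits for all in-flight messages to drain, and then asks the
// transaction manager to commit or abort.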
349func (p *asyncProducer) finishTransaction(commit bool) error {
350 p.inFlight.Add(1)
351 if commit {
352 p.input <- &ProducerMessage{flags: endtxn | committxn}
353 } else {
354 p.input <- &ProducerMessage{flags: endtxn | aborttxn}
355 }
356 p.inFlight.Wait()
357 return p.txnmgr.finishTransaction(commit)
358}
359
360func (p *asyncProducer) Errors() <-chan *ProducerError {
361 return p.errors
362}
363
364func (p *asyncProducer) Successes() <-chan *ProducerMessage {
365 return p.successes
366}
367
368func (p *asyncProducer) Input() chan<- *ProducerMessage {
369 return p.input
370}
371
372func (p *asyncProducer) Close() error {
373 p.AsyncClose()
374
375 if p.conf.Producer.Return.Successes {
376 go withRecover(func() {
377 for range p.successes {
378 }
379 })
380 }
381
382 var pErrs ProducerErrors
383 if p.conf.Producer.Return.Errors {
384 for event := range p.errors {
385 pErrs = append(pErrs, event)
386 }
387 } else {
388 <-p.errors
389 }
390
391 if len(pErrs) > 0 {
392 return pErrs
393 }
394 return nil
395}
396
397func (p *asyncProducer) AsyncClose() {
398 go withRecover(p.shutdown)
399}
400
401// singleton
402// dispatches messages by topic
403func (p *asyncProducer) dispatcher() {
404 handlers := make(map[string]chan<- *ProducerMessage)
405 shuttingDown := false
406
407 for msg := range p.input {
408 if msg == nil {
409 Logger.Println("Something tried to send a nil message, it was ignored.")
410 continue
411 }
412
413 if msg.flags&endtxn != 0 {
414 var err error
415 if msg.flags&committxn != 0 {
416 err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
417 } else {
418 err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
419 }
420 if err != nil {
421 Logger.Printf("producer/txnmgr unable to end transaction %s", err)
422 }
423 p.inFlight.Done()
424 continue
425 }
426
427 if msg.flags&shutdown != 0 {
428 shuttingDown = true
429 p.inFlight.Done()
430 continue
431 }
432
433 if msg.retries == 0 {
434 if shuttingDown {
435 // we can't just call returnError here because that decrements the wait group,
436 // which hasn't been incremented yet for this message, and shouldn't be
437 pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
438 if p.conf.Producer.Return.Errors {
439 p.errors <- pErr
440 } else {
441 Logger.Println(pErr)
442 }
443 continue
444 }
445 p.inFlight.Add(1)
446 // Ignore retried msgs, they are already in the txn.
447 // Can't produce a new record when the transaction is not started.
448 if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
449 Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
450 p.returnError(msg, ErrTransactionNotReady)
451 continue
452 }
453 }
454
455 for _, interceptor := range p.conf.Producer.Interceptors {
456 msg.safelyApplyInterceptor(interceptor)
457 }
458
459 version := 1
460 if p.conf.Version.IsAtLeast(V0_11_0_0) {
461 version = 2
462 } else if msg.Headers != nil {
463 p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
464 continue
465 }
466
467 size := msg.ByteSize(version)
468 if size > p.conf.Producer.MaxMessageBytes {
469 p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
470 continue
471 }
472
473 handler := handlers[msg.Topic]
474 if handler == nil {
475 handler = p.newTopicProducer(msg.Topic)
476 handlers[msg.Topic] = handler
477 }
478
479 handler <- msg
480 }
481
482 for _, handler := range handlers {
483 close(handler)
484 }
485}
486
487// one per topic
488// partitions messages, then dispatches them by partition
489type topicProducer struct {
490 parent *asyncProducer
491 topic string
492 input <-chan *ProducerMessage
493
494 breaker *breaker.Breaker
495 handlers map[int32]chan<- *ProducerMessage
496 partitioner Partitioner
497}
498
499func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
500 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
501 tp := &topicProducer{
502 parent: p,
503 topic: topic,
504 input: input,
505 breaker: breaker.New(3, 1, 10*time.Second),
506 handlers: make(map[int32]chan<- *ProducerMessage),
507 partitioner: p.conf.Producer.Partitioner(topic),
508 }
509 go withRecover(tp.dispatch)
510 return input
511}
512
513func (tp *topicProducer) dispatch() {
514 for msg := range tp.input {
515 if msg.retries == 0 {
516 if err := tp.partitionMessage(msg); err != nil {
517 tp.parent.returnError(msg, err)
518 continue
519 }
520 }
521
522 handler := tp.handlers[msg.Partition]
523 if handler == nil {
524 handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
525 tp.handlers[msg.Partition] = handler
526 }
527
528 handler <- msg
529 }
530
531 for _, handler := range tp.handlers {
532 close(handler)
533 }
534}
535
536func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
537 var partitions []int32
538
539 err := tp.breaker.Run(func() (err error) {
540 requiresConsistency := false
541 if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
542 requiresConsistency = ep.MessageRequiresConsistency(msg)
543 } else {
544 requiresConsistency = tp.partitioner.RequiresConsistency()
545 }
546
547 if requiresConsistency {
548 partitions, err = tp.parent.client.Partitions(msg.Topic)
549 } else {
550 partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
551 }
552 return
553 })
554 if err != nil {
555 return err
556 }
557
558 numPartitions := int32(len(partitions))
559
560 if numPartitions == 0 {
561 return ErrLeaderNotAvailable
562 }
563
564 choice, err := tp.partitioner.Partition(msg, numPartitions)
565
566 if err != nil {
567 return err
568 } else if choice < 0 || choice >= numPartitions {
569 return ErrInvalidPartition
570 }
571
572 msg.Partition = partitions[choice]
573
574 return nil
575}
576
577// one per partition per topic
578// dispatches messages to the appropriate broker
579// also responsible for maintaining message order during retries
580type partitionProducer struct {
581 parent *asyncProducer
582 topic string
583 partition int32
584 input <-chan *ProducerMessage
585
586 leader *Broker
587 breaker *breaker.Breaker
588 brokerProducer *brokerProducer
589
590 // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through;
591 // all other messages get buffered in retryState[msg.retries].buf to preserve ordering.
592 // retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
593 // therefore whether our buffer is complete and safe to flush)
594 highWatermark int
595 retryState []partitionRetryState
596}
597
598type partitionRetryState struct {
599 buf []*ProducerMessage
600 expectChaser bool
601}
602
603func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
604 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
605 pp := &partitionProducer{
606 parent: p,
607 topic: topic,
608 partition: partition,
609 input: input,
610
611 breaker: breaker.New(3, 1, 10*time.Second),
612 retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
613 }
614 go withRecover(pp.dispatch)
615 return input
616}
617
618func (pp *partitionProducer) backoff(retries int) {
619 var backoff time.Duration
620 if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
621 maxRetries := pp.parent.conf.Producer.Retry.Max
622 backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
623 } else {
624 backoff = pp.parent.conf.Producer.Retry.Backoff
625 }
626 if backoff > 0 {
627 time.Sleep(backoff)
628 }
629}
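// A hedged example of a Producer.Retry.BackoffFunc with exponential growth,
// matching the (retries, maxRetries) signature used by backoff above; the
// 100ms base and 10s cap are arbitrary illustrations:
//
//	conf.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		d := time.Duration(1<<uint(retries)) * 100 * time.Millisecond
//		if d > 10*time.Second {
//			d = 10 * time.Second
//		}
//		return d
//	}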
630
631func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMessage) error {
632 if pp.brokerProducer == nil {
633 if err := pp.updateLeader(); err != nil {
634 pp.parent.returnError(msg, err)
635 pp.backoff(msg.retries)
636 return err
637 }
638 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
639 }
640 return nil
641}
642
643func (pp *partitionProducer) dispatch() {
644 // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
645 // on the first message
646 pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
647 if pp.leader != nil {
648 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
649 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
650 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
651 }
652
653 defer func() {
654 if pp.brokerProducer != nil {
655 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
656 }
657 }()
658
659 for msg := range pp.input {
660 if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
661 select {
662 case <-pp.brokerProducer.abandoned:
663 // a message on the abandoned channel means that our current broker selection is out of date
664 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
665 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
666 pp.brokerProducer = nil
667 time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
668 default:
669 // producer connection is still open.
670 }
671 }
672
673 if msg.retries > pp.highWatermark {
674 if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
675 continue
676 }
677 // a new, higher, retry level; handle it and then back off
678 pp.newHighWatermark(msg.retries)
679 pp.backoff(msg.retries)
680 } else if pp.highWatermark > 0 {
681 // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
682 if msg.retries < pp.highWatermark {
683 // in fact this message is not even at the current retry level, so buffer it for now (unless it's just a fin)
684 if msg.flags&fin == fin {
685 pp.retryState[msg.retries].expectChaser = false
686 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
687 } else {
688 pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
689 }
690 continue
691 } else if msg.flags&fin == fin {
692 // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
693 // meaning this retry level is done and we can go down (at least) one level and flush that
694 pp.retryState[pp.highWatermark].expectChaser = false
695 pp.flushRetryBuffers()
696 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
697 continue
698 }
699 }
700
701 // if we made it this far then the current msg contains real data, and can be sent to the next goroutine
702 // without breaking any of our ordering guarantees
703 if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
704 continue
705 }
706
707 // Now that we know we have a broker to actually try and send this message to, generate the sequence
708 // number for it.
709 // All messages being retried (sent or not) have already had their retry count updated
710 // Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
711 if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
712 msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
713 msg.hasSequence = true
714 }
715
716 if pp.parent.IsTransactional() {
717 pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
718 }
719
720 pp.brokerProducer.input <- msg
721 }
722}
723
724func (pp *partitionProducer) newHighWatermark(hwm int) {
725 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
726 pp.highWatermark = hwm
727
728 // send off a fin so that we know when everything "in between" has made it
729 // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
730 pp.retryState[pp.highWatermark].expectChaser = true
731 pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
732 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
733
734 // a new HWM means that our current broker selection is out of date
735 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
736 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
737 pp.brokerProducer = nil
738}
739
740func (pp *partitionProducer) flushRetryBuffers() {
741 Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
742 for {
743 pp.highWatermark--
744
745 if pp.brokerProducer == nil {
746 if err := pp.updateLeader(); err != nil {
747 pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
748 goto flushDone
749 }
750 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
751 }
752
753 for _, msg := range pp.retryState[pp.highWatermark].buf {
754 pp.brokerProducer.input <- msg
755 }
756
757 flushDone:
758 pp.retryState[pp.highWatermark].buf = nil
759 if pp.retryState[pp.highWatermark].expectChaser {
760 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
761 break
762 } else if pp.highWatermark == 0 {
763 Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
764 break
765 }
766 }
767}
768
769func (pp *partitionProducer) updateLeader() error {
770 return pp.breaker.Run(func() (err error) {
771 if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
772 return err
773 }
774
775 if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
776 return err
777 }
778
779 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
780 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
781 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
782
783 return nil
784 })
785}
786
787// one per broker; also constructs an associated flusher
788func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
789 var (
790 input = make(chan *ProducerMessage)
791 bridge = make(chan *produceSet)
792 pending = make(chan *brokerProducerResponse)
793 responses = make(chan *brokerProducerResponse)
794 )
795
796 bp := &brokerProducer{
797 parent: p,
798 broker: broker,
799 input: input,
800 output: bridge,
801 responses: responses,
802 buffer: newProduceSet(p),
803 currentRetries: make(map[string]map[int32]error),
804 }
805 go withRecover(bp.run)
806
807 // minimal bridge to make the network response `select`able
808 go withRecover(func() {
809 // Use a wait group to know if we still have in flight requests
810 var wg sync.WaitGroup
811
812 for set := range bridge {
813 request := set.buildRequest()
814
815 // Count the in flight requests to know when we can close the pending channel safely
816 wg.Add(1)
817 // Capture the current set to forward in the callback
818 sendResponse := func(set *produceSet) ProduceCallback {
819 return func(response *ProduceResponse, err error) {
820 // Forward the response to make sure we do not block the responseReceiver
821 pending <- &brokerProducerResponse{
822 set: set,
823 err: err,
824 res: response,
825 }
826 wg.Done()
827 }
828 }(set)
829
830 if p.IsTransactional() {
831 // Add partition to tx before sending current batch
832 err := p.txnmgr.publishTxnPartitions()
833 if err != nil {
834 // Request failed to be sent
835 sendResponse(nil, err)
836 continue
837 }
838 }
839
840 // Use AsyncProduce vs Produce to not block waiting for the response
841 // so that we can pipeline multiple produce requests and achieve higher throughput, see:
842 // https://kafka.apache.org/protocol#protocol_network
843 err := broker.AsyncProduce(request, sendResponse)
844 if err != nil {
845 // Request failed to be sent
846 sendResponse(nil, err)
847 continue
848 }
849 // Callback is not called when using NoResponse
850 if p.conf.Producer.RequiredAcks == NoResponse {
851 // Provide the expected nil response
852 sendResponse(nil, nil)
853 }
854 }
855 // Wait for all in flight requests to close the pending channel safely
856 wg.Wait()
857 close(pending)
858 })
859
860 // In order to avoid a deadlock when closing the broker on a network or malformed response error,
861 // we use an intermediate channel to buffer and send pending responses in order.
862 // This is because the AsyncProduce callback inside the bridge is invoked from the broker's
863 // responseReceiver goroutine, and closing the broker requires that goroutine to be finished.
864 go withRecover(func() {
865 buf := queue.New()
866 for {
867 if buf.Length() == 0 {
868 res, ok := <-pending
869 if !ok {
870 // We are done forwarding the last pending response
871 close(responses)
872 return
873 }
874 buf.Add(res)
875 }
876 // Send the head pending response or buffer another one
877 // so that we never block the callback
878 headRes := buf.Peek().(*brokerProducerResponse)
879 select {
880 case res, ok := <-pending:
881 if !ok {
882 continue
883 }
884 buf.Add(res)
885 continue
886 case responses <- headRes:
887 buf.Remove()
888 continue
889 }
890 }
891 })
892
893 if p.conf.Producer.Retry.Max <= 0 {
894 bp.abandoned = make(chan struct{})
895 }
896
897 return bp
898}
899
900type brokerProducerResponse struct {
901 set *produceSet
902 err error
903 res *ProduceResponse
904}
905
906// groups messages together into appropriately-sized batches for sending to the broker
907// handles state related to retries etc
908type brokerProducer struct {
909 parent *asyncProducer
910 broker *Broker
911
912 input chan *ProducerMessage
913 output chan<- *produceSet
914 responses <-chan *brokerProducerResponse
915 abandoned chan struct{}
916
917 buffer *produceSet
918 timer *time.Timer
919 timerFired bool
920
921 closing error
922 currentRetries map[string]map[int32]error
923}
924
925func (bp *brokerProducer) run() {
926 var output chan<- *produceSet
927 var timerChan <-chan time.Time
928 Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
929
930 for {
931 select {
932 case msg, ok := <-bp.input:
933 if !ok {
934 Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
935 bp.shutdown()
936 return
937 }
938
939 if msg == nil {
940 continue
941 }
942
943 if msg.flags&syn == syn {
944 Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
945 bp.broker.ID(), msg.Topic, msg.Partition)
946 if bp.currentRetries[msg.Topic] == nil {
947 bp.currentRetries[msg.Topic] = make(map[int32]error)
948 }
949 bp.currentRetries[msg.Topic][msg.Partition] = nil
950 bp.parent.inFlight.Done()
951 continue
952 }
953
954 if reason := bp.needsRetry(msg); reason != nil {
955 bp.parent.retryMessage(msg, reason)
956
957 if bp.closing == nil && msg.flags&fin == fin {
958 // we were retrying this partition but we can start processing again
959 delete(bp.currentRetries[msg.Topic], msg.Partition)
960 Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
961 bp.broker.ID(), msg.Topic, msg.Partition)
962 }
963
964 continue
965 }
966
967 if msg.flags&fin == fin {
968 // New broker producer that was caught up by the retry loop
969 bp.parent.retryMessage(msg, ErrShuttingDown)
970 DebugLogger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
971 bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
972 continue
973 }
974
975 if bp.buffer.wouldOverflow(msg) {
976 Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
977 if err := bp.waitForSpace(msg, false); err != nil {
978 bp.parent.retryMessage(msg, err)
979 continue
980 }
981 }
982
983 if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
984 // The epoch was reset, need to roll the buffer over
985 Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
986 if err := bp.waitForSpace(msg, true); err != nil {
987 bp.parent.retryMessage(msg, err)
988 continue
989 }
990 }
991 if err := bp.buffer.add(msg); err != nil {
992 bp.parent.returnError(msg, err)
993 continue
994 }
995
996 if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
997 bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency)
998 timerChan = bp.timer.C
999 }
1000 case <-timerChan:
1001 bp.timerFired = true
1002 case output <- bp.buffer:
1003 bp.rollOver()
1004 timerChan = nil
1005 case response, ok := <-bp.responses:
1006 if ok {
1007 bp.handleResponse(response)
1008 }
1009 }
1010
1011 if bp.timerFired || bp.buffer.readyToFlush() {
1012 output = bp.output
1013 } else {
1014 output = nil
1015 }
1016 }
1017}
1018
1019func (bp *brokerProducer) shutdown() {
1020 for !bp.buffer.empty() {
1021 select {
1022 case response := <-bp.responses:
1023 bp.handleResponse(response)
1024 case bp.output <- bp.buffer:
1025 bp.rollOver()
1026 }
1027 }
1028 close(bp.output)
1029 // Drain responses from the bridge goroutine
1030 for response := range bp.responses {
1031 bp.handleResponse(response)
1032 }
1033 // No more brokerProducer related goroutine should be running
1034 Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
1035}
1036
1037func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
1038 if bp.closing != nil {
1039 return bp.closing
1040 }
1041
1042 return bp.currentRetries[msg.Topic][msg.Partition]
1043}
1044
1045func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
1046 for {
1047 select {
1048 case response := <-bp.responses:
1049 bp.handleResponse(response)
1050 // handling a response can change our state, so re-check some things
1051 if reason := bp.needsRetry(msg); reason != nil {
1052 return reason
1053 } else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
1054 return nil
1055 }
1056 case bp.output <- bp.buffer:
1057 bp.rollOver()
1058 return nil
1059 }
1060 }
1061}
1062
1063func (bp *brokerProducer) rollOver() {
1064 if bp.timer != nil {
1065 bp.timer.Stop()
1066 }
1067 bp.timer = nil
1068 bp.timerFired = false
1069 bp.buffer = newProduceSet(bp.parent)
1070}
1071
1072func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
1073 if response.err != nil {
1074 bp.handleError(response.set, response.err)
1075 } else {
1076 bp.handleSuccess(response.set, response.res)
1077 }
1078
1079 if bp.buffer.empty() {
1080 bp.rollOver() // this can happen if the response invalidated our buffer
1081 }
1082}
1083
1084func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
1085 // we iterate through the blocks in the request set, not the response, so that we notice
1086 // if the response is missing a block completely
1087 var retryTopics []string
1088 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1089 if response == nil {
1090 // this only happens when RequiredAcks is NoResponse, so we have to assume success
1091 bp.parent.returnSuccesses(pSet.msgs)
1092 return
1093 }
1094
1095 block := response.GetBlock(topic, partition)
1096 if block == nil {
1097 bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
1098 return
1099 }
1100
1101 switch block.Err {
1102 // Success
1103 case ErrNoError:
1104 if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
1105 for _, msg := range pSet.msgs {
1106 msg.Timestamp = block.Timestamp
1107 }
1108 }
1109 for i, msg := range pSet.msgs {
1110 msg.Offset = block.Offset + int64(i)
1111 }
1112 bp.parent.returnSuccesses(pSet.msgs)
1113 // Duplicate
1114 case ErrDuplicateSequenceNumber:
1115 bp.parent.returnSuccesses(pSet.msgs)
1116 // Retriable errors
1117 case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
1118 ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
1119 if bp.parent.conf.Producer.Retry.Max <= 0 {
1120 bp.parent.abandonBrokerConnection(bp.broker)
1121 bp.parent.returnErrors(pSet.msgs, block.Err)
1122 } else {
1123 retryTopics = append(retryTopics, topic)
1124 }
1125 // Other non-retriable errors
1126 default:
1127 if bp.parent.conf.Producer.Retry.Max <= 0 {
1128 bp.parent.abandonBrokerConnection(bp.broker)
1129 }
1130 bp.parent.returnErrors(pSet.msgs, block.Err)
1131 }
1132 })
1133
1134 if len(retryTopics) > 0 {
1135 if bp.parent.conf.Producer.Idempotent {
1136 err := bp.parent.client.RefreshMetadata(retryTopics...)
1137 if err != nil {
1138 Logger.Printf("Failed refreshing metadata because of %v\n", err)
1139 }
1140 }
1141
1142 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1143 block := response.GetBlock(topic, partition)
1144 if block == nil {
1145 // handled in the previous "eachPartition" loop
1146 return
1147 }
1148
1149 switch block.Err {
1150 case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
1151 ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
1152 Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
1153 bp.broker.ID(), topic, partition, block.Err)
1154 if bp.currentRetries[topic] == nil {
1155 bp.currentRetries[topic] = make(map[int32]error)
1156 }
1157 bp.currentRetries[topic][partition] = block.Err
1158 if bp.parent.conf.Producer.Idempotent {
1159 go bp.parent.retryBatch(topic, partition, pSet, block.Err)
1160 } else {
1161 bp.parent.retryMessages(pSet.msgs, block.Err)
1162 }
1163 // dropping the following messages has the side effect of incrementing their retry count
1164 bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
1165 }
1166 })
1167 }
1168}
1169
1170func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
1171 Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
1172 produceSet := newProduceSet(p)
1173 produceSet.msgs[topic] = make(map[int32]*partitionSet)
1174 produceSet.msgs[topic][partition] = pSet
1175 produceSet.bufferBytes += pSet.bufferBytes
1176 produceSet.bufferCount += len(pSet.msgs)
1177 for _, msg := range pSet.msgs {
1178 if msg.retries >= p.conf.Producer.Retry.Max {
1179 p.returnErrors(pSet.msgs, kerr)
1180 return
1181 }
1182 msg.retries++
1183 }
1184
1185 // it's expected that a metadata refresh has been requested prior to calling retryBatch
1186 leader, err := p.client.Leader(topic, partition)
1187 if err != nil {
1188 Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
1189 for _, msg := range pSet.msgs {
1190 p.returnError(msg, kerr)
1191 }
1192 return
1193 }
1194 bp := p.getBrokerProducer(leader)
1195 bp.output <- produceSet
1196 p.unrefBrokerProducer(leader, bp)
1197}
1198
1199func (bp *brokerProducer) handleError(sent *produceSet, err error) {
1200 var target PacketEncodingError
1201 if errors.As(err, &target) {
1202 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1203 bp.parent.returnErrors(pSet.msgs, err)
1204 })
1205 } else {
1206 Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
1207 bp.parent.abandonBrokerConnection(bp.broker)
1208 _ = bp.broker.Close()
1209 bp.closing = err
1210 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1211 bp.parent.retryMessages(pSet.msgs, err)
1212 })
1213 bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1214 bp.parent.retryMessages(pSet.msgs, err)
1215 })
1216 bp.rollOver()
1217 }
1218}
1219
1220// singleton
1221// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
1222// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
1223func (p *asyncProducer) retryHandler() {
1224 maxBufferLength := p.conf.Producer.Retry.MaxBufferLength
1225 if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength {
1226 maxBufferLength = minFunctionalRetryBufferLength
1227 }
1228
1229 maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes
1230 if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes {
1231 maxBufferBytes = minFunctionalRetryBufferBytes
1232 }
1233
1234 version := 1
1235 if p.conf.Version.IsAtLeast(V0_11_0_0) {
1236 version = 2
1237 }
1238
1239 var currentByteSize int64
1240 var msg *ProducerMessage
1241 buf := queue.New()
1242
1243 for {
1244 if buf.Length() == 0 {
1245 msg = <-p.retries
1246 } else {
1247 select {
1248 case msg = <-p.retries:
1249 case p.input <- buf.Peek().(*ProducerMessage):
1250 msgToRemove := buf.Remove().(*ProducerMessage)
1251 currentByteSize -= int64(msgToRemove.ByteSize(version))
1252 continue
1253 }
1254 }
1255
1256 if msg == nil {
1257 return
1258 }
1259
1260 buf.Add(msg)
1261 currentByteSize += int64(msg.ByteSize(version))
1262
1263 if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) {
1264 continue
1265 }
1266
1267 msgToHandle := buf.Peek().(*ProducerMessage)
1268 if msgToHandle.flags == 0 {
1269 select {
1270 case p.input <- msgToHandle:
1271 buf.Remove()
1272 currentByteSize -= int64(msgToHandle.ByteSize(version))
1273 default:
1274 buf.Remove()
1275 currentByteSize -= int64(msgToHandle.ByteSize(version))
1276 p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
1277 }
1278 }
1279 }
1280}
1281
1282// utility functions
1283
1284func (p *asyncProducer) shutdown() {
1285 Logger.Println("Producer shutting down.")
1286 p.inFlight.Add(1)
1287 p.input <- &ProducerMessage{flags: shutdown}
1288
1289 p.inFlight.Wait()
1290
1291 err := p.client.Close()
1292 if err != nil {
1293 Logger.Println("producer/shutdown failed to close the embedded client:", err)
1294 }
1295
1296 close(p.input)
1297 close(p.retries)
1298 close(p.errors)
1299 close(p.successes)
1300
1301 p.metricsRegistry.UnregisterAll()
1302}
1303
1304func (p *asyncProducer) bumpIdempotentProducerEpoch() {
1305 _, epoch := p.txnmgr.getProducerID()
1306 if epoch == math.MaxInt16 {
1307 Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID")
1308 txnmgr, err := newTransactionManager(p.conf, p.client)
1309 if err != nil {
1310 Logger.Println(err)
1311 return
1312 }
1313
1314 p.txnmgr = txnmgr
1315 } else {
1316 p.txnmgr.bumpEpoch()
1317 }
1318}
1319
1320func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
1321 if errors.Is(err, ErrClusterAuthorizationFailed) ||
1322 errors.Is(err, ErrProducerFenced) ||
1323 errors.Is(err, ErrUnsupportedVersion) ||
1324 errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
1325 return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
1326 }
1327 if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
1328 p.txnmgr.epochBumpRequired = true
1329 }
1330 return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
1331}
1332
1333func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
1334 if p.IsTransactional() {
1335 _ = p.maybeTransitionToErrorState(err)
1336 }
1337 // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
1338 // will never see a message with this number, so we can never continue the sequence.
1339 if !p.IsTransactional() && msg.hasSequence {
1340 Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
1341 p.bumpIdempotentProducerEpoch()
1342 }
1343
1344 msg.clear()
1345 pErr := &ProducerError{Msg: msg, Err: err}
1346 if p.conf.Producer.Return.Errors {
1347 p.errors <- pErr
1348 } else {
1349 Logger.Println(pErr)
1350 }
1351 p.inFlight.Done()
1352}
1353
1354func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
1355 for _, msg := range batch {
1356 p.returnError(msg, err)
1357 }
1358}
1359
1360func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
1361 for _, msg := range batch {
1362 if p.conf.Producer.Return.Successes {
1363 msg.clear()
1364 p.successes <- msg
1365 }
1366 p.inFlight.Done()
1367 }
1368}
1369
1370func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
1371 if msg.retries >= p.conf.Producer.Retry.Max {
1372 p.returnError(msg, err)
1373 } else {
1374 msg.retries++
1375 p.retries <- msg
1376 }
1377}
1378
1379func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
1380 for _, msg := range batch {
1381 p.retryMessage(msg, err)
1382 }
1383}
1384
1385func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
1386 p.brokerLock.Lock()
1387 defer p.brokerLock.Unlock()
1388
1389 bp := p.brokers[broker]
1390
1391 if bp == nil {
1392 bp = p.newBrokerProducer(broker)
1393 p.brokers[broker] = bp
1394 p.brokerRefs[bp] = 0
1395 }
1396
1397 p.brokerRefs[bp]++
1398
1399 return bp
1400}
1401
1402func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
1403 p.brokerLock.Lock()
1404 defer p.brokerLock.Unlock()
1405
1406 p.brokerRefs[bp]--
1407 if p.brokerRefs[bp] == 0 {
1408 close(bp.input)
1409 delete(p.brokerRefs, bp)
1410
1411 if p.brokers[broker] == bp {
1412 delete(p.brokers, broker)
1413 }
1414 }
1415}
1416
1417func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
1418 p.brokerLock.Lock()
1419 defer p.brokerLock.Unlock()
1420
1421 bc, ok := p.brokers[broker]
1422 if ok && bc.abandoned != nil {
1423 close(bc.abandoned)
1424 }
1425
1426 delete(p.brokers, broker)
1427}