package sarama

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

// ProducerTxnStatusFlag marks the current transaction status.
type ProducerTxnStatusFlag int16

const (
	// ProducerTxnFlagUninitialized when txnmgr is created
	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
	// ProducerTxnFlagInitializing when txnmgr is initializing
	ProducerTxnFlagInitializing
	// ProducerTxnFlagReady when txnmgr is ready to receive a transaction
	ProducerTxnFlagReady
	// ProducerTxnFlagInTransaction when a transaction has been started
	ProducerTxnFlagInTransaction
	// ProducerTxnFlagEndTransaction when the transaction will be committed or aborted
	ProducerTxnFlagEndTransaction
	// ProducerTxnFlagInError when an abortable or fatal error has occurred
	ProducerTxnFlagInError
	// ProducerTxnFlagCommittingTransaction when committing txn
	ProducerTxnFlagCommittingTransaction
	// ProducerTxnFlagAbortingTransaction when aborting txn
	ProducerTxnFlagAbortingTransaction
	// ProducerTxnFlagAbortableError when the producer encounters an abortable error.
	// Must call AbortTxn in this case.
	ProducerTxnFlagAbortableError
	// ProducerTxnFlagFatalError when the producer encounters a fatal error.
	// Must Close and recreate the producer in this case.
	ProducerTxnFlagFatalError
)

func (s ProducerTxnStatusFlag) String() string {
	status := make([]string, 0)
	if s&ProducerTxnFlagUninitialized != 0 {
		status = append(status, "ProducerTxnStateUninitialized")
	}
	if s&ProducerTxnFlagInitializing != 0 {
		status = append(status, "ProducerTxnStateInitializing")
	}
	if s&ProducerTxnFlagReady != 0 {
		status = append(status, "ProducerTxnStateReady")
	}
	if s&ProducerTxnFlagInTransaction != 0 {
		status = append(status, "ProducerTxnStateInTransaction")
	}
	if s&ProducerTxnFlagEndTransaction != 0 {
		status = append(status, "ProducerTxnStateEndTransaction")
	}
	if s&ProducerTxnFlagInError != 0 {
		status = append(status, "ProducerTxnStateInError")
	}
	if s&ProducerTxnFlagCommittingTransaction != 0 {
		status = append(status, "ProducerTxnStateCommittingTransaction")
	}
	if s&ProducerTxnFlagAbortingTransaction != 0 {
		status = append(status, "ProducerTxnStateAbortingTransaction")
	}
	if s&ProducerTxnFlagAbortableError != 0 {
		status = append(status, "ProducerTxnStateAbortableError")
	}
	if s&ProducerTxnFlagFatalError != 0 {
		status = append(status, "ProducerTxnStateFatalError")
	}
	return strings.Join(status, "|")
}
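
// Illustrative sketch (not part of the library code): status values are bit
// flags, so transitionTo can combine them and String renders every bit that is
// set, joined by "|". For example:
//
//	s := ProducerTxnFlagInError | ProducerTxnFlagAbortableError
//	fmt.Println(s) // "ProducerTxnStateInError|ProducerTxnStateAbortableError"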

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID         int64
	producerEpoch      int16
	sequenceNumbers    map[string]int32
	mutex              sync.Mutex
	transactionalID    string
	transactionTimeout time.Duration
	client             Client

	// Whether the Kafka cluster is at least 2.5.0;
	// used to recover when the producer fails.
	coordinatorSupportsBumpingEpoch bool

	// Whether the producer needs to bump its epoch.
	epochBumpRequired bool
	// Records the last seen error.
	lastError error

	// Ensures that status is never accessed with a race condition.
	statusLock sync.RWMutex
	status     ProducerTxnStatusFlag

	// Ensures that only one goroutine will update partitions in the current transaction.
	partitionInTxnLock            sync.Mutex
	pendingPartitionsInCurrentTxn topicPartitionSet
	partitionsInCurrentTxn        topicPartitionSet

	// Offsets to add to the transaction.
	offsetsInCurrentTxn map[string]topicPartitionOffsets
}

const (
	noProducerID    = -1
	noProducerEpoch = -1

	// see publishTxnPartitions comment.
	addPartitionsRetryBackoff = 20 * time.Millisecond
)

// Allowed txnmgr state transitions.
var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{
	ProducerTxnFlagUninitialized: {
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When we are initializing
	ProducerTxnFlagInitializing: {
		ProducerTxnFlagInitializing,
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When we have initialized the transactional producer
	ProducerTxnFlagReady: {
		ProducerTxnFlagInTransaction,
	},
	// When beginTxn has been called
	ProducerTxnFlagInTransaction: {
		// When calling commit or abort
		ProducerTxnFlagEndTransaction,
		// When we got an error
		ProducerTxnFlagInError,
	},
	ProducerTxnFlagEndTransaction: {
		// When an epoch bump is required
		ProducerTxnFlagInitializing,
		// When the commit succeeded
		ProducerTxnFlagReady,
		// When we got an error
		ProducerTxnFlagInError,
	},
	// Need to abort the transaction
	ProducerTxnFlagAbortableError: {
		// Call AbortTxn
		ProducerTxnFlagAbortingTransaction,
		// When we got an error
		ProducerTxnFlagInError,
	},
	// Need to close the producer
	ProducerTxnFlagFatalError: {
		ProducerTxnFlagFatalError,
	},
}

type topicPartition struct {
	topic     string
	partition int32
}

// to ensure that we don't do a full scan every time a partition or an offset is added.
type (
	topicPartitionSet     map[topicPartition]struct{}
	topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
)

func (s topicPartitionSet) mapToRequest() map[string][]int32 {
	result := make(map[string][]int32, len(s))
	for tp := range s {
		result[tp.topic] = append(result[tp.topic], tp.partition)
	}
	return result
}

func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata {
	result := make(map[string][]*PartitionOffsetMetadata, len(s))
	for tp, offset := range s {
		result[tp.topic] = append(result[tp.topic], offset)
	}
	return result
}
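
// Illustrative sketch (not part of the library code): mapToRequest regroups the
// flat topicPartition keys by topic so they can be sent in a request. For
// example, assuming three tracked partitions:
//
//	s := topicPartitionSet{
//		{topic: "events", partition: 0}: {},
//		{topic: "events", partition: 1}: {},
//		{topic: "audit", partition: 0}:  {},
//	}
//	s.mapToRequest() // map[string][]int32{"events": {0, 1}, "audit": {0}}
//	// Partition order within a topic is unspecified because map iteration order is random.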

// Returns true if the transition to the target status is allowed.
func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool {
	for status, allowedTransitions := range producerTxnTransitions {
		if status&t.status != 0 {
			for _, allowedTransition := range allowedTransitions {
				if allowedTransition&target != 0 {
					return true
				}
			}
		}
	}
	return false
}

// Get the current transaction status.
func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag {
	t.statusLock.RLock()
	defer t.statusLock.RUnlock()

	return t.status
}

// Try to transition to the target status and return an error if the transition is not allowed.
func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error {
	t.statusLock.Lock()
	defer t.statusLock.Unlock()

	if !t.isTransitionValid(target) {
		return ErrTransitionNotAllowed
	}

	if target&ProducerTxnFlagInError != 0 {
		if err == nil {
			return ErrCannotTransitionNilError
		}
		t.lastError = err
	} else {
		t.lastError = nil
	}

	DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target)

	t.status = target
	return err
}
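
// Illustrative sketch (not part of the library code): error transitions combine
// ProducerTxnFlagInError with the abortable or fatal bit, and transitionTo
// returns the error it recorded so callers can propagate it directly:
//
//	if err := t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, responseErr); err != nil {
//		// err is responseErr, or ErrTransitionNotAllowed / ErrCannotTransitionNilError
//	}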

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.producerEpoch++
	for k := range t.sequenceNumbers {
		t.sequenceNumbers[k] = 0
	}
}
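
// Illustrative sketch (not part of the library code): sequence numbers are
// tracked per "topic-partition" key and reset to 0 when the epoch is bumped:
//
//	seq, epoch := t.getAndIncrementSequenceNumber("events", 0) // 0, current epoch
//	seq, epoch = t.getAndIncrementSequenceNumber("events", 0)  // 1, current epoch
//	t.bumpEpoch()                                              // epoch incremented, sequences reset
//	seq, epoch = t.getAndIncrementSequenceNumber("events", 0)  // 0, bumped epoch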

func (t *transactionManager) getProducerID() (int64, int16) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.producerID, t.producerEpoch
}

// Compute the retry backoff based on how many attempts remain.
func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration {
	if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
		maxRetries := t.client.Config().Producer.Transaction.Retry.Max
		retries := maxRetries - attemptsRemaining
		return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
	}
	return t.client.Config().Producer.Transaction.Retry.Backoff
}
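
// Illustrative sketch (not part of the library code): the backoff used here
// comes from Producer.Transaction.Retry. A caller could plug in a custom growth
// curve via BackoffFunc, for example (assumed caller-side configuration):
//
//	conf := NewConfig()
//	conf.Producer.Transaction.Retry.Max = 5
//	conf.Producer.Transaction.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(retries+1) * 100 * time.Millisecond // linear backoff
//	}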

// Returns true if the txnmgr is transactional.
func (t *transactionManager) isTransactional() bool {
	return t.transactionalID != ""
}

// Add the specified offsets to the current transaction.
func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
		return ErrTransactionNotReady
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if _, ok := t.offsetsInCurrentTxn[groupId]; !ok {
		t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{}
	}

	for topic, offsets := range offsetsToAdd {
		for _, offset := range offsets {
			tp := topicPartition{topic: topic, partition: offset.Partition}
			t.offsetsInCurrentTxn[groupId][tp] = offset
		}
	}
	return nil
}
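
// Illustrative sketch (not part of the library code): offsets are buffered per
// consumer group and keyed by topic/partition, so re-adding the same partition
// simply overwrites its metadata. Assuming a transaction is open and "pom" is a
// *PartitionOffsetMetadata for partition 0 of "events":
//
//	_ = t.addOffsetsToTxn(map[string][]*PartitionOffsetMetadata{"events": {pom}}, "my-group")
//	// t.offsetsInCurrentTxn["my-group"][topicPartition{topic: "events", partition: 0}] == pom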

// Send the txnmgr's saved offsets to the transaction coordinator.
func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
	// First AddOffsetsToTxn
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	lastError := exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddOffsetsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.AddOffsetsToTxn(request)
		if err != nil {
			// If an error occurred, try to refresh the current transaction coordinator.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			// If no response is returned, just retry.
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
				t.transactionalID, groupId, response)
			// If no error, just exit.
			return false, nil
		}
		switch response.Err {
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		case ErrGroupAuthorizationFailed:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
		default:
			// Others are fatal
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)

	if lastError != nil {
		return offsets, lastError
	}

	resultOffsets := offsets
	// Then TxnOffsetCommit
	// note the result is not completed until the TxnOffsetCommit returns
	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
		var r topicPartitionOffsets
		for attemptsRemaining >= 0 {
			var retry bool
			r, retry, err = run()
			if !retry {
				return r, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return r, err
	}
	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
		if err != nil {
			return resultOffsets, true, err
		}
		request := &TxnOffsetCommitRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
			Topics:          offsets.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
			// Version 2 adds the committed leader epoch.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
		if err != nil {
			_ = consumerGroupCoordinator.Close()
			_ = t.client.RefreshCoordinator(groupId)
			return resultOffsets, true, err
		}

		if responses == nil {
			return resultOffsets, true, ErrTxnUnableToParseResponse
		}

		var responseErrors []error
		failedTxn := topicPartitionOffsets{}
		for topic, partitionErrors := range responses.Topics {
			for _, partitionError := range partitionErrors {
				switch partitionError.Err {
				case ErrNoError:
					continue
				// If the topic is unknown or the coordinator is loading, retry with the current coordinator
				case ErrRequestTimedOut:
					fallthrough
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = consumerGroupCoordinator.Close()
					_ = t.client.RefreshCoordinator(groupId)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Do nothing, just retry
				case ErrIllegalGeneration:
					fallthrough
				case ErrUnknownMemberId:
					fallthrough
				case ErrFencedInstancedId:
					fallthrough
				case ErrGroupAuthorizationFailed:
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
				default:
					// Others are fatal
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
				}
				tp := topicPartition{topic: topic, partition: partitionError.Partition}
				failedTxn[tp] = offsets[tp]
				responseErrors = append(responseErrors, partitionError.Err)
			}
		}

		resultOffsets = failedTxn

		if len(resultOffsets) == 0 {
			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
				t.transactionalID, groupId)
			return resultOffsets, false, nil
		}
		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
	}, nil)
}
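
// Illustrative sketch (not part of the library code): the exec closures above
// implement a small retry loop that recurs throughout this file. In isolation
// the shape is (run and maxRetries stand in for the per-call closure and
// Producer.Transaction.Retry.Max):
//
//	var err error
//	for attemptsRemaining := maxRetries; attemptsRemaining >= 0; attemptsRemaining-- {
//		var retry bool
//		retry, err = run() // run reports whether the error is retriable
//		if !retry {
//			return err
//		}
//		time.Sleep(computeBackoff(attemptsRemaining))
//	}
//	return err // retries exhausted; err holds the last error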

func (t *transactionManager) initProducerId() (int64, int16, error) {
	isEpochBump := false

	req := &InitProducerIDRequest{}
	if t.isTransactional() {
		req.TransactionalID = &t.transactionalID
		req.TransactionTimeout = t.transactionTimeout
	}

	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 4 adds the support for new error code PRODUCER_FENCED.
			req.Version = 4
		} else {
			// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
			// to resume after an INVALID_PRODUCER_EPOCH error
			req.Version = 3
		}
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
		// Version 2 is the first flexible version.
		req.Version = 2
	} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
		// Version 1 is the same as version 0.
		req.Version = 1
	}

	if isEpochBump {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return -1, -1, err
		}
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
			t.transactionalID, t.producerID, t.producerEpoch)
	} else {
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
			t.transactionalID)
	}

	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
		pid := int64(-1)
		pepoch := int16(-1)
		for attemptsRemaining >= 0 {
			var retry bool
			pid, pepoch, retry, err = run()
			if !retry {
				return pid, pepoch, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return -1, -1, err
	}
	return exec(func() (int64, int16, bool, error) {
		var err error
		var coordinator *Broker
		if t.isTransactional() {
			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
		} else {
			coordinator = t.client.LeastLoadedBroker()
		}
		if err != nil {
			return -1, -1, true, err
		}
		response, err := coordinator.InitProducerID(req)
		if err != nil {
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, err
		}
		if response == nil {
			return -1, -1, true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			if isEpochBump {
				t.sequenceNumbers = make(map[string]int32)
			}
			err := t.transitionTo(ProducerTxnFlagReady, nil)
			if err != nil {
				return -1, -1, true, err
			}
			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
				t.transactionalID, response)
			return response.ProducerID, response.ProducerEpoch, false, nil
		}
		switch response.Err {
		// Retriable errors
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
		// Fatal errors
		default:
			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return -1, -1, true, response.Err
	}, nil)
}

// If the Kafka cluster is at least 2.5.0, mark the txnmgr to bump the epoch; otherwise mark the error as fatal.
func (t *transactionManager) abortableErrorIfPossible(err error) error {
	if t.coordinatorSupportsBumpingEpoch {
		t.epochBumpRequired = true
		return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
	}
	return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
}
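
// Illustrative sketch (not part of the library code): epoch bumping is only
// available when the configured version is at least 2.5.0 (see initProducerId),
// so a caller that wants abortable recovery instead of fatal errors would set,
// for example:
//
//	conf := NewConfig()
//	conf.Version = V2_5_0_0 // enables coordinatorSupportsBumpingEpoch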

// End current transaction.
func (t *transactionManager) completeTransaction() error {
	if t.epochBumpRequired {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return err
		}
	} else {
		err := t.transitionTo(ProducerTxnFlagReady, nil)
		if err != nil {
			return err
		}
	}

	t.lastError = nil
	t.epochBumpRequired = false
	t.partitionsInCurrentTxn = topicPartitionSet{}
	t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{}

	return nil
}

// Send an EndTxn request with the commit flag (true when committing, false when aborting).
func (t *transactionManager) endTxn(commit bool) error {
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &EndTxnRequest{
			TransactionalID:   t.transactionalID,
			ProducerEpoch:     t.producerEpoch,
			ProducerID:        t.producerID,
			TransactionResult: commit,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.EndTxn(request)
		if err != nil {
			// Always retry on network error
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
				t.transactionalID, response)
			return false, t.completeTransaction()
		}
		switch response.Err {
		// Need to refresh coordinator
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Just retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		// Fatal errors
		default:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)
}

// We will try to publish the associated offsets for each group,
// then send an EndTxn request to mark the transaction as finished.
func (t *transactionManager) finishTransaction(commit bool) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// Ensure no error when committing or aborting
	if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	// If no records have been sent, don't do anything.
	if len(t.partitionsInCurrentTxn) == 0 {
		return t.completeTransaction()
	}

	epochBump := t.epochBumpRequired
	// If we're aborting the transaction, there is no need to add offsets.
	if commit && len(t.offsetsInCurrentTxn) > 0 {
		for group, offsets := range t.offsetsInCurrentTxn {
			newOffsets, err := t.publishOffsetsToTxn(offsets, group)
			if err != nil {
				t.offsetsInCurrentTxn[group] = newOffsets
				return err
			}
			delete(t.offsetsInCurrentTxn, group)
		}
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
		err := t.endTxn(commit)
		if err != nil {
			return err
		}
		if !epochBump {
			return nil
		}
	}
	// Reset the pid and epoch if needed.
	return t.initializeTransactions()
}
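
// Illustrative sketch (not part of the library code): finishTransaction is
// driven by the public producer API. A minimal caller-side flow, assuming the
// usual transactional configuration, looks roughly like:
//
//	conf := NewConfig()
//	conf.Version = V2_7_0_0
//	conf.Producer.Idempotent = true
//	conf.Producer.Transaction.ID = "my-txn-id"
//	conf.Producer.RequiredAcks = WaitForAll
//	conf.Net.MaxOpenRequests = 1
//	conf.Producer.Return.Successes = true
//
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, conf)
//	// handle err ...
//	_ = producer.BeginTxn()
//	producer.Input() <- &ProducerMessage{Topic: "events", Value: StringEncoder("hello")}
//	if err := producer.CommitTxn(); err != nil {
//		_ = producer.AbortTxn() // on abortable errors; fatal errors require recreating the producer
//	}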

// Called before sending any transactional record.
// Won't do anything if the topic-partition has already been added to the transaction.
func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return
	}

	tp := topicPartition{topic: topic, partition: partition}

	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()
	if _, ok := t.partitionsInCurrentTxn[tp]; ok {
		// partition is already added
		return
	}

	t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
}

// Makes a request to Kafka to add a list of partitions to the current transaction.
func (t *transactionManager) publishTxnPartitions() error {
	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	}

	if len(t.pendingPartitionsInCurrentTxn) == 0 {
		return nil
	}

	// Remove the partitions from the pending set regardless of the result. We use the presence
	// of partitions in the pending set to know when it is not safe to send batches. However, if
	// the partitions failed to be added and we enter an error state, we expect the batches to be
	// aborted anyway. In this case, we must be able to continue sending the batches which are in
	// retry for partitions that were successfully added.
	removeAllPartitionsOnFatalOrAbortedError := func() {
		t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	}

	// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
	// CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
	// we don't want to wait too long before trying to start the new one.
	//
	// This is only a temporary fix, the long term solution is being tracked in
	// https://issues.apache.org/jira/browse/KAFKA-5482
	retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
	computeBackoff := func(attemptsRemaining int) time.Duration {
		if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
			maxRetries := t.client.Config().Producer.Transaction.Retry.Max
			retries := maxRetries - attemptsRemaining
			return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
		}
		return retryBackoff
	}
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max

	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddPartitionsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerID:      t.producerID,
			ProducerEpoch:   t.producerEpoch,
			TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		addPartResponse, err := coordinator.AddPartitionsToTxn(request)
		if err != nil {
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}

		if addPartResponse == nil {
			return true, ErrTxnUnableToParseResponse
		}

		// Remove from the pending list the partitions that have been successfully added.
		var responseErrors []error
		for topic, results := range addPartResponse.Errors {
			for _, response := range results {
				tp := topicPartition{topic: topic, partition: response.Partition}
				switch response.Err {
				case ErrNoError:
					// Mark partition as added to transaction
					t.partitionsInCurrentTxn[tp] = struct{}{}
					delete(t.pendingPartitionsInCurrentTxn, tp)
					continue
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = coordinator.Close()
					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Retry topicPartition
				case ErrConcurrentTransactions:
					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
						retryBackoff = addPartitionsRetryBackoff
					}
				case ErrOperationNotAttempted:
					fallthrough
				case ErrTopicAuthorizationFailed:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
				case ErrUnknownProducerID:
					fallthrough
				case ErrInvalidProducerIDMapping:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.abortableErrorIfPossible(response.Err)
				// Fatal errors
				default:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
				}
				responseErrors = append(responseErrors, response.Err)
			}
		}

		// handle end
		if len(t.pendingPartitionsInCurrentTxn) == 0 {
			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
				t.transactionalID, addPartResponse)
			return false, nil
		}
		return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
	}, nil)
}
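
// Illustrative sketch (not part of the library code): partitions move from the
// pending set to the active set only after the coordinator acknowledges them.
// The expected sequence when producing to a new topic-partition is roughly:
//
//	t.maybeAddPartitionToCurrentTxn("events", 0) // queued in pendingPartitionsInCurrentTxn
//	if err := t.publishTxnPartitions(); err == nil {
//		// "events"/0 is now in partitionsInCurrentTxn and batches for it may be sent
//	}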

// Build a new transaction manager sharing producer client.
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:                    noProducerID,
		producerEpoch:                 noProducerEpoch,
		client:                        client,
		pendingPartitionsInCurrentTxn: topicPartitionSet{},
		partitionsInCurrentTxn:        topicPartitionSet{},
		offsetsInCurrentTxn:           make(map[string]topicPartitionOffsets),
		status:                        ProducerTxnFlagUninitialized,
	}

	if conf.Producer.Idempotent {
		txnmgr.transactionalID = conf.Producer.Transaction.ID
		txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		var err error
		txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId()
		if err != nil {
			return nil, err
		}
		Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n",
			txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
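
// Illustrative sketch (not part of the library code): when Producer.Idempotent
// is set but Producer.Transaction.ID is left empty, the manager acquires a
// producer ID for idempotence only and isTransactional() reports false.
// An assumed caller-side configuration for that mode:
//
//	conf := NewConfig()
//	conf.Version = V0_11_0_0
//	conf.Producer.Idempotent = true
//	conf.Producer.RequiredAcks = WaitForAll
//	conf.Net.MaxOpenRequests = 1
//	// newTransactionManager(conf, client) then calls initProducerId without a transactional ID.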

// re-init producer-id and producer-epoch if needed.
func (t *transactionManager) initializeTransactions() (err error) {
	t.producerID, t.producerEpoch, err = t.initProducerId()
	return
}