| package sarama |
| |
| import ( |
| "errors" |
| "fmt" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| // ProducerTxnStatusFlag marks the current transaction status. |
| type ProducerTxnStatusFlag int16 |
| |
| const ( |
| // ProducerTxnFlagUninitialized when txnmgr is created |
| ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota |
| // ProducerTxnFlagInitializing when txnmgr is initializing |
| ProducerTxnFlagInitializing |
| // ProducerTxnFlagReady when txnmgr is ready to receive a transaction |
| ProducerTxnFlagReady |
| // ProducerTxnFlagInTransaction when transaction is started |
| ProducerTxnFlagInTransaction |
| // ProducerTxnFlagEndTransaction when the transaction is being committed or aborted |
| ProducerTxnFlagEndTransaction |
| // ProducerTxnFlagInError when an abortable or fatal error has occurred |
| ProducerTxnFlagInError |
| // ProducerTxnFlagCommittingTransaction when committing txn |
| ProducerTxnFlagCommittingTransaction |
| // ProducerTxnFlagAbortingTransaction when aborting txn |
| ProducerTxnFlagAbortingTransaction |
| // ProducerTxnFlagAbortableError when the producer encounters an abortable error |
| // Must call AbortTxn in this case. |
| ProducerTxnFlagAbortableError |
| // ProducerTxnFlagFatalError when the producer encounters a fatal error |
| // Must close and recreate the producer in this case. |
| ProducerTxnFlagFatalError |
| ) |
| |
| func (s ProducerTxnStatusFlag) String() string { |
| status := make([]string, 0) |
| if s&ProducerTxnFlagUninitialized != 0 { |
| status = append(status, "ProducerTxnStateUninitialized") |
| } |
| if s&ProducerTxnFlagInitializing != 0 { |
| status = append(status, "ProducerTxnStateInitializing") |
| } |
| if s&ProducerTxnFlagReady != 0 { |
| status = append(status, "ProducerTxnStateReady") |
| } |
| if s&ProducerTxnFlagInTransaction != 0 { |
| status = append(status, "ProducerTxnStateInTransaction") |
| } |
| if s&ProducerTxnFlagEndTransaction != 0 { |
| status = append(status, "ProducerTxnStateEndTransaction") |
| } |
| if s&ProducerTxnFlagInError != 0 { |
| status = append(status, "ProducerTxnStateInError") |
| } |
| if s&ProducerTxnFlagCommittingTransaction != 0 { |
| status = append(status, "ProducerTxnStateCommittingTransaction") |
| } |
| if s&ProducerTxnFlagAbortingTransaction != 0 { |
| status = append(status, "ProducerTxnStateAbortingTransaction") |
| } |
| if s&ProducerTxnFlagAbortableError != 0 { |
| status = append(status, "ProducerTxnStateAbortableError") |
| } |
| if s&ProducerTxnFlagFatalError != 0 { |
| status = append(status, "ProducerTxnStateFatalError") |
| } |
| return strings.Join(status, "|") |
| } |
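| |
| // Illustrative sketch (not part of the package API): because each flag is a distinct |
| // bit, statuses can be combined and inspected with bitwise operators, and String |
| // renders every bit that is set. For example: |
| // |
| //	s := ProducerTxnFlagInError | ProducerTxnFlagAbortableError |
| //	s.String()                           // "ProducerTxnStateInError|ProducerTxnStateAbortableError" |
| //	s&ProducerTxnFlagAbortableError != 0 // true: callers check individual bits this way before acting |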
| |
| // transactionManager keeps the state necessary to ensure transactional and idempotent production |
| type transactionManager struct { |
| producerID int64 |
| producerEpoch int16 |
| sequenceNumbers map[string]int32 |
| mutex sync.Mutex |
| transactionalID string |
| transactionTimeout time.Duration |
| client Client |
| |
| // True when the Kafka cluster is at least 2.5.0; |
| // used to recover when the producer fails. |
| coordinatorSupportsBumpingEpoch bool |
| |
| // True when the producer needs to bump its epoch. |
| epochBumpRequired bool |
| // Record last seen error. |
| lastError error |
| |
| // Ensure that status is never accessed with a race condition. |
| statusLock sync.RWMutex |
| status ProducerTxnStatusFlag |
| |
| // Ensure that only one goroutine will update partitions in the current transaction. |
| partitionInTxnLock sync.Mutex |
| pendingPartitionsInCurrentTxn topicPartitionSet |
| partitionsInCurrentTxn topicPartitionSet |
| |
| // Offsets to add to transaction. |
| offsetsInCurrentTxn map[string]topicPartitionOffsets |
| } |
| |
| const ( |
| noProducerID = -1 |
| noProducerEpoch = -1 |
| |
| // see publishTxnPartitions comment. |
| addPartitionsRetryBackoff = 20 * time.Millisecond |
| ) |
| |
| // Allowed txnmgr state transitions. |
| var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{ |
| ProducerTxnFlagUninitialized: { |
| ProducerTxnFlagReady, |
| ProducerTxnFlagInError, |
| }, |
| // When we are initializing |
| ProducerTxnFlagInitializing: { |
| ProducerTxnFlagInitializing, |
| ProducerTxnFlagReady, |
| ProducerTxnFlagInError, |
| }, |
| // When we have initialized transactional producer |
| ProducerTxnFlagReady: { |
| ProducerTxnFlagInTransaction, |
| }, |
| // When beginTxn has been called |
| ProducerTxnFlagInTransaction: { |
| // When calling commit or abort |
| ProducerTxnFlagEndTransaction, |
| // When got an error |
| ProducerTxnFlagInError, |
| }, |
| ProducerTxnFlagEndTransaction: { |
| // When epoch bump |
| ProducerTxnFlagInitializing, |
| // When commit is good |
| ProducerTxnFlagReady, |
| // When got an error |
| ProducerTxnFlagInError, |
| }, |
| // Need to abort transaction |
| ProducerTxnFlagAbortableError: { |
| // Call AbortTxn |
| ProducerTxnFlagAbortingTransaction, |
| // When got an error |
| ProducerTxnFlagInError, |
| }, |
| // Need to close producer |
| ProducerTxnFlagFatalError: { |
| ProducerTxnFlagFatalError, |
| }, |
| } |
| |
| type topicPartition struct { |
| topic string |
| partition int32 |
| } |
| |
| // to ensure that we don't do a full scan every time a partition or an offset is added. |
| type ( |
| topicPartitionSet map[topicPartition]struct{} |
| topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata |
| ) |
| |
| func (s topicPartitionSet) mapToRequest() map[string][]int32 { |
| result := make(map[string][]int32, len(s)) |
| for tp := range s { |
| result[tp.topic] = append(result[tp.topic], tp.partition) |
| } |
| return result |
| } |
| |
| func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata { |
| result := make(map[string][]*PartitionOffsetMetadata, len(s)) |
| for tp, offset := range s { |
| result[tp.topic] = append(result[tp.topic], offset) |
| } |
| return result |
| } |
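| |
| // Illustrative sketch (hypothetical topic and partitions): mapToRequest regroups the |
| // flat, topicPartition-keyed collections into the per-topic shape expected by the |
| // protocol requests built below: |
| // |
| //	s := topicPartitionSet{ |
| //		{topic: "orders", partition: 0}: {}, |
| //		{topic: "orders", partition: 1}: {}, |
| //	} |
| //	s.mapToRequest() // map[string][]int32{"orders": {0, 1}} (slice order is not guaranteed) |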
| |
| // Return true if the transition from the current status to target is allowed. |
| func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool { |
| for status, allowedTransitions := range producerTxnTransitions { |
| if status&t.status != 0 { |
| for _, allowedTransition := range allowedTransitions { |
| if allowedTransition&target != 0 { |
| return true |
| } |
| } |
| } |
| } |
| return false |
| } |
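| |
| // Illustrative sketch (not part of the package API): isTransitionValid is driven by the |
| // producerTxnTransitions table above. For a manager t whose status is ProducerTxnFlagReady, |
| // the only allowed target is ProducerTxnFlagInTransaction: |
| // |
| //	t.isTransitionValid(ProducerTxnFlagInTransaction)                       // true |
| //	t.isTransitionValid(ProducerTxnFlagCommittingTransaction)               // false |
| //	t.isTransitionValid(ProducerTxnFlagInError | ProducerTxnFlagFatalError) // false |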
| |
| // Get current transaction status. |
| func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag { |
| t.statusLock.RLock() |
| defer t.statusLock.RUnlock() |
| |
| return t.status |
| } |
| |
| // Try to transition to the target status, returning an error if the transition is not allowed. |
| func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error { |
| t.statusLock.Lock() |
| defer t.statusLock.Unlock() |
| |
| if !t.isTransitionValid(target) { |
| return ErrTransitionNotAllowed |
| } |
| |
| if target&ProducerTxnFlagInError != 0 { |
| if err == nil { |
| return ErrCannotTransitionNilError |
| } |
| t.lastError = err |
| } else { |
| t.lastError = nil |
| } |
| |
| DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target) |
| |
| t.status = target |
| return err |
| } |
| |
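| // getAndIncrementSequenceNumber returns the sequence number to use for the given topic-partition, |
| // together with the current producer epoch, and then increments the stored counter. |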
| func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) { |
| key := fmt.Sprintf("%s-%d", topic, partition) |
| t.mutex.Lock() |
| defer t.mutex.Unlock() |
| sequence := t.sequenceNumbers[key] |
| t.sequenceNumbers[key] = sequence + 1 |
| return sequence, t.producerEpoch |
| } |
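| |
| // Illustrative sketch (hypothetical topic): sequence numbers are kept per "topic-partition" |
| // key and start at zero, so consecutive calls for the same partition return 0, 1, 2, ... |
| // together with the current producer epoch; bumpEpoch (below) resets them all to zero: |
| // |
| //	seq, epoch := t.getAndIncrementSequenceNumber("orders", 0) // seq == 0 on first use |
| //	seq, epoch = t.getAndIncrementSequenceNumber("orders", 0)  // seq == 1, same epoch |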
| |
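| // bumpEpoch increments the producer epoch and resets every tracked sequence number to zero. |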
| func (t *transactionManager) bumpEpoch() { |
| t.mutex.Lock() |
| defer t.mutex.Unlock() |
| t.producerEpoch++ |
| for k := range t.sequenceNumbers { |
| t.sequenceNumbers[k] = 0 |
| } |
| } |
| |
| func (t *transactionManager) getProducerID() (int64, int16) { |
| t.mutex.Lock() |
| defer t.mutex.Unlock() |
| return t.producerID, t.producerEpoch |
| } |
| |
| // Compute the retry backoff based on the number of attempts remaining. |
| func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration { |
| if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil { |
| maxRetries := t.client.Config().Producer.Transaction.Retry.Max |
| retries := maxRetries - attemptsRemaining |
| return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries) |
| } |
| return t.client.Config().Producer.Transaction.Retry.Backoff |
| } |
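| |
| // Illustrative sketch (hypothetical configuration): computeBackoff falls back to the static |
| // Producer.Transaction.Retry.Backoff unless a BackoffFunc is configured, in which case the |
| // number of retries already performed is derived from the attempts remaining. A simple |
| // exponential policy could look like: |
| // |
| //	conf := NewConfig() |
| //	conf.Producer.Transaction.Retry.Max = 5 |
| //	conf.Producer.Transaction.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { |
| //		return time.Duration(100<<uint(retries)) * time.Millisecond // 100ms, 200ms, 400ms, ... |
| //	} |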
| |
| // Return true if the txnmgr is transactional. |
| func (t *transactionManager) isTransactional() bool { |
| return t.transactionalID != "" |
| } |
| |
| // Add the specified offsets to the current transaction. |
| func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error { |
| t.mutex.Lock() |
| defer t.mutex.Unlock() |
| |
| if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 { |
| return ErrTransactionNotReady |
| } |
| |
| if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 { |
| return t.lastError |
| } |
| |
| if _, ok := t.offsetsInCurrentTxn[groupId]; !ok { |
| t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{} |
| } |
| |
| for topic, offsets := range offsetsToAdd { |
| for _, offset := range offsets { |
| tp := topicPartition{topic: topic, partition: offset.Partition} |
| t.offsetsInCurrentTxn[groupId][tp] = offset |
| } |
| } |
| return nil |
| } |
| |
| // Publish the saved offsets: first send AddOffsetsToTxn to the transaction coordinator, then TxnOffsetCommit to the group coordinator. |
| func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) { |
| // First AddOffsetsToTxn |
| attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| exec := func(run func() (bool, error), err error) error { |
| for attemptsRemaining >= 0 { |
| var retry bool |
| retry, err = run() |
| if !retry { |
| return err |
| } |
| backoff := t.computeBackoff(attemptsRemaining) |
| Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| time.Sleep(backoff) |
| attemptsRemaining-- |
| } |
| return err |
| } |
| lastError := exec(func() (bool, error) { |
| coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| if err != nil { |
| return true, err |
| } |
| request := &AddOffsetsToTxnRequest{ |
| TransactionalID: t.transactionalID, |
| ProducerEpoch: t.producerEpoch, |
| ProducerID: t.producerID, |
| GroupID: groupId, |
| } |
| if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| // Version 2 adds support for the new error code PRODUCER_FENCED. |
| request.Version = 2 |
| } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| // Version 1 is the same as version 0. |
| request.Version = 1 |
| } |
| response, err := coordinator.AddOffsetsToTxn(request) |
| if err != nil { |
| // If an error occurred, try to refresh the current transaction coordinator. |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| return true, err |
| } |
| if response == nil { |
| // If no response is returned just retry. |
| return true, ErrTxnUnableToParseResponse |
| } |
| if response.Err == ErrNoError { |
| DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n", |
| t.transactionalID, groupId, response) |
| // If no error, just exit. |
| return false, nil |
| } |
| switch response.Err { |
| case ErrConsumerCoordinatorNotAvailable: |
| fallthrough |
| case ErrNotCoordinatorForConsumer: |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| fallthrough |
| case ErrOffsetsLoadInProgress: |
| fallthrough |
| case ErrConcurrentTransactions: |
| // Retry |
| case ErrUnknownProducerID: |
| fallthrough |
| case ErrInvalidProducerIDMapping: |
| return false, t.abortableErrorIfPossible(response.Err) |
| case ErrGroupAuthorizationFailed: |
| return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err) |
| default: |
| // Others are fatal |
| return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err) |
| } |
| return true, response.Err |
| }, nil) |
| |
| if lastError != nil { |
| return offsets, lastError |
| } |
| |
| resultOffsets := offsets |
| // Then TxnOffsetCommit |
| // note the result is not completed until the TxnOffsetCommit returns |
| attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max |
| execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) { |
| var r topicPartitionOffsets |
| for attemptsRemaining >= 0 { |
| var retry bool |
| r, retry, err = run() |
| if !retry { |
| return r, err |
| } |
| backoff := t.computeBackoff(attemptsRemaining) |
| Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| time.Sleep(backoff) |
| attemptsRemaining-- |
| } |
| return r, err |
| } |
| return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) { |
| consumerGroupCoordinator, err := t.client.Coordinator(groupId) |
| if err != nil { |
| return resultOffsets, true, err |
| } |
| request := &TxnOffsetCommitRequest{ |
| TransactionalID: t.transactionalID, |
| ProducerEpoch: t.producerEpoch, |
| ProducerID: t.producerID, |
| GroupID: groupId, |
| Topics: offsets.mapToRequest(), |
| } |
| if t.client.Config().Version.IsAtLeast(V2_1_0_0) { |
| // Version 2 adds the committed leader epoch. |
| request.Version = 2 |
| } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| // Version 1 is the same as version 0. |
| request.Version = 1 |
| } |
| responses, err := consumerGroupCoordinator.TxnOffsetCommit(request) |
| if err != nil { |
| _ = consumerGroupCoordinator.Close() |
| _ = t.client.RefreshCoordinator(groupId) |
| return resultOffsets, true, err |
| } |
| |
| if responses == nil { |
| return resultOffsets, true, ErrTxnUnableToParseResponse |
| } |
| |
| var responseErrors []error |
| failedTxn := topicPartitionOffsets{} |
| for topic, partitionErrors := range responses.Topics { |
| for _, partitionError := range partitionErrors { |
| switch partitionError.Err { |
| case ErrNoError: |
| continue |
| // If the topic is unknown or the coordinator is loading, retry with the current coordinator |
| case ErrRequestTimedOut: |
| fallthrough |
| case ErrConsumerCoordinatorNotAvailable: |
| fallthrough |
| case ErrNotCoordinatorForConsumer: |
| _ = consumerGroupCoordinator.Close() |
| _ = t.client.RefreshCoordinator(groupId) |
| fallthrough |
| case ErrUnknownTopicOrPartition: |
| fallthrough |
| case ErrOffsetsLoadInProgress: |
| // Do nothing just retry |
| case ErrIllegalGeneration: |
| fallthrough |
| case ErrUnknownMemberId: |
| fallthrough |
| case ErrFencedInstancedId: |
| fallthrough |
| case ErrGroupAuthorizationFailed: |
| return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err) |
| default: |
| // Others are fatal |
| return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err) |
| } |
| tp := topicPartition{topic: topic, partition: partitionError.Partition} |
| failedTxn[tp] = offsets[tp] |
| responseErrors = append(responseErrors, partitionError.Err) |
| } |
| } |
| |
| resultOffsets = failedTxn |
| |
| if len(resultOffsets) == 0 { |
| DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n", |
| t.transactionalID, groupId) |
| return resultOffsets, false, nil |
| } |
| return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...) |
| }, nil) |
| } |
| |
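| // initProducerId obtains a producer ID and epoch from the coordinator (or re-initializes the |
| // current ones when bumping the epoch), retrying according to Producer.Transaction.Retry. |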
| func (t *transactionManager) initProducerId() (int64, int16, error) { |
| isEpochBump := false |
| |
| req := &InitProducerIDRequest{} |
| if t.isTransactional() { |
| req.TransactionalID = &t.transactionalID |
| req.TransactionTimeout = t.transactionTimeout |
| } |
| |
| if t.client.Config().Version.IsAtLeast(V2_5_0_0) { |
| if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| // Version 4 adds support for the new error code PRODUCER_FENCED. |
| req.Version = 4 |
| } else { |
| // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try |
| // to resume after an INVALID_PRODUCER_EPOCH error |
| req.Version = 3 |
| } |
| isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch |
| t.coordinatorSupportsBumpingEpoch = true |
| req.ProducerID = t.producerID |
| req.ProducerEpoch = t.producerEpoch |
| } else if t.client.Config().Version.IsAtLeast(V2_4_0_0) { |
| // Version 2 is the first flexible version. |
| req.Version = 2 |
| } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| // Version 1 is the same as version 0. |
| req.Version = 1 |
| } |
| |
| if isEpochBump { |
| err := t.transitionTo(ProducerTxnFlagInitializing, nil) |
| if err != nil { |
| return -1, -1, err |
| } |
| DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n", |
| t.transactionalID) |
| } else { |
| DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n", |
| t.transactionalID, t.producerID, t.producerEpoch) |
| } |
| |
| attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) { |
| pid := int64(-1) |
| pepoch := int16(-1) |
| for attemptsRemaining >= 0 { |
| var retry bool |
| pid, pepoch, retry, err = run() |
| if !retry { |
| return pid, pepoch, err |
| } |
| backoff := t.computeBackoff(attemptsRemaining) |
| Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| time.Sleep(backoff) |
| attemptsRemaining-- |
| } |
| return -1, -1, err |
| } |
| return exec(func() (int64, int16, bool, error) { |
| var err error |
| var coordinator *Broker |
| if t.isTransactional() { |
| coordinator, err = t.client.TransactionCoordinator(t.transactionalID) |
| } else { |
| coordinator = t.client.LeastLoadedBroker() |
| } |
| if err != nil { |
| return -1, -1, true, err |
| } |
| response, err := coordinator.InitProducerID(req) |
| if err != nil { |
| if t.isTransactional() { |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| } |
| return -1, -1, true, err |
| } |
| if response == nil { |
| return -1, -1, true, ErrTxnUnableToParseResponse |
| } |
| if response.Err == ErrNoError { |
| if isEpochBump { |
| t.sequenceNumbers = make(map[string]int32) |
| } |
| err := t.transitionTo(ProducerTxnFlagReady, nil) |
| if err != nil { |
| return -1, -1, true, err |
| } |
| DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n", |
| t.transactionalID, response) |
| return response.ProducerID, response.ProducerEpoch, false, nil |
| } |
| switch response.Err { |
| // Retriable errors |
| case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress: |
| if t.isTransactional() { |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| } |
| // Fatal errors |
| default: |
| return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err) |
| } |
| return -1, -1, true, response.Err |
| }, nil) |
| } |
| |
| // If the Kafka cluster is at least 2.5.0, mark the txnmgr as requiring an epoch bump and record an abortable error; otherwise record a fatal error. |
| func (t *transactionManager) abortableErrorIfPossible(err error) error { |
| if t.coordinatorSupportsBumpingEpoch { |
| t.epochBumpRequired = true |
| return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err) |
| } |
| return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err) |
| } |
| |
| // Complete the current transaction: transition to the next status and reset local transaction state. |
| func (t *transactionManager) completeTransaction() error { |
| if t.epochBumpRequired { |
| err := t.transitionTo(ProducerTxnFlagInitializing, nil) |
| if err != nil { |
| return err |
| } |
| } else { |
| err := t.transitionTo(ProducerTxnFlagReady, nil) |
| if err != nil { |
| return err |
| } |
| } |
| |
| t.lastError = nil |
| t.epochBumpRequired = false |
| t.partitionsInCurrentTxn = topicPartitionSet{} |
| t.pendingPartitionsInCurrentTxn = topicPartitionSet{} |
| t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{} |
| |
| return nil |
| } |
| |
| // Send an EndTxn request with the commit flag (true when committing, false when aborting). |
| func (t *transactionManager) endTxn(commit bool) error { |
| attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| exec := func(run func() (bool, error), err error) error { |
| for attemptsRemaining >= 0 { |
| var retry bool |
| retry, err = run() |
| if !retry { |
| return err |
| } |
| backoff := t.computeBackoff(attemptsRemaining) |
| Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| time.Sleep(backoff) |
| attemptsRemaining-- |
| } |
| return err |
| } |
| return exec(func() (bool, error) { |
| coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| if err != nil { |
| return true, err |
| } |
| request := &EndTxnRequest{ |
| TransactionalID: t.transactionalID, |
| ProducerEpoch: t.producerEpoch, |
| ProducerID: t.producerID, |
| TransactionResult: commit, |
| } |
| if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| // Version 2 adds support for the new error code PRODUCER_FENCED. |
| request.Version = 2 |
| } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| // Version 1 is the same as version 0. |
| request.Version = 1 |
| } |
| response, err := coordinator.EndTxn(request) |
| if err != nil { |
| // Always retry on network error |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| return true, err |
| } |
| if response == nil { |
| return true, ErrTxnUnableToParseResponse |
| } |
| if response.Err == ErrNoError { |
| DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n", |
| t.transactionalID, response) |
| return false, t.completeTransaction() |
| } |
| switch response.Err { |
| // Need to refresh coordinator |
| case ErrConsumerCoordinatorNotAvailable: |
| fallthrough |
| case ErrNotCoordinatorForConsumer: |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| fallthrough |
| case ErrOffsetsLoadInProgress: |
| fallthrough |
| case ErrConcurrentTransactions: |
| // Just retry |
| case ErrUnknownProducerID: |
| fallthrough |
| case ErrInvalidProducerIDMapping: |
| return false, t.abortableErrorIfPossible(response.Err) |
| // Fatal errors |
| default: |
| return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err) |
| } |
| return true, response.Err |
| }, nil) |
| } |
| |
| // Try to publish the associated offsets for each group, |
| // then send an EndTxn request to mark the transaction as finished. |
| func (t *transactionManager) finishTransaction(commit bool) error { |
| t.mutex.Lock() |
| defer t.mutex.Unlock() |
| |
| // Ensure no error when committing or aborting |
| if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 { |
| return t.lastError |
| } else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 { |
| return t.lastError |
| } |
| |
| // If no records have been sent, don't do anything. |
| if len(t.partitionsInCurrentTxn) == 0 { |
| return t.completeTransaction() |
| } |
| |
| epochBump := t.epochBumpRequired |
| // If we're aborting the transaction, there is no need to add offsets. |
| if commit && len(t.offsetsInCurrentTxn) > 0 { |
| for group, offsets := range t.offsetsInCurrentTxn { |
| newOffsets, err := t.publishOffsetsToTxn(offsets, group) |
| if err != nil { |
| t.offsetsInCurrentTxn[group] = newOffsets |
| return err |
| } |
| delete(t.offsetsInCurrentTxn, group) |
| } |
| } |
| |
| if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 { |
| return t.lastError |
| } |
| |
| if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) { |
| err := t.endTxn(commit) |
| if err != nil { |
| return err |
| } |
| if !epochBump { |
| return nil |
| } |
| } |
| // reset pid and epoch if needed. |
| return t.initializeTransactions() |
| } |
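| |
| // Illustrative sketch (assumes the producer-level transactional API - BeginTxn, CommitTxn, |
| // AbortTxn - that drives this manager; the broker address, topic and transactional ID are |
| // hypothetical). A typical commit path from an application's point of view: |
| // |
| //	conf := NewConfig() |
| //	conf.Version = V2_0_0_0 |
| //	conf.Producer.Idempotent = true |
| //	conf.Producer.Transaction.ID = "my-txn-id" |
| //	conf.Producer.RequiredAcks = WaitForAll |
| //	conf.Net.MaxOpenRequests = 1 |
| // |
| //	producer, _ := NewAsyncProducer([]string{"localhost:9092"}, conf) |
| //	_ = producer.BeginTxn() |
| //	producer.Input() <- &ProducerMessage{Topic: "orders", Value: StringEncoder("hello")} |
| //	if err := producer.CommitTxn(); err != nil { |
| //		_ = producer.AbortTxn() // on an abortable error the transaction must be aborted |
| //	} |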
| |
| // Called before sending any transactional record. |
| // Does nothing if the topic-partition has already been added to the current transaction. |
| func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) { |
| if t.currentTxnStatus()&ProducerTxnFlagInError != 0 { |
| return |
| } |
| |
| tp := topicPartition{topic: topic, partition: partition} |
| |
| t.partitionInTxnLock.Lock() |
| defer t.partitionInTxnLock.Unlock() |
| if _, ok := t.partitionsInCurrentTxn[tp]; ok { |
| // partition is already added |
| return |
| } |
| |
| t.pendingPartitionsInCurrentTxn[tp] = struct{}{} |
| } |
| |
| // Makes a request to Kafka to add a list of partitions to the current transaction. |
| func (t *transactionManager) publishTxnPartitions() error { |
| t.partitionInTxnLock.Lock() |
| defer t.partitionInTxnLock.Unlock() |
| |
| if t.currentTxnStatus()&ProducerTxnFlagInError != 0 { |
| return t.lastError |
| } |
| |
| if len(t.pendingPartitionsInCurrentTxn) == 0 { |
| return nil |
| } |
| |
| // Remove the partitions from the pending set regardless of the result. We use the presence |
| // of partitions in the pending set to know when it is not safe to send batches. However, if |
| // the partitions failed to be added and we enter an error state, we expect the batches to be |
| // aborted anyway. In this case, we must be able to continue sending the batches which are in |
| // retry for partitions that were successfully added. |
| removeAllPartitionsOnFatalOrAbortedError := func() { |
| t.pendingPartitionsInCurrentTxn = topicPartitionSet{} |
| } |
| |
| // We only want to reduce the backoff when retrying the first AddPartition which errored out due to a |
| // CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and |
| // we don't want to wait too long before trying to start the new one. |
| // |
| // This is only a temporary fix, the long term solution is being tracked in |
| // https://issues.apache.org/jira/browse/KAFKA-5482 |
| retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff |
| computeBackoff := func(attemptsRemaining int) time.Duration { |
| if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil { |
| maxRetries := t.client.Config().Producer.Transaction.Retry.Max |
| retries := maxRetries - attemptsRemaining |
| return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries) |
| } |
| return retryBackoff |
| } |
| attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| |
| exec := func(run func() (bool, error), err error) error { |
| for attemptsRemaining >= 0 { |
| var retry bool |
| retry, err = run() |
| if !retry { |
| return err |
| } |
| backoff := computeBackoff(attemptsRemaining) |
| Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err) |
| time.Sleep(backoff) |
| attemptsRemaining-- |
| } |
| return err |
| } |
| return exec(func() (bool, error) { |
| coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| if err != nil { |
| return true, err |
| } |
| request := &AddPartitionsToTxnRequest{ |
| TransactionalID: t.transactionalID, |
| ProducerID: t.producerID, |
| ProducerEpoch: t.producerEpoch, |
| TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(), |
| } |
| if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| // Version 2 adds support for the new error code PRODUCER_FENCED. |
| request.Version = 2 |
| } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| // Version 1 is the same as version 0. |
| request.Version = 1 |
| } |
| addPartResponse, err := coordinator.AddPartitionsToTxn(request) |
| if err != nil { |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| return true, err |
| } |
| |
| if addPartResponse == nil { |
| return true, ErrTxnUnableToParseResponse |
| } |
| |
| // Remove from the pending set the partitions that have been successfully added. |
| var responseErrors []error |
| for topic, results := range addPartResponse.Errors { |
| for _, response := range results { |
| tp := topicPartition{topic: topic, partition: response.Partition} |
| switch response.Err { |
| case ErrNoError: |
| // Mark partition as added to transaction |
| t.partitionsInCurrentTxn[tp] = struct{}{} |
| delete(t.pendingPartitionsInCurrentTxn, tp) |
| continue |
| case ErrConsumerCoordinatorNotAvailable: |
| fallthrough |
| case ErrNotCoordinatorForConsumer: |
| _ = coordinator.Close() |
| _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| fallthrough |
| case ErrUnknownTopicOrPartition: |
| fallthrough |
| case ErrOffsetsLoadInProgress: |
| // Retry topicPartition |
| case ErrConcurrentTransactions: |
| if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff { |
| retryBackoff = addPartitionsRetryBackoff |
| } |
| case ErrOperationNotAttempted: |
| fallthrough |
| case ErrTopicAuthorizationFailed: |
| removeAllPartitionsOnFatalOrAbortedError() |
| return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err) |
| case ErrUnknownProducerID: |
| fallthrough |
| case ErrInvalidProducerIDMapping: |
| removeAllPartitionsOnFatalOrAbortedError() |
| return false, t.abortableErrorIfPossible(response.Err) |
| // Fatal errors |
| default: |
| removeAllPartitionsOnFatalOrAbortedError() |
| return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err) |
| } |
| responseErrors = append(responseErrors, response.Err) |
| } |
| } |
| |
| // All pending partitions were added successfully. |
| if len(t.pendingPartitionsInCurrentTxn) == 0 { |
| DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n", |
| t.transactionalID, addPartResponse) |
| return false, nil |
| } |
| return true, Wrap(ErrAddPartitionsToTxn, responseErrors...) |
| }, nil) |
| } |
| |
| // Build a new transaction manager sharing the producer's client. |
| func newTransactionManager(conf *Config, client Client) (*transactionManager, error) { |
| txnmgr := &transactionManager{ |
| producerID: noProducerID, |
| producerEpoch: noProducerEpoch, |
| client: client, |
| pendingPartitionsInCurrentTxn: topicPartitionSet{}, |
| partitionsInCurrentTxn: topicPartitionSet{}, |
| offsetsInCurrentTxn: make(map[string]topicPartitionOffsets), |
| status: ProducerTxnFlagUninitialized, |
| } |
| |
| if conf.Producer.Idempotent { |
| txnmgr.transactionalID = conf.Producer.Transaction.ID |
| txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout |
| txnmgr.sequenceNumbers = make(map[string]int32) |
| txnmgr.mutex = sync.Mutex{} |
| |
| var err error |
| txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId() |
| if err != nil { |
| return nil, err |
| } |
| Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n", |
| txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch) |
| } |
| |
| return txnmgr, nil |
| } |
| |
| // re-init producer-id and producer-epoch if needed. |
| func (t *transactionManager) initializeTransactions() (err error) { |
| t.producerID, t.producerEpoch, err = t.initProducerId() |
| return |
| } |