[VOL-5486] Upgrade library versions

Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/transaction_manager.go b/vendor/github.com/IBM/sarama/transaction_manager.go
new file mode 100644
index 0000000..bf20b75
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/transaction_manager.go
@@ -0,0 +1,930 @@
+package sarama
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+)
+
+// ProducerTxnStatusFlag marks the current transaction status.
+type ProducerTxnStatusFlag int16
+
+const (
+	// ProducerTxnFlagUninitialized when txnmgr is created
+	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
+	// ProducerTxnFlagInitializing when txnmgr is initializing
+	ProducerTxnFlagInitializing
+	// ProducerTxnFlagReady when txnmgr is ready to receive a transaction
+	ProducerTxnFlagReady
+	// ProducerTxnFlagInTransaction when transaction is started
+	ProducerTxnFlagInTransaction
+	// ProducerTxnFlagEndTransaction when transaction will be committed
+	ProducerTxnFlagEndTransaction
+	// ProducerTxnFlagInError when having abortable or fatal error
+	ProducerTxnFlagInError
+	// ProducerTxnFlagCommittingTransaction when committing txn
+	ProducerTxnFlagCommittingTransaction
+	// ProducerTxnFlagAbortingTransaction when aborting txn
+	ProducerTxnFlagAbortingTransaction
+	// ProducerTxnFlagAbortableError when producer encounters an abortable error
+	// Must call AbortTxn in this case.
+	ProducerTxnFlagAbortableError
+	// ProducerTxnFlagFatalError when producer encounters a fatal error
+	// Must Close and recreate it.
+	ProducerTxnFlagFatalError
+)
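+
+// Illustrative sketch (not upstream code): because each flag is a distinct
+// bit, error states are combined with bitwise OR and tested with AND:
+//
+//	status := ProducerTxnFlagInError | ProducerTxnFlagAbortableError
+//	status&ProducerTxnFlagInError != 0        // true
+//	status&ProducerTxnFlagAbortableError != 0 // true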
+
+func (s ProducerTxnStatusFlag) String() string {
+	status := make([]string, 0)
+	if s&ProducerTxnFlagUninitialized != 0 {
+		status = append(status, "ProducerTxnStateUninitialized")
+	}
+	if s&ProducerTxnFlagInitializing != 0 {
+		status = append(status, "ProducerTxnStateInitializing")
+	}
+	if s&ProducerTxnFlagReady != 0 {
+		status = append(status, "ProducerTxnStateReady")
+	}
+	if s&ProducerTxnFlagInTransaction != 0 {
+		status = append(status, "ProducerTxnStateInTransaction")
+	}
+	if s&ProducerTxnFlagEndTransaction != 0 {
+		status = append(status, "ProducerTxnStateEndTransaction")
+	}
+	if s&ProducerTxnFlagInError != 0 {
+		status = append(status, "ProducerTxnStateInError")
+	}
+	if s&ProducerTxnFlagCommittingTransaction != 0 {
+		status = append(status, "ProducerTxnStateCommittingTransaction")
+	}
+	if s&ProducerTxnFlagAbortingTransaction != 0 {
+		status = append(status, "ProducerTxnStateAbortingTransaction")
+	}
+	if s&ProducerTxnFlagAbortableError != 0 {
+		status = append(status, "ProducerTxnStateAbortableError")
+	}
+	if s&ProducerTxnFlagFatalError != 0 {
+		status = append(status, "ProducerTxnStateFatalError")
+	}
+	return strings.Join(status, "|")
+}
+
+// transactionManager keeps the state necessary to ensure idempotent production
+type transactionManager struct {
+	producerID         int64
+	producerEpoch      int16
+	sequenceNumbers    map[string]int32
+	mutex              sync.Mutex
+	transactionalID    string
+	transactionTimeout time.Duration
+	client             Client
+
+	// Set when the kafka cluster is at least 2.5.0;
+	// used to recover when the producer fails.
+	coordinatorSupportsBumpingEpoch bool
+
+	// When the producer needs to bump its epoch.
+	epochBumpRequired bool
+	// Record last seen error.
+	lastError error
+
+	// Guards status against concurrent access.
+	statusLock sync.RWMutex
+	status     ProducerTxnStatusFlag
+
+	// Ensure that only one goroutine will update partitions in current transaction.
+	partitionInTxnLock            sync.Mutex
+	pendingPartitionsInCurrentTxn topicPartitionSet
+	partitionsInCurrentTxn        topicPartitionSet
+
+	// Offsets to add to transaction.
+	offsetsInCurrentTxn map[string]topicPartitionOffsets
+}
+
+const (
+	noProducerID    = -1
+	noProducerEpoch = -1
+
+	// see publishTxnPartitions comment.
+	addPartitionsRetryBackoff = 20 * time.Millisecond
+)
+
+// txnmgr allowed transitions.
+var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{
+	ProducerTxnFlagUninitialized: {
+		ProducerTxnFlagReady,
+		ProducerTxnFlagInError,
+	},
+	// When we are initializing
+	ProducerTxnFlagInitializing: {
+		ProducerTxnFlagInitializing,
+		ProducerTxnFlagReady,
+		ProducerTxnFlagInError,
+	},
+	// When the transactional producer has been initialized
+	ProducerTxnFlagReady: {
+		ProducerTxnFlagInTransaction,
+	},
+	// When beginTxn has been called
+	ProducerTxnFlagInTransaction: {
+		// When calling commit or abort
+		ProducerTxnFlagEndTransaction,
+		// When got an error
+		ProducerTxnFlagInError,
+	},
+	ProducerTxnFlagEndTransaction: {
+		// When epoch bump
+		ProducerTxnFlagInitializing,
+		// When commit is good
+		ProducerTxnFlagReady,
+		// When got an error
+		ProducerTxnFlagInError,
+	},
+	// Need to abort transaction
+	ProducerTxnFlagAbortableError: {
+		// Call AbortTxn
+		ProducerTxnFlagAbortingTransaction,
+		// When got an error
+		ProducerTxnFlagInError,
+	},
+	// Need to close producer
+	ProducerTxnFlagFatalError: {
+		ProducerTxnFlagFatalError,
+	},
+}
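+
+// Illustrative sketch (assumed values, not upstream code): with the table
+// above, Ready may move to InTransaction but not straight to
+// CommittingTransaction, so isTransitionValid (defined below) yields:
+//
+//	t.status = ProducerTxnFlagReady
+//	t.isTransitionValid(ProducerTxnFlagInTransaction)         // true
+//	t.isTransitionValid(ProducerTxnFlagCommittingTransaction) // false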
+
+type topicPartition struct {
+	topic     string
+	partition int32
+}
+
+// to ensure that we don't do a full scan every time a partition or an offset is added.
+type (
+	topicPartitionSet     map[topicPartition]struct{}
+	topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
+)
+
+func (s topicPartitionSet) mapToRequest() map[string][]int32 {
+	result := make(map[string][]int32, len(s))
+	for tp := range s {
+		result[tp.topic] = append(result[tp.topic], tp.partition)
+	}
+	return result
+}
+
+func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata {
+	result := make(map[string][]*PartitionOffsetMetadata, len(s))
+	for tp, offset := range s {
+		result[tp.topic] = append(result[tp.topic], offset)
+	}
+	return result
+}
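+
+// Illustrative sketch: a set of {topic, partition} keys flattens into the
+// per-topic wire shape used by the requests below, e.g.
+// {{"orders", 0}, {"orders", 1}, {"users", 2}} becomes
+// map[string][]int32{"orders": {0, 1}, "users": {2}}
+// (slice order depends on map iteration and is not guaranteed).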
+
+// Returns true if the transition from the current status to target is allowed.
+func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool {
+	for status, allowedTransitions := range producerTxnTransitions {
+		if status&t.status != 0 {
+			for _, allowedTransition := range allowedTransitions {
+				if allowedTransition&target != 0 {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
+
+// Get current transaction status.
+func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag {
+	t.statusLock.RLock()
+	defer t.statusLock.RUnlock()
+
+	return t.status
+}
+
+// Try to transition to the target status; return ErrTransitionNotAllowed if the transition is not permitted.
+func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error {
+	t.statusLock.Lock()
+	defer t.statusLock.Unlock()
+
+	if !t.isTransitionValid(target) {
+		return ErrTransitionNotAllowed
+	}
+
+	if target&ProducerTxnFlagInError != 0 {
+		if err == nil {
+			return ErrCannotTransitionNilError
+		}
+		t.lastError = err
+	} else {
+		t.lastError = nil
+	}
+
+	DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target)
+
+	t.status = target
+	return err
+}
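+
+// Illustrative: entering an error state requires a non-nil error, which is
+// recorded in lastError and returned; any non-error transition clears it:
+//
+//	_ = t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err) // records err
+//	_ = t.transitionTo(ProducerTxnFlagReady, nil)                                 // clears lastError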
+
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	sequence := t.sequenceNumbers[key]
+	t.sequenceNumbers[key] = sequence + 1
+	return sequence, t.producerEpoch
+}
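+
+// Illustrative: sequence numbers are tracked per "<topic>-<partition>" key,
+// so successive calls for the same partition yield 0, 1, 2, ... under the
+// current epoch, and bumpEpoch below resets every sequence to 0.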
+
+func (t *transactionManager) bumpEpoch() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.producerEpoch++
+	for k := range t.sequenceNumbers {
+		t.sequenceNumbers[k] = 0
+	}
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.producerID, t.producerEpoch
+}
+
+// Compute the retry backoff based on the number of remaining attempts.
+func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration {
+	if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
+		maxRetries := t.client.Config().Producer.Transaction.Retry.Max
+		retries := maxRetries - attemptsRemaining
+		return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
+	}
+	return t.client.Config().Producer.Transaction.Retry.Backoff
+}
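+
+// Illustrative: with Retry.Max = 5 and attemptsRemaining = 3, the callback is
+// invoked as BackoffFunc(2, 5); without a BackoffFunc the static
+// Retry.Backoff duration is used for every attempt.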
+
+// Returns true if txnmgr is transactional.
+func (t *transactionManager) isTransactional() bool {
+	return t.transactionalID != ""
+}
+
+// add specified offsets to current transaction.
+func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
+		return ErrTransactionNotReady
+	}
+
+	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
+		return t.lastError
+	}
+
+	if _, ok := t.offsetsInCurrentTxn[groupId]; !ok {
+		t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{}
+	}
+
+	for topic, offsets := range offsetsToAdd {
+		for _, offset := range offsets {
+			tp := topicPartition{topic: topic, partition: offset.Partition}
+			t.offsetsInCurrentTxn[groupId][tp] = offset
+		}
+	}
+	return nil
+}
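+
+// Illustrative usage via the public producer API (a sketch; assumes an
+// AsyncProducer with an open transaction, and the exported
+// PartitionOffsetMetadata shape):
+//
+//	offsets := map[string][]*PartitionOffsetMetadata{
+//		"my-topic": {{Partition: 0, Offset: 42}},
+//	}
+//	err := producer.AddOffsetsToTxn(offsets, "my-group")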
+
+// Sends the txnmgr's saved offsets to the transaction coordinator.
+func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
+	// First AddOffsetsToTxn
+	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
+	exec := func(run func() (bool, error), err error) error {
+		for attemptsRemaining >= 0 {
+			var retry bool
+			retry, err = run()
+			if !retry {
+				return err
+			}
+			backoff := t.computeBackoff(attemptsRemaining)
+			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
+				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
+			time.Sleep(backoff)
+			attemptsRemaining--
+		}
+		return err
+	}
+	lastError := exec(func() (bool, error) {
+		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
+		if err != nil {
+			return true, err
+		}
+		request := &AddOffsetsToTxnRequest{
+			TransactionalID: t.transactionalID,
+			ProducerEpoch:   t.producerEpoch,
+			ProducerID:      t.producerID,
+			GroupID:         groupId,
+		}
+		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
+			// Version 2 adds the support for new error code PRODUCER_FENCED.
+			request.Version = 2
+		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
+			// Version 1 is the same as version 0.
+			request.Version = 1
+		}
+		response, err := coordinator.AddOffsetsToTxn(request)
+		if err != nil {
+			// If an error occurred try to refresh current transaction coordinator.
+			_ = coordinator.Close()
+			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			return true, err
+		}
+		if response == nil {
+			// If no response is returned just retry.
+			return true, ErrTxnUnableToParseResponse
+		}
+		if response.Err == ErrNoError {
+			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
+				t.transactionalID, groupId, response)
+			// If no error, just exit.
+			return false, nil
+		}
+		switch response.Err {
+		case ErrConsumerCoordinatorNotAvailable:
+			fallthrough
+		case ErrNotCoordinatorForConsumer:
+			_ = coordinator.Close()
+			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			fallthrough
+		case ErrOffsetsLoadInProgress:
+			fallthrough
+		case ErrConcurrentTransactions:
+			// Retry
+		case ErrUnknownProducerID:
+			fallthrough
+		case ErrInvalidProducerIDMapping:
+			return false, t.abortableErrorIfPossible(response.Err)
+		case ErrGroupAuthorizationFailed:
+			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
+		default:
+			// Others are fatal
+			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
+		}
+		return true, response.Err
+	}, nil)
+
+	if lastError != nil {
+		return offsets, lastError
+	}
+
+	resultOffsets := offsets
+	// Then TxnOffsetCommit
+	// note the commit is not complete until TxnOffsetCommit returns
+	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
+	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
+		var r topicPartitionOffsets
+		for attemptsRemaining >= 0 {
+			var retry bool
+			r, retry, err = run()
+			if !retry {
+				return r, err
+			}
+			backoff := t.computeBackoff(attemptsRemaining)
+			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
+				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
+			time.Sleep(backoff)
+			attemptsRemaining--
+		}
+		return r, err
+	}
+	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
+		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
+		if err != nil {
+			return resultOffsets, true, err
+		}
+		request := &TxnOffsetCommitRequest{
+			TransactionalID: t.transactionalID,
+			ProducerEpoch:   t.producerEpoch,
+			ProducerID:      t.producerID,
+			GroupID:         groupId,
+			Topics:          offsets.mapToRequest(),
+		}
+		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
+			// Version 2 adds the committed leader epoch.
+			request.Version = 2
+		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
+			// Version 1 is the same as version 0.
+			request.Version = 1
+		}
+		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
+		if err != nil {
+			_ = consumerGroupCoordinator.Close()
+			_ = t.client.RefreshCoordinator(groupId)
+			return resultOffsets, true, err
+		}
+
+		if responses == nil {
+			return resultOffsets, true, ErrTxnUnableToParseResponse
+		}
+
+		var responseErrors []error
+		failedTxn := topicPartitionOffsets{}
+		for topic, partitionErrors := range responses.Topics {
+			for _, partitionError := range partitionErrors {
+				switch partitionError.Err {
+				case ErrNoError:
+					continue
+				// If the topic is unknown or the coordinator is loading, retry with the current coordinator
+				case ErrRequestTimedOut:
+					fallthrough
+				case ErrConsumerCoordinatorNotAvailable:
+					fallthrough
+				case ErrNotCoordinatorForConsumer:
+					_ = consumerGroupCoordinator.Close()
+					_ = t.client.RefreshCoordinator(groupId)
+					fallthrough
+				case ErrUnknownTopicOrPartition:
+					fallthrough
+				case ErrOffsetsLoadInProgress:
+					// Do nothing just retry
+				case ErrIllegalGeneration:
+					fallthrough
+				case ErrUnknownMemberId:
+					fallthrough
+				case ErrFencedInstancedId:
+					fallthrough
+				case ErrGroupAuthorizationFailed:
+					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
+				default:
+					// Others are fatal
+					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
+				}
+				tp := topicPartition{topic: topic, partition: partitionError.Partition}
+				failedTxn[tp] = offsets[tp]
+				responseErrors = append(responseErrors, partitionError.Err)
+			}
+		}
+
+		resultOffsets = failedTxn
+
+		if len(resultOffsets) == 0 {
+			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
+				t.transactionalID, groupId)
+			return resultOffsets, false, nil
+		}
+		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
+	}, nil)
+}
+
+func (t *transactionManager) initProducerId() (int64, int16, error) {
+	isEpochBump := false
+
+	req := &InitProducerIDRequest{}
+	if t.isTransactional() {
+		req.TransactionalID = &t.transactionalID
+		req.TransactionTimeout = t.transactionTimeout
+	}
+
+	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
+		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
+			// Version 4 adds the support for new error code PRODUCER_FENCED.
+			req.Version = 4
+		} else {
+			// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
+			// to resume after an INVALID_PRODUCER_EPOCH error
+			req.Version = 3
+		}
+		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
+		t.coordinatorSupportsBumpingEpoch = true
+		req.ProducerID = t.producerID
+		req.ProducerEpoch = t.producerEpoch
+	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
+		// Version 2 is the first flexible version.
+		req.Version = 2
+	} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
+		// Version 1 is the same as version 0.
+		req.Version = 1
+	}
+
+	if isEpochBump {
+		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
+		if err != nil {
+			return -1, -1, err
+		}
+		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
+			t.transactionalID)
+	} else {
+		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
+			t.transactionalID, t.producerID, t.producerEpoch)
+	}
+
+	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
+	exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
+		pid := int64(-1)
+		pepoch := int16(-1)
+		for attemptsRemaining >= 0 {
+			var retry bool
+			pid, pepoch, retry, err = run()
+			if !retry {
+				return pid, pepoch, err
+			}
+			backoff := t.computeBackoff(attemptsRemaining)
+			Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
+				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
+			time.Sleep(backoff)
+			attemptsRemaining--
+		}
+		return -1, -1, err
+	}
+	return exec(func() (int64, int16, bool, error) {
+		var err error
+		var coordinator *Broker
+		if t.isTransactional() {
+			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
+		} else {
+			coordinator = t.client.LeastLoadedBroker()
+		}
+		if err != nil {
+			return -1, -1, true, err
+		}
+		response, err := coordinator.InitProducerID(req)
+		if err != nil {
+			if t.isTransactional() {
+				_ = coordinator.Close()
+				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			}
+			return -1, -1, true, err
+		}
+		if response == nil {
+			return -1, -1, true, ErrTxnUnableToParseResponse
+		}
+		if response.Err == ErrNoError {
+			if isEpochBump {
+				t.sequenceNumbers = make(map[string]int32)
+			}
+			err := t.transitionTo(ProducerTxnFlagReady, nil)
+			if err != nil {
+				return -1, -1, true, err
+			}
+			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
+				t.transactionalID, response)
+			return response.ProducerID, response.ProducerEpoch, false, nil
+		}
+		switch response.Err {
+		// Retriable errors
+		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
+			if t.isTransactional() {
+				_ = coordinator.Close()
+				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			}
+		// Fatal errors
+		default:
+			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
+		}
+		return -1, -1, true, response.Err
+	}, nil)
+}
+
+// If the kafka cluster is at least 2.5.0, mark txnmgr to bump the epoch; otherwise mark the error as fatal.
+func (t *transactionManager) abortableErrorIfPossible(err error) error {
+	if t.coordinatorSupportsBumpingEpoch {
+		t.epochBumpRequired = true
+		return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
+	}
+	return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
+}
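+
+// Illustrative: on a cluster >= 2.5.0, an ErrInvalidProducerIDMapping thus
+// becomes abortable (the epoch is bumped on the next init), while older
+// clusters must treat it as fatal and recreate the producer.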
+
+// End current transaction.
+func (t *transactionManager) completeTransaction() error {
+	if t.epochBumpRequired {
+		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
+		if err != nil {
+			return err
+		}
+	} else {
+		err := t.transitionTo(ProducerTxnFlagReady, nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	t.lastError = nil
+	t.epochBumpRequired = false
+	t.partitionsInCurrentTxn = topicPartitionSet{}
+	t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
+	t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{}
+
+	return nil
+}
+
+// Send an EndTxn request with the commit flag (true when committing, false when aborting).
+func (t *transactionManager) endTxn(commit bool) error {
+	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
+	exec := func(run func() (bool, error), err error) error {
+		for attemptsRemaining >= 0 {
+			var retry bool
+			retry, err = run()
+			if !retry {
+				return err
+			}
+			backoff := t.computeBackoff(attemptsRemaining)
+			Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
+				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
+			time.Sleep(backoff)
+			attemptsRemaining--
+		}
+		return err
+	}
+	return exec(func() (bool, error) {
+		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
+		if err != nil {
+			return true, err
+		}
+		request := &EndTxnRequest{
+			TransactionalID:   t.transactionalID,
+			ProducerEpoch:     t.producerEpoch,
+			ProducerID:        t.producerID,
+			TransactionResult: commit,
+		}
+		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
+			// Version 2 adds the support for new error code PRODUCER_FENCED.
+			request.Version = 2
+		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
+			// Version 1 is the same as version 0.
+			request.Version = 1
+		}
+		response, err := coordinator.EndTxn(request)
+		if err != nil {
+			// Always retry on network error
+			_ = coordinator.Close()
+			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			return true, err
+		}
+		if response == nil {
+			return true, ErrTxnUnableToParseResponse
+		}
+		if response.Err == ErrNoError {
+			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
+				t.transactionalID, response)
+			return false, t.completeTransaction()
+		}
+		switch response.Err {
+		// Need to refresh coordinator
+		case ErrConsumerCoordinatorNotAvailable:
+			fallthrough
+		case ErrNotCoordinatorForConsumer:
+			_ = coordinator.Close()
+			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			fallthrough
+		case ErrOffsetsLoadInProgress:
+			fallthrough
+		case ErrConcurrentTransactions:
+			// Just retry
+		case ErrUnknownProducerID:
+			fallthrough
+		case ErrInvalidProducerIDMapping:
+			return false, t.abortableErrorIfPossible(response.Err)
+		// Fatal errors
+		default:
+			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
+		}
+		return true, response.Err
+	}, nil)
+}
+
+// We try to publish the associated offsets for each group,
+// then send an EndTxn request to mark the transaction as finished.
+func (t *transactionManager) finishTransaction(commit bool) error {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	// Ensure no error when committing or aborting
+	if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
+		return t.lastError
+	} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
+		return t.lastError
+	}
+
+	// If no records have been sent, don't do anything.
+	if len(t.partitionsInCurrentTxn) == 0 {
+		return t.completeTransaction()
+	}
+
+	epochBump := t.epochBumpRequired
+	// If we're aborting the transaction, there is no need to add offsets.
+	if commit && len(t.offsetsInCurrentTxn) > 0 {
+		for group, offsets := range t.offsetsInCurrentTxn {
+			newOffsets, err := t.publishOffsetsToTxn(offsets, group)
+			if err != nil {
+				t.offsetsInCurrentTxn[group] = newOffsets
+				return err
+			}
+			delete(t.offsetsInCurrentTxn, group)
+		}
+	}
+
+	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
+		return t.lastError
+	}
+
+	if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
+		err := t.endTxn(commit)
+		if err != nil {
+			return err
+		}
+		if !epochBump {
+			return nil
+		}
+	}
+	// reset pid and epoch if needed.
+	return t.initializeTransactions()
+}
+
+// Called before sending any transactional record;
+// does nothing if the current topic-partition has already been added to the transaction.
+func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
+	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
+		return
+	}
+
+	tp := topicPartition{topic: topic, partition: partition}
+
+	t.partitionInTxnLock.Lock()
+	defer t.partitionInTxnLock.Unlock()
+	if _, ok := t.partitionsInCurrentTxn[tp]; ok {
+		// partition is already added
+		return
+	}
+
+	t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
+}
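+
+// Illustrative: the producer is expected to call this for every transactional
+// record, so sending several records to the same topic-partition queues that
+// partition for AddPartitionsToTxn only once; publishTxnPartitions below
+// drains the pending set.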
+
+// Makes a request to kafka to add a list of partitions to the current transaction.
+func (t *transactionManager) publishTxnPartitions() error {
+	t.partitionInTxnLock.Lock()
+	defer t.partitionInTxnLock.Unlock()
+
+	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
+		return t.lastError
+	}
+
+	if len(t.pendingPartitionsInCurrentTxn) == 0 {
+		return nil
+	}
+
+	// Remove the partitions from the pending set regardless of the result. We use the presence
+	// of partitions in the pending set to know when it is not safe to send batches. However, if
+	// the partitions failed to be added and we enter an error state, we expect the batches to be
+	// aborted anyway. In this case, we must be able to continue sending the batches which are in
+	// retry for partitions that were successfully added.
+	removeAllPartitionsOnFatalOrAbortedError := func() {
+		t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
+	}
+
+	// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
+	// CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
+	// we don't want to wait too long before trying to start the new one.
+	//
+	// This is only a temporary fix, the long term solution is being tracked in
+	// https://issues.apache.org/jira/browse/KAFKA-5482
+	retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
+	computeBackoff := func(attemptsRemaining int) time.Duration {
+		if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
+			maxRetries := t.client.Config().Producer.Transaction.Retry.Max
+			retries := maxRetries - attemptsRemaining
+			return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
+		}
+		return retryBackoff
+	}
+	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
+
+	exec := func(run func() (bool, error), err error) error {
+		for attemptsRemaining >= 0 {
+			var retry bool
+			retry, err = run()
+			if !retry {
+				return err
+			}
+			backoff := computeBackoff(attemptsRemaining)
+			Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
+			time.Sleep(backoff)
+			attemptsRemaining--
+		}
+		return err
+	}
+	return exec(func() (bool, error) {
+		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
+		if err != nil {
+			return true, err
+		}
+		request := &AddPartitionsToTxnRequest{
+			TransactionalID: t.transactionalID,
+			ProducerID:      t.producerID,
+			ProducerEpoch:   t.producerEpoch,
+			TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
+		}
+		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
+			// Version 2 adds the support for new error code PRODUCER_FENCED.
+			request.Version = 2
+		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
+			// Version 1 is the same as version 0.
+			request.Version = 1
+		}
+		addPartResponse, err := coordinator.AddPartitionsToTxn(request)
+		if err != nil {
+			_ = coordinator.Close()
+			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+			return true, err
+		}
+
+		if addPartResponse == nil {
+			return true, ErrTxnUnableToParseResponse
+		}
+
+		// remove from the list partitions that have been successfully updated
+		var responseErrors []error
+		for topic, results := range addPartResponse.Errors {
+			for _, response := range results {
+				tp := topicPartition{topic: topic, partition: response.Partition}
+				switch response.Err {
+				case ErrNoError:
+					// Mark partition as added to transaction
+					t.partitionsInCurrentTxn[tp] = struct{}{}
+					delete(t.pendingPartitionsInCurrentTxn, tp)
+					continue
+				case ErrConsumerCoordinatorNotAvailable:
+					fallthrough
+				case ErrNotCoordinatorForConsumer:
+					_ = coordinator.Close()
+					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
+					fallthrough
+				case ErrUnknownTopicOrPartition:
+					fallthrough
+				case ErrOffsetsLoadInProgress:
+					// Retry topicPartition
+				case ErrConcurrentTransactions:
+					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
+						retryBackoff = addPartitionsRetryBackoff
+					}
+				case ErrOperationNotAttempted:
+					fallthrough
+				case ErrTopicAuthorizationFailed:
+					removeAllPartitionsOnFatalOrAbortedError()
+					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
+				case ErrUnknownProducerID:
+					fallthrough
+				case ErrInvalidProducerIDMapping:
+					removeAllPartitionsOnFatalOrAbortedError()
+					return false, t.abortableErrorIfPossible(response.Err)
+				// Fatal errors
+				default:
+					removeAllPartitionsOnFatalOrAbortedError()
+					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
+				}
+				responseErrors = append(responseErrors, response.Err)
+			}
+		}
+
+		// handle end
+		if len(t.pendingPartitionsInCurrentTxn) == 0 {
+			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
+				t.transactionalID, addPartResponse)
+			return false, nil
+		}
+		return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
+	}, nil)
+}
+
+// Builds a new transaction manager that shares the producer's client.
+func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
+	txnmgr := &transactionManager{
+		producerID:                    noProducerID,
+		producerEpoch:                 noProducerEpoch,
+		client:                        client,
+		pendingPartitionsInCurrentTxn: topicPartitionSet{},
+		partitionsInCurrentTxn:        topicPartitionSet{},
+		offsetsInCurrentTxn:           make(map[string]topicPartitionOffsets),
+		status:                        ProducerTxnFlagUninitialized,
+	}
+
+	if conf.Producer.Idempotent {
+		txnmgr.transactionalID = conf.Producer.Transaction.ID
+		txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout
+		txnmgr.sequenceNumbers = make(map[string]int32)
+		txnmgr.mutex = sync.Mutex{}
+
+		var err error
+		txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId()
+		if err != nil {
+			return nil, err
+		}
+		Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n",
+			txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch)
+	}
+
+	return txnmgr, nil
+}
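+
+// Illustrative configuration that reaches this constructor (a sketch using
+// the public sarama API; the broker address is a placeholder):
+//
+//	conf := NewConfig()
+//	conf.Version = V2_7_0_0
+//	conf.Producer.Idempotent = true
+//	conf.Producer.RequiredAcks = WaitForAll
+//	conf.Net.MaxOpenRequests = 1
+//	conf.Producer.Transaction.ID = "my-transactional-id"
+//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, conf)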
+
+// re-init producer-id and producer-epoch if needed.
+func (t *transactionManager) initializeTransactions() (err error) {
+	t.producerID, t.producerEpoch, err = t.initProducerId()
+	return
+}