[VOL-5486] Upgrade library versions

Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go
new file mode 100644
index 0000000..dda0201
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/admin.go
@@ -0,0 +1,1395 @@
+package sarama
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"maps"
+	"math/rand"
+	"strconv"
+	"sync"
+	"time"
+)
+
+// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
+// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
+// Methods with stricter requirements will specify the minimum broker version required.
+// You MUST call Close() on a client to avoid leaks
+type ClusterAdmin interface {
+	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
+	// It may take several seconds after CreateTopic returns success for all the brokers
+	// to become aware that the topic has been created. During this time, ListTopics
+	// may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
+	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
+
+	// List the topics available in the cluster with the default options.
+	ListTopics() (map[string]TopicDetail, error)
+
+	// Describe some topics in the cluster.
+	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
+
+	// Delete a topic. It may take several seconds after DeleteTopic returns success
+	// for all the brokers to become aware that the topic is gone.
+	// During this time, ListTopics may continue to return information about the deleted topic.
+	// If delete.topic.enable is false on the brokers, DeleteTopic will mark
+	// the topic for deletion, but not actually delete it.
+	// This operation is supported by brokers with version 0.10.1.0 or higher.
+	DeleteTopic(topic string) error
+
+	// Increase the number of partitions of the topic to the given count.
+	// If partitions are increased for a topic that has a key, the partition logic or ordering of
+	// the messages will be affected. It may take several seconds after this method returns
+	// success for all the brokers to become aware that the partitions have been created.
+	// During this time, DescribeTopics may not return information about the
+	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
+	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
+
+	// Alter the replica assignment for partitions.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	AlterPartitionReassignments(topic string, assignment [][]int32) error
+
+	// Provides info on ongoing partition replica reassignments.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
+
+	// Delete records whose offset is smaller than the given offset of the corresponding partition.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
+
+	// Get the configuration for the specified resources.
+	// The returned configuration includes default values; the Default field,
+	// when true, can be used to distinguish them from user-supplied values.
+	// Config entries where ReadOnly is true cannot be updated.
+	// The value of config entries where Sensitive is true is always nil so
+	// sensitive information is not disclosed.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
+
+	// Update the configuration for the specified resources with the default options.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	// The resources with their configs (topic is the only resource type with configs
+	// that can be updated currently). Updates are not transactional, so they may succeed
+	// for some resources while failing for others. The configs for a particular resource are updated atomically.
+	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
+
+	// IncrementalAlterConfig incrementally updates the configuration for the specified resources with the default options.
+	// This operation is supported by brokers with version 2.3.0.0 or higher.
+	// Updates are not transactional, so they may succeed for some resources while failing for others.
+	// The configs for a particular resource are updated atomically.
+	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
+
+	// Creates an access control list (ACL) which is bound to a specific resource.
+	// This operation is not transactional.
+	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
+	// Deprecated: Use CreateACLs instead.
+	CreateACL(resource Resource, acl Acl) error
+
+	// Creates access control lists (ACLs) which are bound to specific resources.
+	// This operation is not transactional so it may succeed for some ACLs while fail for others.
+	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
+	CreateACLs([]*ResourceAcls) error
+
+	// Lists access control lists (ACLs) according to the supplied filter.
+	// It may take some time for changes made by CreateACLs or DeleteACL to be reflected in the output of ListAcls.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	ListAcls(filter AclFilter) ([]ResourceAcls, error)
+
+	// Deletes access control lists (ACLs) according to the supplied filters.
+	// This operation is not transactional so it may succeed for some ACLs while fail for others.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
+
+	// ElectLeaders triggers the election of preferred leaders for a set of partitions.
+	ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)
+
+	// List the consumer groups available in the cluster.
+	ListConsumerGroups() (map[string]string, error)
+
+	// Describe the given consumer groups.
+	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
+
+	// List the consumer group offsets available in the cluster.
+	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+
+	// Deletes a consumer group offset
+	DeleteConsumerGroupOffset(group string, topic string, partition int32) error
+
+	// Delete a consumer group.
+	DeleteConsumerGroup(group string) error
+
+	// Get information about the nodes in the cluster
+	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
+
+	// Get information about all log directories on the given set of brokers
+	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
+
+	// Get information about SCRAM users
+	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
+
+	// Delete SCRAM users
+	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
+
+	// Upsert SCRAM users
+	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
+
+	// Get client quota configurations corresponding to the specified filter.
+	// This operation is supported by brokers with version 2.6.0.0 or higher.
+	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
+
+	// Alters client quota configurations with the specified alterations.
+	// This operation is supported by brokers with version 2.6.0.0 or higher.
+	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
+
+	// Controller returns the cluster controller broker. It will return a
+	// locally cached value if it's available.
+	Controller() (*Broker, error)
+
+	// Coordinator returns the coordinating broker for a consumer group. It will
+	// return a locally cached value if it's available.
+	Coordinator(group string) (*Broker, error)
+
+	// Remove members from the consumer group by given member identities.
+	// This operation is supported by brokers with version 2.3.0.0 or higher.
+	// This is for the static membership feature (KIP-345).
+	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)
+
+	// Close shuts down the admin and closes underlying client.
+	Close() error
+}
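+
+// A minimal usage sketch (illustrative only; the broker address below is an
+// assumption for a local cluster):
+//
+//	admin, err := NewClusterAdmin([]string{"localhost:9092"}, NewConfig())
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	defer admin.Close() // always Close() to avoid leaks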
+
+type clusterAdmin struct {
+	client Client
+	conf   *Config
+}
+
+// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
+func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
+	client, err := NewClient(addrs, conf)
+	if err != nil {
+		return nil, err
+	}
+	admin, err := NewClusterAdminFromClient(client)
+	if err != nil {
+		client.Close()
+	}
+	return admin, err
+}
+
+// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
+// Note that underlying client will also be closed on admin's Close() call.
+func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
+	// make sure we can retrieve the controller
+	_, err := client.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	ca := &clusterAdmin{
+		client: client,
+		conf:   client.Config(),
+	}
+	return ca, nil
+}
+
+func (ca *clusterAdmin) Close() error {
+	return ca.client.Close()
+}
+
+func (ca *clusterAdmin) Controller() (*Broker, error) {
+	return ca.client.Controller()
+}
+
+func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) {
+	return ca.client.Coordinator(group)
+}
+
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+	return ca.client.RefreshController()
+}
+
+// isRetriableControllerError returns `true` if the given error type unwraps to
+// an `ErrNotController` or `EOF` response from Kafka
+func isRetriableControllerError(err error) bool {
+	return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF)
+}
+
+// isRetriableGroupCoordinatorError returns `true` if the given error type
+// unwraps to an `ErrNotCoordinatorForConsumer`,
+// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka
+func isRetriableGroupCoordinatorError(err error) bool {
+	return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF)
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retryable (as determined by the
+// provided retryable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
+	for attemptsRemaining := ca.conf.Admin.Retry.Max + 1; ; {
+		err := fn()
+		attemptsRemaining--
+		if err == nil || attemptsRemaining <= 0 || !retryable(err) {
+			return err
+		}
+		Logger.Printf(
+			"admin/request retrying after %dms... (%d attempts remaining)\n",
+			ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
+		time.Sleep(ca.conf.Admin.Retry.Backoff)
+	}
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	if detail == nil {
+		return errors.New("you must specify topic details")
+	}
+
+	topicDetails := map[string]*TopicDetail{
+		topic: detail,
+	}
+
+	request := NewCreateTopicsRequest(
+		ca.conf.Version,
+		topicDetails,
+		ca.conf.Admin.Timeout,
+		validateOnly,
+	)
+
+	return ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		rsp, err := b.CreateTopics(request)
+		if err != nil {
+			return err
+		}
+
+		topicErr, ok := rsp.TopicErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if !errors.Is(topicErr.Err, ErrNoError) {
+			if isRetriableControllerError(topicErr.Err) {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
+}
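+
+// An illustrative CreateTopic call; the topic name and settings below are
+// assumptions, not defaults:
+//
+//	retentionMs := "86400000" // retention.ms value of one day
+//	err := admin.CreateTopic("example-topic", &TopicDetail{
+//		NumPartitions:     3,
+//		ReplicationFactor: 1,
+//		ConfigEntries:     map[string]*string{"retention.ms": &retentionMs},
+//	}, false)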
+
+func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
+	var response *MetadataResponse
+	err = ca.retryOnError(isRetriableControllerError, func() error {
+		controller, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+		request := NewMetadataRequest(ca.conf.Version, topics)
+		response, err = controller.GetMetadata(request)
+		if isRetriableControllerError(err) {
+			_, _ = ca.refreshController()
+		}
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+	return response.Topics, nil
+}
+
+func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
+	var response *MetadataResponse
+	err = ca.retryOnError(isRetriableControllerError, func() error {
+		controller, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		request := NewMetadataRequest(ca.conf.Version, nil)
+		response, err = controller.GetMetadata(request)
+		if isRetriableControllerError(err) {
+			_, _ = ca.refreshController()
+		}
+		return err
+	})
+	if err != nil {
+		return nil, int32(0), err
+	}
+
+	return response.Brokers, response.ControllerID, nil
+}
+
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+	brokers := ca.client.Brokers()
+	for _, b := range brokers {
+		if b.ID() == id {
+			return b, nil
+		}
+	}
+	return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
+func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
+	brokers := ca.client.Brokers()
+	if len(brokers) > 0 {
+		index := rand.Intn(len(brokers))
+		return brokers[index], nil
+	}
+	return nil, errors.New("no available broker")
+}
+
+func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
+	// In order to build TopicDetails we need to first get the list of all
+	// topics using a MetadataRequest and then get their configs using a
+	// DescribeConfigsRequest request. To avoid sending many requests to the
+	// broker, we use a single DescribeConfigsRequest.
+
+	// Send the all-topic MetadataRequest
+	b, err := ca.findAnyBroker()
+	if err != nil {
+		return nil, err
+	}
+	_ = b.Open(ca.client.Config())
+
+	metadataReq := NewMetadataRequest(ca.conf.Version, nil)
+	metadataResp, err := b.GetMetadata(metadataReq)
+	if err != nil {
+		return nil, err
+	}
+
+	topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics))
+
+	var describeConfigsResources []*ConfigResource
+
+	for _, topic := range metadataResp.Topics {
+		topicDetails := TopicDetail{
+			NumPartitions: int32(len(topic.Partitions)),
+		}
+		if len(topic.Partitions) > 0 {
+			topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions))
+			for _, partition := range topic.Partitions {
+				topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
+			}
+			topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
+		}
+		topicsDetailsMap[topic.Name] = topicDetails
+
+		// we populate the resources we want to describe from the MetadataResponse
+		topicResource := ConfigResource{
+			Type: TopicResource,
+			Name: topic.Name,
+		}
+		describeConfigsResources = append(describeConfigsResources, &topicResource)
+	}
+
+	// Send the DescribeConfigsRequest
+	describeConfigsReq := &DescribeConfigsRequest{
+		Resources: describeConfigsResources,
+	}
+
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		describeConfigsReq.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		describeConfigsReq.Version = 2
+	}
+
+	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, resource := range describeConfigsResp.Resources {
+		topicDetails := topicsDetailsMap[resource.Name]
+		topicDetails.ConfigEntries = make(map[string]*string)
+
+		for _, entry := range resource.Configs {
+			// only include non-default non-sensitive config
+			// (don't actually think topic config will ever be sensitive)
+			if entry.Default || entry.Sensitive {
+				continue
+			}
+			topicDetails.ConfigEntries[entry.Name] = &entry.Value
+		}
+
+		topicsDetailsMap[resource.Name] = topicDetails
+	}
+
+	return topicsDetailsMap, nil
+}
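+
+// A sketch of consuming the ListTopics result (assumes an admin built as in
+// the NewClusterAdmin example):
+//
+//	topics, err := admin.ListTopics()
+//	if err == nil {
+//		for name, detail := range topics {
+//			fmt.Printf("%s: %d partitions, RF %d\n", name, detail.NumPartitions, detail.ReplicationFactor)
+//		}
+//	}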
+
+func (ca *clusterAdmin) DeleteTopic(topic string) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	request := &DeleteTopicsRequest{
+		Topics:  []string{topic},
+		Timeout: ca.conf.Admin.Timeout,
+	}
+
+	// Versions 0, 1, 2, and 3 are the same.
+	// Version 4 is first flexible version.
+	if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+		request.Version = 4
+	} else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
+		request.Version = 3
+	} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 2
+	} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 1
+	}
+
+	return ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		rsp, err := b.DeleteTopics(request)
+		if err != nil {
+			return err
+		}
+
+		topicErr, ok := rsp.TopicErrorCodes[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if !errors.Is(topicErr, ErrNoError) {
+			if errors.Is(topicErr, ErrNotController) {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	topicPartitions := map[string]*TopicPartition{
+		topic: {
+			Count:      count,
+			Assignment: assignment,
+		},
+	}
+
+	request := &CreatePartitionsRequest{
+		TopicPartitions: topicPartitions,
+		Timeout:         ca.conf.Admin.Timeout,
+		ValidateOnly:    validateOnly,
+	}
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	return ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		rsp, err := b.CreatePartitions(request)
+		if err != nil {
+			return err
+		}
+
+		topicErr, ok := rsp.TopicPartitionErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if !errors.Is(topicErr.Err, ErrNoError) {
+			if errors.Is(topicErr.Err, ErrNotController) {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	request := &AlterPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	for i := 0; i < len(assignment); i++ {
+		request.AddBlock(topic, int32(i), assignment[i])
+	}
+
+	return ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		errs := make([]error, 0)
+
+		rsp, err := b.AlterPartitionReassignments(request)
+
+		if err != nil {
+			errs = append(errs, err)
+		} else {
+			if rsp.ErrorCode > 0 {
+				errs = append(errs, rsp.ErrorCode)
+			}
+
+			for topic, topicErrors := range rsp.Errors {
+				for partition, partitionError := range topicErrors {
+					if !errors.Is(partitionError.errorCode, ErrNoError) {
+						errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
+					}
+				}
+			}
+		}
+
+		if len(errs) > 0 {
+			return Wrap(ErrReassignPartitions, errs...)
+		}
+
+		return nil
+	})
+}
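+
+// An illustrative reassignment; the slice index is the partition number, so
+// this moves partition 0 to brokers 1,2 and partition 1 to brokers 2,3
+// (topic and broker IDs are assumptions):
+//
+//	err := admin.AlterPartitionReassignments("example-topic", [][]int32{{1, 2}, {2, 3}})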
+
+func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
+	if topic == "" {
+		return nil, ErrInvalidTopic
+	}
+
+	request := &ListPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	request.AddBlock(topic, partitions)
+
+	var rsp *ListPartitionReassignmentsResponse
+	err = ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+		_ = b.Open(ca.client.Config())
+
+		rsp, err = b.ListPartitionReassignments(request)
+		if isRetriableControllerError(err) {
+			_, _ = ca.refreshController()
+		}
+		return err
+	})
+
+	if err == nil && rsp != nil {
+		return rsp.TopicStatus, nil
+	} else {
+		return nil, err
+	}
+}
+
+func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+	errs := make([]error, 0)
+	partitionPerBroker := make(map[*Broker][]int32)
+	for partition := range partitionOffsets {
+		broker, err := ca.client.Leader(topic, partition)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+	}
+	for broker, partitions := range partitionPerBroker {
+		recordsToDelete := make(map[int32]int64, len(partitions))
+		for _, p := range partitions {
+			recordsToDelete[p] = partitionOffsets[p]
+		}
+		topics := map[string]*DeleteRecordsRequestTopic{
+			topic: {
+				PartitionOffsets: recordsToDelete,
+			},
+		}
+		request := &DeleteRecordsRequest{
+			Topics:  topics,
+			Timeout: ca.conf.Admin.Timeout,
+		}
+		if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+			request.Version = 1
+		}
+		rsp, err := broker.DeleteRecords(request)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+
+		deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+		if !ok {
+			errs = append(errs, ErrIncompleteResponse)
+			continue
+		}
+
+		for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+			if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
+				errs = append(errs, deleteRecordsResponsePartition.Err)
+				continue
+			}
+		}
+	}
+	if len(errs) > 0 {
+		return Wrap(ErrDeleteRecords, errs...)
+	}
+	// TODO: since we are dealing with a couple of partitions it would be good to return a slice of errors
+	// for each partition instead of one error
+	return nil
+}
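+
+// An illustrative DeleteRecords call truncating partition 0 of a topic below
+// offset 1000 (topic name and offset are assumptions):
+//
+//	err := admin.DeleteRecords("example-topic", map[int32]int64{0: 1000})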
+
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+	return (resource.Type == BrokerResource && resource.Name != "") ||
+		resource.Type == BrokerLoggerResource
+}
+
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+	var entries []ConfigEntry
+	var resources []*ConfigResource
+	resources = append(resources, &resource)
+
+	request := &DescribeConfigsRequest{
+		Resources: resources,
+	}
+
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		request.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 2
+	}
+
+	var (
+		b   *Broker
+		err error
+	)
+
+	// DescribeConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(resource) {
+		var id int64
+		id, err = strconv.ParseInt(resource.Name, 10, 32)
+		if err != nil {
+			return nil, err
+		}
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	_ = b.Open(ca.client.Config())
+	rsp, err := b.DescribeConfigs(request)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, rspResource := range rsp.Resources {
+		if rspResource.Name == resource.Name {
+			if rspResource.ErrorCode != 0 {
+				return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
+			}
+			for _, cfgEntry := range rspResource.Configs {
+				entries = append(entries, *cfgEntry)
+			}
+		}
+	}
+	return entries, nil
+}
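+
+// An illustrative DescribeConfig call for a topic (topic name is an
+// assumption). For BrokerResource or BrokerLoggerResource, Name must be the
+// broker ID as a decimal string, per dependsOnSpecificNode above:
+//
+//	entries, err := admin.DescribeConfig(ConfigResource{
+//		Type: TopicResource,
+//		Name: "example-topic",
+//	})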
+
+func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
+	var resources []*AlterConfigsResource
+	resources = append(resources, &AlterConfigsResource{
+		Type:          resourceType,
+		Name:          name,
+		ConfigEntries: entries,
+	})
+
+	request := &AlterConfigsRequest{
+		Resources:    resources,
+		ValidateOnly: validateOnly,
+	}
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	var (
+		b   *Broker
+		err error
+	)
+
+	// AlterConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+		var id int64
+		id, err = strconv.ParseInt(name, 10, 32)
+		if err != nil {
+			return err
+		}
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
+	if err != nil {
+		return err
+	}
+
+	_ = b.Open(ca.client.Config())
+	rsp, err := b.AlterConfigs(request)
+	if err != nil {
+		return err
+	}
+
+	for _, rspResource := range rsp.Resources {
+		if rspResource.Name == name {
+			if rspResource.ErrorCode != 0 {
+				return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
+			}
+		}
+	}
+	return nil
+}
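+
+// An illustrative AlterConfig call (values are assumptions). Note that this
+// replaces the resource's whole config set; on brokers 2.3.0.0 or higher
+// prefer IncrementalAlterConfig below:
+//
+//	retentionMs := "604800000"
+//	err := admin.AlterConfig(TopicResource, "example-topic",
+//		map[string]*string{"retention.ms": &retentionMs}, false)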
+
+func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
+	var resources []*IncrementalAlterConfigsResource
+	resources = append(resources, &IncrementalAlterConfigsResource{
+		Type:          resourceType,
+		Name:          name,
+		ConfigEntries: entries,
+	})
+
+	request := &IncrementalAlterConfigsRequest{
+		Resources:    resources,
+		ValidateOnly: validateOnly,
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+		request.Version = 1
+	}
+
+	var (
+		b   *Broker
+		err error
+	)
+
+	// AlterConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+		var id int64
+		id, err = strconv.ParseInt(name, 10, 32)
+		if err != nil {
+			return err
+		}
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
+	if err != nil {
+		return err
+	}
+
+	_ = b.Open(ca.client.Config())
+	rsp, err := b.IncrementalAlterConfigs(request)
+	if err != nil {
+		return err
+	}
+
+	for _, rspResource := range rsp.Resources {
+		if rspResource.Name == name {
+			if rspResource.ErrorMsg != "" {
+				return errors.New(rspResource.ErrorMsg)
+			}
+			if rspResource.ErrorCode != 0 {
+				return KError(rspResource.ErrorCode)
+			}
+		}
+	}
+	return nil
+}
+
+func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
+	var acls []*AclCreation
+	acls = append(acls, &AclCreation{resource, acl})
+	request := &CreateAclsRequest{AclCreations: acls}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	_, err = b.CreateAcls(request)
+	return err
+}
+
+func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
+	var acls []*AclCreation
+	for _, resourceACL := range resourceACLs {
+		for _, acl := range resourceACL.Acls {
+			acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
+		}
+	}
+	request := &CreateAclsRequest{AclCreations: acls}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	_, err = b.CreateAcls(request)
+	return err
+}
+
+func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
+	request := &DescribeAclsRequest{AclFilter: filter}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeAcls(request)
+	if err != nil {
+		return nil, err
+	}
+
+	var lAcls []ResourceAcls
+	for _, rAcl := range rsp.ResourceAcls {
+		lAcls = append(lAcls, *rAcl)
+	}
+	return lAcls, nil
+}
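+
+// An illustrative filter matching every topic ACL (the constants are the
+// package's ACL enum values):
+//
+//	acls, err := admin.ListAcls(AclFilter{
+//		ResourceType:              AclResourceTopic,
+//		ResourcePatternTypeFilter: AclPatternAny,
+//		Operation:                 AclOperationAny,
+//		PermissionType:            AclPermissionAny,
+//	})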
+
+func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
+	var filters []*AclFilter
+	filters = append(filters, &filter)
+	request := &DeleteAclsRequest{Filters: filters}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DeleteAcls(request)
+	if err != nil {
+		return nil, err
+	}
+
+	var mAcls []MatchingAcl
+	for _, fr := range rsp.FilterResponses {
+		for _, mACL := range fr.MatchingAcls {
+			mAcls = append(mAcls, *mACL)
+		}
+	}
+	return mAcls, nil
+}
+
+func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
+	request := &ElectLeadersRequest{
+		Type:            electionType,
+		TopicPartitions: partitions,
+		TimeoutMs:       int32(60000),
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+		request.Version = 2
+	} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 1
+	}
+
+	var res *ElectLeadersResponse
+	if err := ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+		_ = b.Open(ca.client.Config())
+
+		res, err = b.ElectLeaders(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(res.ErrorCode, ErrNoError) {
+			if isRetriableControllerError(res.ErrorCode) {
+				_, _ = ca.refreshController()
+			}
+			return res.ErrorCode
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+	return res.ReplicaElectionResults, nil
+}
+
+func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
+	groupsPerBroker := make(map[*Broker][]string)
+
+	for _, group := range groups {
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return nil, err
+		}
+		groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
+	}
+
+	for broker, brokerGroups := range groupsPerBroker {
+		describeReq := &DescribeGroupsRequest{
+			Groups: brokerGroups,
+		}
+
+		if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+			// Starting in version 4, the response will include group.instance.id info for members.
+			// Starting in version 5, the response uses flexible encoding
+			describeReq.Version = 5
+		} else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
+			// Starting in version 3, authorized operations can be requested.
+			describeReq.Version = 3
+		} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+			// Version 2 is the same as version 0.
+			describeReq.Version = 2
+		} else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+			// Version 1 is the same as version 0.
+			describeReq.Version = 1
+		}
+		response, err := broker.DescribeGroups(describeReq)
+		if err != nil {
+			return nil, err
+		}
+
+		result = append(result, response.Groups...)
+	}
+	return result, nil
+}
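+
+// A sketch of describing one group (the group name is an assumption):
+//
+//	groups, err := admin.DescribeConsumerGroups([]string{"example-group"})
+//	if err == nil && len(groups) > 0 {
+//		fmt.Printf("state=%s members=%d\n", groups[0].State, len(groups[0].Members))
+//	}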
+
+func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
+	allGroups = make(map[string]string)
+
+	// Query brokers in parallel, since we have to query *all* brokers
+	brokers := ca.client.Brokers()
+	groupMaps := make(chan map[string]string, len(brokers))
+	errChan := make(chan error, len(brokers))
+	wg := sync.WaitGroup{}
+
+	for _, b := range brokers {
+		wg.Add(1)
+		go func(b *Broker, conf *Config) {
+			defer wg.Done()
+			_ = b.Open(conf) // Ensure that broker is opened
+
+			request := &ListGroupsRequest{}
+			if ca.conf.Version.IsAtLeast(V3_8_0_0) {
+				// Version 5 adds the TypesFilter field (KIP-848).
+				request.Version = 5
+			} else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
+				// Version 4 adds the StatesFilter field (KIP-518).
+				request.Version = 4
+			} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+				// Version 3 is the first flexible version.
+				request.Version = 3
+			} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+				// Version 2 is the same as version 0.
+				request.Version = 2
+			} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+				// Version 1 is the same as version 0.
+				request.Version = 1
+			}
+
+			response, err := b.ListGroups(request)
+			if err != nil {
+				errChan <- err
+				return
+			}
+
+			groupMaps <- maps.Clone(response.Groups)
+		}(b, ca.conf)
+	}
+
+	wg.Wait()
+	close(groupMaps)
+	close(errChan)
+
+	for groupMap := range groupMaps {
+		maps.Copy(allGroups, groupMap)
+	}
+
+	// Intentionally return only the first error for simplicity
+	err = <-errChan
+	return
+}
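+
+// A sketch of listing groups; the returned map is group name -> protocol type
+// (typically "consumer"):
+//
+//	groups, err := admin.ListConsumerGroups()
+//	for name, protocolType := range groups {
+//		fmt.Println(name, protocolType)
+//	}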
+
+func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
+	var response *OffsetFetchResponse
+	request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
+	err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}
+
+		response, err = coordinator.FetchOffset(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.Err, ErrNoError) {
+			return response.Err
+		}
+
+		return nil
+	})
+
+	return response, err
+}
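+
+// A sketch of reading one committed offset (group, topic, and partition are
+// assumptions):
+//
+//	resp, err := admin.ListConsumerGroupOffsets("example-group",
+//		map[string][]int32{"example-topic": {0}})
+//	if err == nil {
+//		fmt.Println("committed:", resp.Blocks["example-topic"][0].Offset)
+//	}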
+
+func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
+	var response *DeleteOffsetsResponse
+	request := &DeleteOffsetsRequest{
+		Group: group,
+		partitions: map[string][]int32{
+			topic: {partition},
+		},
+	}
+
+	return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}
+
+		response, err = coordinator.DeleteOffsets(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.ErrorCode, ErrNoError) {
+			return response.ErrorCode
+		}
+		if !errors.Is(response.Errors[topic][partition], ErrNoError) {
+			return response.Errors[topic][partition]
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
+	var response *DeleteGroupsResponse
+	request := &DeleteGroupsRequest{
+		Groups: []string{group},
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+		request.Version = 2
+	} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
+	return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}
+
+		response, err = coordinator.DeleteGroups(request)
+		if err != nil {
+			return err
+		}
+
+		groupErr, ok := response.GroupErrorCodes[group]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if !errors.Is(groupErr, ErrNoError) {
+			return groupErr
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
+	type result struct {
+		id      int32
+		logdirs []DescribeLogDirsResponseDirMetadata
+	}
+	// Query brokers in parallel, since we may have to query multiple brokers
+	logDirsResults := make(chan result, len(brokerIds))
+	errChan := make(chan error, len(brokerIds))
+	wg := sync.WaitGroup{}
+
+	for _, b := range brokerIds {
+		broker, err := ca.findBroker(b)
+		if err != nil {
+			Logger.Printf("Unable to find broker with ID = %v\n", b)
+			continue
+		}
+		wg.Add(1)
+		go func(b *Broker, conf *Config) {
+			defer wg.Done()
+			_ = b.Open(conf) // Ensure that broker is opened
+
+			request := &DescribeLogDirsRequest{}
+			if ca.conf.Version.IsAtLeast(V3_3_0_0) {
+				request.Version = 4
+			} else if ca.conf.Version.IsAtLeast(V3_2_0_0) {
+				request.Version = 3
+			} else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
+				request.Version = 2
+			} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+				request.Version = 1
+			}
+			response, err := b.DescribeLogDirs(request)
+			if err != nil {
+				errChan <- err
+				return
+			}
+			if !errors.Is(response.ErrorCode, ErrNoError) {
+				errChan <- response.ErrorCode
+				return
+			}
+			logDirsResults <- result{id: b.ID(), logdirs: response.LogDirs}
+		}(broker, ca.conf)
+	}
+
+	wg.Wait()
+	close(logDirsResults)
+	close(errChan)
+
+	allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
+	for logDirsResult := range logDirsResults {
+		allLogDirs[logDirsResult.id] = logDirsResult.logdirs
+	}
+
+	// Intentionally return only the first error for simplicity
+	err = <-errChan
+	return
+}
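+
+// A sketch; the broker IDs are assumptions (DescribeCluster can be used to
+// discover them):
+//
+//	logDirs, err := admin.DescribeLogDirs([]int32{0, 1, 2})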
+
+func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
+	req := &DescribeUserScramCredentialsRequest{}
+	for _, u := range users {
+		req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
+			Name: u,
+		})
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeUserScramCredentials(req)
+	if err != nil {
+		return nil, err
+	}
+
+	return rsp.Results, nil
+}
+
+func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
+	res, err := ca.AlterUserScramCredentials(upsert, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
+func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+	res, err := ca.AlterUserScramCredentials(nil, delete)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
+func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+	req := &AlterUserScramCredentialsRequest{
+		Deletions:  d,
+		Upsertions: u,
+	}
+
+	var rsp *AlterUserScramCredentialsResponse
+	err := ca.retryOnError(isRetriableControllerError, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		rsp, err = b.AlterUserScramCredentials(req)
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return rsp.Results, nil
+}
+
+// Describe All: use an empty/nil components slice + strict = false
+// Contains components: strict = false
+// Contains only components: strict = true
+func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
+	request := NewDescribeClientQuotasRequest(
+		ca.conf.Version,
+		components,
+		strict,
+	)
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeClientQuotas(request)
+	if err != nil {
+		return nil, err
+	}
+
+	if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
+		return nil, errors.New(*rsp.ErrorMsg)
+	}
+	if !errors.Is(rsp.ErrorCode, ErrNoError) {
+		return nil, rsp.ErrorCode
+	}
+
+	return rsp.Entries, nil
+}
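+
+// A sketch of the "Describe All" case from the comment above:
+//
+//	entries, err := admin.DescribeClientQuotas(nil, false)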
+
+func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
+	entry := AlterClientQuotasEntry{
+		Entity: entity,
+		Ops:    []ClientQuotasOp{op},
+	}
+
+	request := &AlterClientQuotasRequest{
+		Entries:      []AlterClientQuotasEntry{entry},
+		ValidateOnly: validateOnly,
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.AlterClientQuotas(request)
+	if err != nil {
+		return err
+	}
+
+	for _, entry := range rsp.Entries {
+		if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
+			return errors.New(*entry.ErrorMsg)
+		}
+		if !errors.Is(entry.ErrorCode, ErrNoError) {
+			return entry.ErrorCode
+		}
+	}
+
+	return nil
+}
+
+func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
+	if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
+		return nil, ConfigurationError("Removing members from a consumer group requires Kafka version of at least v2.4.0")
+	}
+	var response *LeaveGroupResponse
+	request := &LeaveGroupRequest{
+		Version: 3,
+		GroupId: group,
+	}
+	for _, instanceId := range groupInstanceIds {
+		groupInstanceId := instanceId
+		request.Members = append(request.Members, MemberIdentity{
+			GroupInstanceId: &groupInstanceId,
+		})
+	}
+	err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}
+
+		response, err = coordinator.LeaveGroup(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.Err, ErrNoError) {
+			return response.Err
+		}
+
+		return nil
+	})
+
+	return response, err
+}
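+
+// An illustrative static-membership eviction (group and instance IDs are
+// assumptions):
+//
+//	resp, err := admin.RemoveMemberFromConsumerGroup("example-group",
+//		[]string{"instance-1", "instance-2"})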