[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go
new file mode 100644
index 0000000..dda0201
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/admin.go
@@ -0,0 +1,1395 @@
+package sarama
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "maps"
+ "math/rand"
+ "strconv"
+ "sync"
+ "time"
+)
+
+// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
+// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
+// Methods with stricter requirements will specify the minimum broker version required.
+// You MUST call Close() on a client to avoid leaks
+type ClusterAdmin interface {
+ // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
+ // It may take several seconds after CreateTopic returns success for all the brokers
+ // to become aware that the topic has been created. During this time, listTopics
+ // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
+ CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
+
+ // List the topics available in the cluster with the default options.
+ ListTopics() (map[string]TopicDetail, error)
+
+ // Describe some topics in the cluster.
+ DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
+
+ // Delete a topic. It may take several seconds after the DeleteTopic to returns success
+ // and for all the brokers to become aware that the topics are gone.
+ // During this time, listTopics may continue to return information about the deleted topic.
+ // If delete.topic.enable is false on the brokers, deleteTopic will mark
+ // the topic for deletion, but not actually delete them.
+ // This operation is supported by brokers with version 0.10.1.0 or higher.
+ DeleteTopic(topic string) error
+
+ // Increase the number of partitions of the topics according to the corresponding values.
+ // If partitions are increased for a topic that has a key, the partition logic or ordering of
+ // the messages will be affected. It may take several seconds after this method returns
+ // success for all the brokers to become aware that the partitions have been created.
+ // During this time, ClusterAdmin#describeTopics may not return information about the
+ // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
+ CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
+
+ // Alter the replica assignment for partitions.
+ // This operation is supported by brokers with version 2.4.0.0 or higher.
+ AlterPartitionReassignments(topic string, assignment [][]int32) error
+
+ // Provides info on ongoing partitions replica reassignments.
+ // This operation is supported by brokers with version 2.4.0.0 or higher.
+ ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
+
+ // Delete records whose offset is smaller than the given offset of the corresponding partition.
+ // This operation is supported by brokers with version 0.11.0.0 or higher.
+ DeleteRecords(topic string, partitionOffsets map[int32]int64) error
+
+ // Get the configuration for the specified resources.
+ // The returned configuration includes default values and the Default is true
+ // can be used to distinguish them from user supplied values.
+ // Config entries where ReadOnly is true cannot be updated.
+ // The value of config entries where Sensitive is true is always nil so
+ // sensitive information is not disclosed.
+ // This operation is supported by brokers with version 0.11.0.0 or higher.
+ DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
+
+ // Update the configuration for the specified resources with the default options.
+ // This operation is supported by brokers with version 0.11.0.0 or higher.
+ // The resources with their configs (topic is the only resource type with configs
+ // that can be updated currently Updates are not transactional so they may succeed
+ // for some resources while fail for others. The configs for a particular resource are updated automatically.
+ AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
+
+ // IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
+ // This operation is supported by brokers with version 2.3.0.0 or higher.
+ // Updates are not transactional so they may succeed for some resources while fail for others.
+ // The configs for a particular resource are updated automatically.
+ IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
+
+ // Creates an access control list (ACL) which is bound to a specific resource.
+ // This operation is not transactional so it may succeed or fail.
+ // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+ // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
+ // Deprecated: Use CreateACLs instead.
+ CreateACL(resource Resource, acl Acl) error
+
+ // Creates access control lists (ACLs) which are bound to specific resources.
+ // This operation is not transactional so it may succeed for some ACLs while fail for others.
+ // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+ // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
+ CreateACLs([]*ResourceAcls) error
+
+ // Lists access control lists (ACLs) according to the supplied filter.
+ // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
+ // This operation is supported by brokers with version 0.11.0.0 or higher.
+ ListAcls(filter AclFilter) ([]ResourceAcls, error)
+
+ // Deletes access control lists (ACLs) according to the supplied filters.
+ // This operation is not transactional so it may succeed for some ACLs while fail for others.
+ // This operation is supported by brokers with version 0.11.0.0 or higher.
+ DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
+
+ // ElectLeaders allows to trigger the election of preferred leaders for a set of partitions.
+ ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)
+
+ // List the consumer groups available in the cluster.
+ ListConsumerGroups() (map[string]string, error)
+
+ // Describe the given consumer groups.
+ DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
+
+ // List the consumer group offsets available in the cluster.
+ ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+
+ // Deletes a consumer group offset
+ DeleteConsumerGroupOffset(group string, topic string, partition int32) error
+
+ // Delete a consumer group.
+ DeleteConsumerGroup(group string) error
+
+ // Get information about the nodes in the cluster
+ DescribeCluster() (brokers []*Broker, controllerID int32, err error)
+
+ // Get information about all log directories on the given set of brokers
+ DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
+
+ // Get information about SCRAM users
+ DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
+
+ // Delete SCRAM users
+ DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
+
+ // Upsert SCRAM users
+ UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
+
+ // Get client quota configurations corresponding to the specified filter.
+ // This operation is supported by brokers with version 2.6.0.0 or higher.
+ DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
+
+ // Alters client quota configurations with the specified alterations.
+ // This operation is supported by brokers with version 2.6.0.0 or higher.
+ AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
+
+ // Controller returns the cluster controller broker. It will return a
+ // locally cached value if it's available.
+ Controller() (*Broker, error)
+
+ // Coordinator returns the coordinating broker for a consumer group. It will
+ // return a locally cached value if it's available.
+ Coordinator(group string) (*Broker, error)
+
+ // Remove members from the consumer group by given member identities.
+ // This operation is supported by brokers with version 2.3 or higher
+ // This is for static membership feature. KIP-345
+ RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)
+
+ // Close shuts down the admin and closes underlying client.
+ Close() error
+}
+
+type clusterAdmin struct {
+ client Client
+ conf *Config
+}
+
+// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
+func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
+ client, err := NewClient(addrs, conf)
+ if err != nil {
+ return nil, err
+ }
+ admin, err := NewClusterAdminFromClient(client)
+ if err != nil {
+ client.Close()
+ }
+ return admin, err
+}
+
+// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
+// Note that underlying client will also be closed on admin's Close() call.
+func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
+ // make sure we can retrieve the controller
+ _, err := client.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ ca := &clusterAdmin{
+ client: client,
+ conf: client.Config(),
+ }
+ return ca, nil
+}
+
+func (ca *clusterAdmin) Close() error {
+ return ca.client.Close()
+}
+
+func (ca *clusterAdmin) Controller() (*Broker, error) {
+ return ca.client.Controller()
+}
+
+func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) {
+ return ca.client.Coordinator(group)
+}
+
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+ return ca.client.RefreshController()
+}
+
+// isRetriableControllerError returns `true` if the given error type unwraps to
+// an `ErrNotController` or `EOF` response from Kafka
+func isRetriableControllerError(err error) bool {
+ return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF)
+}
+
+// isRetriableGroupCoordinatorError returns `true` if the given error type
+// unwraps to an `ErrNotCoordinatorForConsumer`,
+// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka
+func isRetriableGroupCoordinatorError(err error) bool {
+ return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF)
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retryable (as determined by the
+// provided retryable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
+ for attemptsRemaining := ca.conf.Admin.Retry.Max + 1; ; {
+ err := fn()
+ attemptsRemaining--
+ if err == nil || attemptsRemaining <= 0 || !retryable(err) {
+ return err
+ }
+ Logger.Printf(
+ "admin/request retrying after %dms... (%d attempts remaining)\n",
+ ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
+ time.Sleep(ca.conf.Admin.Retry.Backoff)
+ }
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+
+ if detail == nil {
+ return errors.New("you must specify topic details")
+ }
+
+ topicDetails := map[string]*TopicDetail{
+ topic: detail,
+ }
+
+ request := NewCreateTopicsRequest(
+ ca.conf.Version,
+ topicDetails,
+ ca.conf.Admin.Timeout,
+ validateOnly,
+ )
+
+ return ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err := b.CreateTopics(request)
+ if err != nil {
+ return err
+ }
+
+ topicErr, ok := rsp.TopicErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if !errors.Is(topicErr.Err, ErrNoError) {
+ if isRetriableControllerError(topicErr.Err) {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
+ var response *MetadataResponse
+ err = ca.retryOnError(isRetriableControllerError, func() error {
+ controller, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+ request := NewMetadataRequest(ca.conf.Version, topics)
+ response, err = controller.GetMetadata(request)
+ if isRetriableControllerError(err) {
+ _, _ = ca.refreshController()
+ }
+ return err
+ })
+ if err != nil {
+ return nil, err
+ }
+ return response.Topics, nil
+}
+
+func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
+ var response *MetadataResponse
+ err = ca.retryOnError(isRetriableControllerError, func() error {
+ controller, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ request := NewMetadataRequest(ca.conf.Version, nil)
+ response, err = controller.GetMetadata(request)
+ if isRetriableControllerError(err) {
+ _, _ = ca.refreshController()
+ }
+ return err
+ })
+ if err != nil {
+ return nil, int32(0), err
+ }
+
+ return response.Brokers, response.ControllerID, nil
+}
+
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+ brokers := ca.client.Brokers()
+ for _, b := range brokers {
+ if b.ID() == id {
+ return b, nil
+ }
+ }
+ return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
+func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
+ brokers := ca.client.Brokers()
+ if len(brokers) > 0 {
+ index := rand.Intn(len(brokers))
+ return brokers[index], nil
+ }
+ return nil, errors.New("no available broker")
+}
+
+func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
+ // In order to build TopicDetails we need to first get the list of all
+ // topics using a MetadataRequest and then get their configs using a
+ // DescribeConfigsRequest request. To avoid sending many requests to the
+ // broker, we use a single DescribeConfigsRequest.
+
+ // Send the all-topic MetadataRequest
+ b, err := ca.findAnyBroker()
+ if err != nil {
+ return nil, err
+ }
+ _ = b.Open(ca.client.Config())
+
+ metadataReq := NewMetadataRequest(ca.conf.Version, nil)
+ metadataResp, err := b.GetMetadata(metadataReq)
+ if err != nil {
+ return nil, err
+ }
+
+ topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics))
+
+ var describeConfigsResources []*ConfigResource
+
+ for _, topic := range metadataResp.Topics {
+ topicDetails := TopicDetail{
+ NumPartitions: int32(len(topic.Partitions)),
+ }
+ if len(topic.Partitions) > 0 {
+ topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions))
+ for _, partition := range topic.Partitions {
+ topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
+ }
+ topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
+ }
+ topicsDetailsMap[topic.Name] = topicDetails
+
+ // we populate the resources we want to describe from the MetadataResponse
+ topicResource := ConfigResource{
+ Type: TopicResource,
+ Name: topic.Name,
+ }
+ describeConfigsResources = append(describeConfigsResources, &topicResource)
+ }
+
+ // Send the DescribeConfigsRequest
+ describeConfigsReq := &DescribeConfigsRequest{
+ Resources: describeConfigsResources,
+ }
+
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ describeConfigsReq.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ describeConfigsReq.Version = 2
+ }
+
+ describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, resource := range describeConfigsResp.Resources {
+ topicDetails := topicsDetailsMap[resource.Name]
+ topicDetails.ConfigEntries = make(map[string]*string)
+
+ for _, entry := range resource.Configs {
+ // only include non-default non-sensitive config
+ // (don't actually think topic config will ever be sensitive)
+ if entry.Default || entry.Sensitive {
+ continue
+ }
+ topicDetails.ConfigEntries[entry.Name] = &entry.Value
+ }
+
+ topicsDetailsMap[resource.Name] = topicDetails
+ }
+
+ return topicsDetailsMap, nil
+}
+
+func (ca *clusterAdmin) DeleteTopic(topic string) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+
+ request := &DeleteTopicsRequest{
+ Topics: []string{topic},
+ Timeout: ca.conf.Admin.Timeout,
+ }
+
+ // Versions 0, 1, 2, and 3 are the same.
+ // Version 4 is first flexible version.
+ if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ request.Version = 4
+ } else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
+ request.Version = 3
+ } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ request.Version = 1
+ }
+
+ return ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err := b.DeleteTopics(request)
+ if err != nil {
+ return err
+ }
+
+ topicErr, ok := rsp.TopicErrorCodes[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if !errors.Is(topicErr, ErrNoError) {
+ if errors.Is(topicErr, ErrNotController) {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+
+ topicPartitions := map[string]*TopicPartition{
+ topic: {
+ Count: count,
+ Assignment: assignment,
+ },
+ }
+
+ request := &CreatePartitionsRequest{
+ TopicPartitions: topicPartitions,
+ Timeout: ca.conf.Admin.Timeout,
+ ValidateOnly: validateOnly,
+ }
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ return ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err := b.CreatePartitions(request)
+ if err != nil {
+ return err
+ }
+
+ topicErr, ok := rsp.TopicPartitionErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if !errors.Is(topicErr.Err, ErrNoError) {
+ if errors.Is(topicErr.Err, ErrNotController) {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+
+ request := &AlterPartitionReassignmentsRequest{
+ TimeoutMs: int32(60000),
+ Version: int16(0),
+ }
+
+ for i := 0; i < len(assignment); i++ {
+ request.AddBlock(topic, int32(i), assignment[i])
+ }
+
+ return ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ errs := make([]error, 0)
+
+ rsp, err := b.AlterPartitionReassignments(request)
+
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ if rsp.ErrorCode > 0 {
+ errs = append(errs, rsp.ErrorCode)
+ }
+
+ for topic, topicErrors := range rsp.Errors {
+ for partition, partitionError := range topicErrors {
+ if !errors.Is(partitionError.errorCode, ErrNoError) {
+ errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
+ }
+ }
+ }
+ }
+
+ if len(errs) > 0 {
+ return Wrap(ErrReassignPartitions, errs...)
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
+ if topic == "" {
+ return nil, ErrInvalidTopic
+ }
+
+ request := &ListPartitionReassignmentsRequest{
+ TimeoutMs: int32(60000),
+ Version: int16(0),
+ }
+
+ request.AddBlock(topic, partitions)
+
+ var rsp *ListPartitionReassignmentsResponse
+ err = ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+ _ = b.Open(ca.client.Config())
+
+ rsp, err = b.ListPartitionReassignments(request)
+ if isRetriableControllerError(err) {
+ _, _ = ca.refreshController()
+ }
+ return err
+ })
+
+ if err == nil && rsp != nil {
+ return rsp.TopicStatus, nil
+ } else {
+ return nil, err
+ }
+}
+
+func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+ errs := make([]error, 0)
+ partitionPerBroker := make(map[*Broker][]int32)
+ for partition := range partitionOffsets {
+ broker, err := ca.client.Leader(topic, partition)
+ if err != nil {
+ errs = append(errs, err)
+ continue
+ }
+ partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+ }
+ for broker, partitions := range partitionPerBroker {
+ recordsToDelete := make(map[int32]int64, len(partitions))
+ for _, p := range partitions {
+ recordsToDelete[p] = partitionOffsets[p]
+ }
+ topics := map[string]*DeleteRecordsRequestTopic{
+ topic: {
+ PartitionOffsets: recordsToDelete,
+ },
+ }
+ request := &DeleteRecordsRequest{
+ Topics: topics,
+ Timeout: ca.conf.Admin.Timeout,
+ }
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+ rsp, err := broker.DeleteRecords(request)
+ if err != nil {
+ errs = append(errs, err)
+ continue
+ }
+
+ deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+ if !ok {
+ errs = append(errs, ErrIncompleteResponse)
+ continue
+ }
+
+ for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+ if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
+ errs = append(errs, deleteRecordsResponsePartition.Err)
+ continue
+ }
+ }
+ }
+ if len(errs) > 0 {
+ return Wrap(ErrDeleteRecords, errs...)
+ }
+ // todo since we are dealing with couple of partitions it would be good if we return slice of errors
+ // for each partition instead of one error
+ return nil
+}
+
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+ return (resource.Type == BrokerResource && resource.Name != "") ||
+ resource.Type == BrokerLoggerResource
+}
+
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+ var entries []ConfigEntry
+ var resources []*ConfigResource
+ resources = append(resources, &resource)
+
+ request := &DescribeConfigsRequest{
+ Resources: resources,
+ }
+
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ request.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 2
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // DescribeConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(resource) {
+ var id int64
+ id, err = strconv.ParseInt(resource.Name, 10, 32)
+ if err != nil {
+ return nil, err
+ }
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ _ = b.Open(ca.client.Config())
+ rsp, err := b.DescribeConfigs(request)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, rspResource := range rsp.Resources {
+ if rspResource.Name == resource.Name {
+ if rspResource.ErrorCode != 0 {
+ return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
+ }
+ for _, cfgEntry := range rspResource.Configs {
+ entries = append(entries, *cfgEntry)
+ }
+ }
+ }
+ return entries, nil
+}
+
+func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
+ var resources []*AlterConfigsResource
+ resources = append(resources, &AlterConfigsResource{
+ Type: resourceType,
+ Name: name,
+ ConfigEntries: entries,
+ })
+
+ request := &AlterConfigsRequest{
+ Resources: resources,
+ ValidateOnly: validateOnly,
+ }
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // AlterConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+ var id int64
+ id, err = strconv.ParseInt(name, 10, 32)
+ if err != nil {
+ return err
+ }
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
+ if err != nil {
+ return err
+ }
+
+ _ = b.Open(ca.client.Config())
+ rsp, err := b.AlterConfigs(request)
+ if err != nil {
+ return err
+ }
+
+ for _, rspResource := range rsp.Resources {
+ if rspResource.Name == name {
+ if rspResource.ErrorCode != 0 {
+ return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
+ }
+ }
+ }
+ return nil
+}
+
+func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
+ var resources []*IncrementalAlterConfigsResource
+ resources = append(resources, &IncrementalAlterConfigsResource{
+ Type: resourceType,
+ Name: name,
+ ConfigEntries: entries,
+ })
+
+ request := &IncrementalAlterConfigsRequest{
+ Resources: resources,
+ ValidateOnly: validateOnly,
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ request.Version = 1
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // AlterConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+ var id int64
+ id, err = strconv.ParseInt(name, 10, 32)
+ if err != nil {
+ return err
+ }
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
+ if err != nil {
+ return err
+ }
+
+ _ = b.Open(ca.client.Config())
+ rsp, err := b.IncrementalAlterConfigs(request)
+ if err != nil {
+ return err
+ }
+
+ for _, rspResource := range rsp.Resources {
+ if rspResource.Name == name {
+ if rspResource.ErrorMsg != "" {
+ return errors.New(rspResource.ErrorMsg)
+ }
+ if rspResource.ErrorCode != 0 {
+ return KError(rspResource.ErrorCode)
+ }
+ }
+ }
+ return nil
+}
+
+func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
+ var acls []*AclCreation
+ acls = append(acls, &AclCreation{resource, acl})
+ request := &CreateAclsRequest{AclCreations: acls}
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ _, err = b.CreateAcls(request)
+ return err
+}
+
+func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
+ var acls []*AclCreation
+ for _, resourceACL := range resourceACLs {
+ for _, acl := range resourceACL.Acls {
+ acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
+ }
+ }
+ request := &CreateAclsRequest{AclCreations: acls}
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ _, err = b.CreateAcls(request)
+ return err
+}
+
+func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
+ request := &DescribeAclsRequest{AclFilter: filter}
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.DescribeAcls(request)
+ if err != nil {
+ return nil, err
+ }
+
+ var lAcls []ResourceAcls
+ for _, rAcl := range rsp.ResourceAcls {
+ lAcls = append(lAcls, *rAcl)
+ }
+ return lAcls, nil
+}
+
+func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
+ var filters []*AclFilter
+ filters = append(filters, &filter)
+ request := &DeleteAclsRequest{Filters: filters}
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.DeleteAcls(request)
+ if err != nil {
+ return nil, err
+ }
+
+ var mAcls []MatchingAcl
+ for _, fr := range rsp.FilterResponses {
+ for _, mACL := range fr.MatchingAcls {
+ mAcls = append(mAcls, *mACL)
+ }
+ }
+ return mAcls, nil
+}
+
+func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
+ request := &ElectLeadersRequest{
+ Type: electionType,
+ TopicPartitions: partitions,
+ TimeoutMs: int32(60000),
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ request.Version = 1
+ }
+
+ var res *ElectLeadersResponse
+ if err := ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+ _ = b.Open(ca.client.Config())
+
+ res, err = b.ElectLeaders(request)
+ if err != nil {
+ return err
+ }
+ if !errors.Is(res.ErrorCode, ErrNoError) {
+ if isRetriableControllerError(res.ErrorCode) {
+ _, _ = ca.refreshController()
+ }
+ return res.ErrorCode
+ }
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+ return res.ReplicaElectionResults, nil
+}
+
+func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
+ groupsPerBroker := make(map[*Broker][]string)
+
+ for _, group := range groups {
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return nil, err
+ }
+ groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
+ }
+
+ for broker, brokerGroups := range groupsPerBroker {
+ describeReq := &DescribeGroupsRequest{
+ Groups: brokerGroups,
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ // Starting in version 4, the response will include group.instance.id info for members.
+ // Starting in version 5, the response uses flexible encoding
+ describeReq.Version = 5
+ } else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
+ // Starting in version 3, authorized operations can be requested.
+ describeReq.Version = 3
+ } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ // Version 2 is the same as version 0.
+ describeReq.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ // Version 1 is the same as version 0.
+ describeReq.Version = 1
+ }
+ response, err := broker.DescribeGroups(describeReq)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, response.Groups...)
+ }
+ return result, nil
+}
+
+func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
+ allGroups = make(map[string]string)
+
+ // Query brokers in parallel, since we have to query *all* brokers
+ brokers := ca.client.Brokers()
+ groupMaps := make(chan map[string]string, len(brokers))
+ errChan := make(chan error, len(brokers))
+ wg := sync.WaitGroup{}
+
+ for _, b := range brokers {
+ wg.Add(1)
+ go func(b *Broker, conf *Config) {
+ defer wg.Done()
+ _ = b.Open(conf) // Ensure that broker is opened
+
+ request := &ListGroupsRequest{}
+ if ca.conf.Version.IsAtLeast(V3_8_0_0) {
+ // Version 5 adds the TypesFilter field (KIP-848).
+ request.Version = 5
+ } else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
+ // Version 4 adds the StatesFilter field (KIP-518).
+ request.Version = 4
+ } else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ // Version 3 is the first flexible version.
+ request.Version = 3
+ } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ // Version 2 is the same as version 0.
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ // Version 1 is the same as version 0.
+ request.Version = 1
+ }
+
+ response, err := b.ListGroups(request)
+ if err != nil {
+ errChan <- err
+ return
+ }
+
+ groupMaps <- maps.Clone(response.Groups)
+ }(b, ca.conf)
+ }
+
+ wg.Wait()
+ close(groupMaps)
+ close(errChan)
+
+ for groupMap := range groupMaps {
+ maps.Copy(allGroups, groupMap)
+ }
+
+ // Intentionally return only the first error for simplicity
+ err = <-errChan
+ return
+}
+
+func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
+ var response *OffsetFetchResponse
+ request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
+ err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+ defer func() {
+ if err != nil && isRetriableGroupCoordinatorError(err) {
+ _ = ca.client.RefreshCoordinator(group)
+ }
+ }()
+
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return err
+ }
+
+ response, err = coordinator.FetchOffset(request)
+ if err != nil {
+ return err
+ }
+ if !errors.Is(response.Err, ErrNoError) {
+ return response.Err
+ }
+
+ return nil
+ })
+
+ return response, err
+}
+
+func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
+ var response *DeleteOffsetsResponse
+ request := &DeleteOffsetsRequest{
+ Group: group,
+ partitions: map[string][]int32{
+ topic: {partition},
+ },
+ }
+
+ return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+ defer func() {
+ if err != nil && isRetriableGroupCoordinatorError(err) {
+ _ = ca.client.RefreshCoordinator(group)
+ }
+ }()
+
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return err
+ }
+
+ response, err = coordinator.DeleteOffsets(request)
+ if err != nil {
+ return err
+ }
+ if !errors.Is(response.ErrorCode, ErrNoError) {
+ return response.ErrorCode
+ }
+ if !errors.Is(response.Errors[topic][partition], ErrNoError) {
+ return response.Errors[topic][partition]
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
+ var response *DeleteGroupsResponse
+ request := &DeleteGroupsRequest{
+ Groups: []string{group},
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
+ return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+ defer func() {
+ if err != nil && isRetriableGroupCoordinatorError(err) {
+ _ = ca.client.RefreshCoordinator(group)
+ }
+ }()
+
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return err
+ }
+
+ response, err = coordinator.DeleteGroups(request)
+ if err != nil {
+ return err
+ }
+
+ groupErr, ok := response.GroupErrorCodes[group]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if !errors.Is(groupErr, ErrNoError) {
+ return groupErr
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
+ type result struct {
+ id int32
+ logdirs []DescribeLogDirsResponseDirMetadata
+ }
+ // Query brokers in parallel, since we may have to query multiple brokers
+ logDirsResults := make(chan result, len(brokerIds))
+ errChan := make(chan error, len(brokerIds))
+ wg := sync.WaitGroup{}
+
+ for _, b := range brokerIds {
+ broker, err := ca.findBroker(b)
+ if err != nil {
+ Logger.Printf("Unable to find broker with ID = %v\n", b)
+ continue
+ }
+ wg.Add(1)
+ go func(b *Broker, conf *Config) {
+ defer wg.Done()
+ _ = b.Open(conf) // Ensure that broker is opened
+
+ request := &DescribeLogDirsRequest{}
+ if ca.conf.Version.IsAtLeast(V3_3_0_0) {
+ request.Version = 4
+ } else if ca.conf.Version.IsAtLeast(V3_2_0_0) {
+ request.Version = 3
+ } else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+ response, err := b.DescribeLogDirs(request)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ if !errors.Is(response.ErrorCode, ErrNoError) {
+ errChan <- response.ErrorCode
+ return
+ }
+ logDirsResults <- result{id: b.ID(), logdirs: response.LogDirs}
+ }(broker, ca.conf)
+ }
+
+ wg.Wait()
+ close(logDirsResults)
+ close(errChan)
+
+ allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
+ for logDirsResult := range logDirsResults {
+ allLogDirs[logDirsResult.id] = logDirsResult.logdirs
+ }
+
+ // Intentionally return only the first error for simplicity
+ err = <-errChan
+ return
+}
+
+func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
+ req := &DescribeUserScramCredentialsRequest{}
+ for _, u := range users {
+ req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
+ Name: u,
+ })
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.DescribeUserScramCredentials(req)
+ if err != nil {
+ return nil, err
+ }
+
+ return rsp.Results, nil
+}
+
+func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
+ res, err := ca.AlterUserScramCredentials(upsert, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+ res, err := ca.AlterUserScramCredentials(nil, delete)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+ req := &AlterUserScramCredentialsRequest{
+ Deletions: d,
+ Upsertions: u,
+ }
+
+ var rsp *AlterUserScramCredentialsResponse
+ err := ca.retryOnError(isRetriableControllerError, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err = b.AlterUserScramCredentials(req)
+ return err
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return rsp.Results, nil
+}
+
+// Describe All : use an empty/nil components slice + strict = false
+// Contains components: strict = false
+// Contains only components: strict = true
+func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
+ request := NewDescribeClientQuotasRequest(
+ ca.conf.Version,
+ components,
+ strict,
+ )
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.DescribeClientQuotas(request)
+ if err != nil {
+ return nil, err
+ }
+
+ if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
+ return nil, errors.New(*rsp.ErrorMsg)
+ }
+ if !errors.Is(rsp.ErrorCode, ErrNoError) {
+ return nil, rsp.ErrorCode
+ }
+
+ return rsp.Entries, nil
+}
+
+func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
+ entry := AlterClientQuotasEntry{
+ Entity: entity,
+ Ops: []ClientQuotasOp{op},
+ }
+
+ request := &AlterClientQuotasRequest{
+ Entries: []AlterClientQuotasEntry{entry},
+ ValidateOnly: validateOnly,
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err := b.AlterClientQuotas(request)
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range rsp.Entries {
+ if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
+ return errors.New(*entry.ErrorMsg)
+ }
+ if !errors.Is(entry.ErrorCode, ErrNoError) {
+ return entry.ErrorCode
+ }
+ }
+
+ return nil
+}
+
+func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
+ if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
+ return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
+ }
+ var response *LeaveGroupResponse
+ request := &LeaveGroupRequest{
+ Version: 3,
+ GroupId: group,
+ }
+ for _, instanceId := range groupInstanceIds {
+ groupInstanceId := instanceId
+ request.Members = append(request.Members, MemberIdentity{
+ GroupInstanceId: &groupInstanceId,
+ })
+ }
+ err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+ defer func() {
+ if err != nil && isRetriableGroupCoordinatorError(err) {
+ _ = ca.client.RefreshCoordinator(group)
+ }
+ }()
+
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return err
+ }
+
+ response, err = coordinator.LeaveGroup(request)
+ if err != nil {
+ return err
+ }
+ if !errors.Is(response.Err, ErrNoError) {
+ return response.Err
+ }
+
+ return nil
+ })
+
+ return response, err
+}