blob: dda0201f6059d2ed401adf59344bb44d892eb312 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "errors"
5 "fmt"
6 "io"
7 "maps"
8 "math/rand"
9 "strconv"
10 "sync"
11 "time"
12)
13
14// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
15// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
16// Methods with stricter requirements will specify the minimum broker version required.
17// You MUST call Close() on a client to avoid leaks
18type ClusterAdmin interface {
19 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
20 // It may take several seconds after CreateTopic returns success for all the brokers
21 // to become aware that the topic has been created. During this time, listTopics
22 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
23 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
24
25 // List the topics available in the cluster with the default options.
26 ListTopics() (map[string]TopicDetail, error)
27
28 // Describe some topics in the cluster.
29 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
30
31 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
32 // and for all the brokers to become aware that the topics are gone.
33 // During this time, listTopics may continue to return information about the deleted topic.
34 // If delete.topic.enable is false on the brokers, deleteTopic will mark
35 // the topic for deletion, but not actually delete them.
36 // This operation is supported by brokers with version 0.10.1.0 or higher.
37 DeleteTopic(topic string) error
38
39 // Increase the number of partitions of the topics according to the corresponding values.
40 // If partitions are increased for a topic that has a key, the partition logic or ordering of
41 // the messages will be affected. It may take several seconds after this method returns
42 // success for all the brokers to become aware that the partitions have been created.
43 // During this time, ClusterAdmin#describeTopics may not return information about the
44 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
45 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
46
47 // Alter the replica assignment for partitions.
48 // This operation is supported by brokers with version 2.4.0.0 or higher.
49 AlterPartitionReassignments(topic string, assignment [][]int32) error
50
51 // Provides info on ongoing partitions replica reassignments.
52 // This operation is supported by brokers with version 2.4.0.0 or higher.
53 ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
54
55 // Delete records whose offset is smaller than the given offset of the corresponding partition.
56 // This operation is supported by brokers with version 0.11.0.0 or higher.
57 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
58
59 // Get the configuration for the specified resources.
60 // The returned configuration includes default values and the Default is true
61 // can be used to distinguish them from user supplied values.
62 // Config entries where ReadOnly is true cannot be updated.
63 // The value of config entries where Sensitive is true is always nil so
64 // sensitive information is not disclosed.
65 // This operation is supported by brokers with version 0.11.0.0 or higher.
66 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
67
68 // Update the configuration for the specified resources with the default options.
69 // This operation is supported by brokers with version 0.11.0.0 or higher.
70 // The resources with their configs (topic is the only resource type with configs
71 // that can be updated currently Updates are not transactional so they may succeed
72 // for some resources while fail for others. The configs for a particular resource are updated automatically.
73 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
74
75 // IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
76 // This operation is supported by brokers with version 2.3.0.0 or higher.
77 // Updates are not transactional so they may succeed for some resources while fail for others.
78 // The configs for a particular resource are updated automatically.
79 IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
80
81 // Creates an access control list (ACL) which is bound to a specific resource.
82 // This operation is not transactional so it may succeed or fail.
83 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
84 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
85 // Deprecated: Use CreateACLs instead.
86 CreateACL(resource Resource, acl Acl) error
87
88 // Creates access control lists (ACLs) which are bound to specific resources.
89 // This operation is not transactional so it may succeed for some ACLs while fail for others.
90 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
91 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
92 CreateACLs([]*ResourceAcls) error
93
94 // Lists access control lists (ACLs) according to the supplied filter.
95 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
96 // This operation is supported by brokers with version 0.11.0.0 or higher.
97 ListAcls(filter AclFilter) ([]ResourceAcls, error)
98
99 // Deletes access control lists (ACLs) according to the supplied filters.
100 // This operation is not transactional so it may succeed for some ACLs while fail for others.
101 // This operation is supported by brokers with version 0.11.0.0 or higher.
102 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
103
104 // ElectLeaders allows to trigger the election of preferred leaders for a set of partitions.
105 ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)
106
107 // List the consumer groups available in the cluster.
108 ListConsumerGroups() (map[string]string, error)
109
110 // Describe the given consumer groups.
111 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
112
113 // List the consumer group offsets available in the cluster.
114 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
115
116 // Deletes a consumer group offset
117 DeleteConsumerGroupOffset(group string, topic string, partition int32) error
118
119 // Delete a consumer group.
120 DeleteConsumerGroup(group string) error
121
122 // Get information about the nodes in the cluster
123 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
124
125 // Get information about all log directories on the given set of brokers
126 DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
127
128 // Get information about SCRAM users
129 DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
130
131 // Delete SCRAM users
132 DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
133
134 // Upsert SCRAM users
135 UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
136
137 // Get client quota configurations corresponding to the specified filter.
138 // This operation is supported by brokers with version 2.6.0.0 or higher.
139 DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
140
141 // Alters client quota configurations with the specified alterations.
142 // This operation is supported by brokers with version 2.6.0.0 or higher.
143 AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
144
145 // Controller returns the cluster controller broker. It will return a
146 // locally cached value if it's available.
147 Controller() (*Broker, error)
148
149 // Coordinator returns the coordinating broker for a consumer group. It will
150 // return a locally cached value if it's available.
151 Coordinator(group string) (*Broker, error)
152
153 // Remove members from the consumer group by given member identities.
154 // This operation is supported by brokers with version 2.3 or higher
155 // This is for static membership feature. KIP-345
156 RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)
157
158 // Close shuts down the admin and closes underlying client.
159 Close() error
160}
161
162type clusterAdmin struct {
163 client Client
164 conf *Config
165}
166
167// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
168func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
169 client, err := NewClient(addrs, conf)
170 if err != nil {
171 return nil, err
172 }
173 admin, err := NewClusterAdminFromClient(client)
174 if err != nil {
175 client.Close()
176 }
177 return admin, err
178}
179
180// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
181// Note that underlying client will also be closed on admin's Close() call.
182func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
183 // make sure we can retrieve the controller
184 _, err := client.Controller()
185 if err != nil {
186 return nil, err
187 }
188
189 ca := &clusterAdmin{
190 client: client,
191 conf: client.Config(),
192 }
193 return ca, nil
194}
195
196func (ca *clusterAdmin) Close() error {
197 return ca.client.Close()
198}
199
200func (ca *clusterAdmin) Controller() (*Broker, error) {
201 return ca.client.Controller()
202}
203
204func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) {
205 return ca.client.Coordinator(group)
206}
207
208func (ca *clusterAdmin) refreshController() (*Broker, error) {
209 return ca.client.RefreshController()
210}
211
212// isRetriableControllerError returns `true` if the given error type unwraps to
213// an `ErrNotController` or `EOF` response from Kafka
214func isRetriableControllerError(err error) bool {
215 return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF)
216}
217
218// isRetriableGroupCoordinatorError returns `true` if the given error type
219// unwraps to an `ErrNotCoordinatorForConsumer`,
220// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka
221func isRetriableGroupCoordinatorError(err error) bool {
222 return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF)
223}
224
225// retryOnError will repeatedly call the given (error-returning) func in the
226// case that its response is non-nil and retryable (as determined by the
227// provided retryable func) up to the maximum number of tries permitted by
228// the admin client configuration
229func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
230 for attemptsRemaining := ca.conf.Admin.Retry.Max + 1; ; {
231 err := fn()
232 attemptsRemaining--
233 if err == nil || attemptsRemaining <= 0 || !retryable(err) {
234 return err
235 }
236 Logger.Printf(
237 "admin/request retrying after %dms... (%d attempts remaining)\n",
238 ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
239 time.Sleep(ca.conf.Admin.Retry.Backoff)
240 }
241}
242
243func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
244 if topic == "" {
245 return ErrInvalidTopic
246 }
247
248 if detail == nil {
249 return errors.New("you must specify topic details")
250 }
251
252 topicDetails := map[string]*TopicDetail{
253 topic: detail,
254 }
255
256 request := NewCreateTopicsRequest(
257 ca.conf.Version,
258 topicDetails,
259 ca.conf.Admin.Timeout,
260 validateOnly,
261 )
262
263 return ca.retryOnError(isRetriableControllerError, func() error {
264 b, err := ca.Controller()
265 if err != nil {
266 return err
267 }
268
269 rsp, err := b.CreateTopics(request)
270 if err != nil {
271 return err
272 }
273
274 topicErr, ok := rsp.TopicErrors[topic]
275 if !ok {
276 return ErrIncompleteResponse
277 }
278
279 if !errors.Is(topicErr.Err, ErrNoError) {
280 if isRetriableControllerError(topicErr.Err) {
281 _, _ = ca.refreshController()
282 }
283 return topicErr
284 }
285
286 return nil
287 })
288}
289
290func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
291 var response *MetadataResponse
292 err = ca.retryOnError(isRetriableControllerError, func() error {
293 controller, err := ca.Controller()
294 if err != nil {
295 return err
296 }
297 request := NewMetadataRequest(ca.conf.Version, topics)
298 response, err = controller.GetMetadata(request)
299 if isRetriableControllerError(err) {
300 _, _ = ca.refreshController()
301 }
302 return err
303 })
304 if err != nil {
305 return nil, err
306 }
307 return response.Topics, nil
308}
309
310func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
311 var response *MetadataResponse
312 err = ca.retryOnError(isRetriableControllerError, func() error {
313 controller, err := ca.Controller()
314 if err != nil {
315 return err
316 }
317
318 request := NewMetadataRequest(ca.conf.Version, nil)
319 response, err = controller.GetMetadata(request)
320 if isRetriableControllerError(err) {
321 _, _ = ca.refreshController()
322 }
323 return err
324 })
325 if err != nil {
326 return nil, int32(0), err
327 }
328
329 return response.Brokers, response.ControllerID, nil
330}
331
332func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
333 brokers := ca.client.Brokers()
334 for _, b := range brokers {
335 if b.ID() == id {
336 return b, nil
337 }
338 }
339 return nil, fmt.Errorf("could not find broker id %d", id)
340}
341
342func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
343 brokers := ca.client.Brokers()
344 if len(brokers) > 0 {
345 index := rand.Intn(len(brokers))
346 return brokers[index], nil
347 }
348 return nil, errors.New("no available broker")
349}
350
351func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
352 // In order to build TopicDetails we need to first get the list of all
353 // topics using a MetadataRequest and then get their configs using a
354 // DescribeConfigsRequest request. To avoid sending many requests to the
355 // broker, we use a single DescribeConfigsRequest.
356
357 // Send the all-topic MetadataRequest
358 b, err := ca.findAnyBroker()
359 if err != nil {
360 return nil, err
361 }
362 _ = b.Open(ca.client.Config())
363
364 metadataReq := NewMetadataRequest(ca.conf.Version, nil)
365 metadataResp, err := b.GetMetadata(metadataReq)
366 if err != nil {
367 return nil, err
368 }
369
370 topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics))
371
372 var describeConfigsResources []*ConfigResource
373
374 for _, topic := range metadataResp.Topics {
375 topicDetails := TopicDetail{
376 NumPartitions: int32(len(topic.Partitions)),
377 }
378 if len(topic.Partitions) > 0 {
379 topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions))
380 for _, partition := range topic.Partitions {
381 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
382 }
383 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
384 }
385 topicsDetailsMap[topic.Name] = topicDetails
386
387 // we populate the resources we want to describe from the MetadataResponse
388 topicResource := ConfigResource{
389 Type: TopicResource,
390 Name: topic.Name,
391 }
392 describeConfigsResources = append(describeConfigsResources, &topicResource)
393 }
394
395 // Send the DescribeConfigsRequest
396 describeConfigsReq := &DescribeConfigsRequest{
397 Resources: describeConfigsResources,
398 }
399
400 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
401 describeConfigsReq.Version = 1
402 }
403
404 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
405 describeConfigsReq.Version = 2
406 }
407
408 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
409 if err != nil {
410 return nil, err
411 }
412
413 for _, resource := range describeConfigsResp.Resources {
414 topicDetails := topicsDetailsMap[resource.Name]
415 topicDetails.ConfigEntries = make(map[string]*string)
416
417 for _, entry := range resource.Configs {
418 // only include non-default non-sensitive config
419 // (don't actually think topic config will ever be sensitive)
420 if entry.Default || entry.Sensitive {
421 continue
422 }
423 topicDetails.ConfigEntries[entry.Name] = &entry.Value
424 }
425
426 topicsDetailsMap[resource.Name] = topicDetails
427 }
428
429 return topicsDetailsMap, nil
430}
431
432func (ca *clusterAdmin) DeleteTopic(topic string) error {
433 if topic == "" {
434 return ErrInvalidTopic
435 }
436
437 request := &DeleteTopicsRequest{
438 Topics: []string{topic},
439 Timeout: ca.conf.Admin.Timeout,
440 }
441
442 // Versions 0, 1, 2, and 3 are the same.
443 // Version 4 is first flexible version.
444 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
445 request.Version = 4
446 } else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
447 request.Version = 3
448 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
449 request.Version = 2
450 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
451 request.Version = 1
452 }
453
454 return ca.retryOnError(isRetriableControllerError, func() error {
455 b, err := ca.Controller()
456 if err != nil {
457 return err
458 }
459
460 rsp, err := b.DeleteTopics(request)
461 if err != nil {
462 return err
463 }
464
465 topicErr, ok := rsp.TopicErrorCodes[topic]
466 if !ok {
467 return ErrIncompleteResponse
468 }
469
470 if !errors.Is(topicErr, ErrNoError) {
471 if errors.Is(topicErr, ErrNotController) {
472 _, _ = ca.refreshController()
473 }
474 return topicErr
475 }
476
477 return nil
478 })
479}
480
481func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
482 if topic == "" {
483 return ErrInvalidTopic
484 }
485
486 topicPartitions := map[string]*TopicPartition{
487 topic: {
488 Count: count,
489 Assignment: assignment,
490 },
491 }
492
493 request := &CreatePartitionsRequest{
494 TopicPartitions: topicPartitions,
495 Timeout: ca.conf.Admin.Timeout,
496 ValidateOnly: validateOnly,
497 }
498 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
499 request.Version = 1
500 }
501
502 return ca.retryOnError(isRetriableControllerError, func() error {
503 b, err := ca.Controller()
504 if err != nil {
505 return err
506 }
507
508 rsp, err := b.CreatePartitions(request)
509 if err != nil {
510 return err
511 }
512
513 topicErr, ok := rsp.TopicPartitionErrors[topic]
514 if !ok {
515 return ErrIncompleteResponse
516 }
517
518 if !errors.Is(topicErr.Err, ErrNoError) {
519 if errors.Is(topicErr.Err, ErrNotController) {
520 _, _ = ca.refreshController()
521 }
522 return topicErr
523 }
524
525 return nil
526 })
527}
528
529func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
530 if topic == "" {
531 return ErrInvalidTopic
532 }
533
534 request := &AlterPartitionReassignmentsRequest{
535 TimeoutMs: int32(60000),
536 Version: int16(0),
537 }
538
539 for i := 0; i < len(assignment); i++ {
540 request.AddBlock(topic, int32(i), assignment[i])
541 }
542
543 return ca.retryOnError(isRetriableControllerError, func() error {
544 b, err := ca.Controller()
545 if err != nil {
546 return err
547 }
548
549 errs := make([]error, 0)
550
551 rsp, err := b.AlterPartitionReassignments(request)
552
553 if err != nil {
554 errs = append(errs, err)
555 } else {
556 if rsp.ErrorCode > 0 {
557 errs = append(errs, rsp.ErrorCode)
558 }
559
560 for topic, topicErrors := range rsp.Errors {
561 for partition, partitionError := range topicErrors {
562 if !errors.Is(partitionError.errorCode, ErrNoError) {
563 errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
564 }
565 }
566 }
567 }
568
569 if len(errs) > 0 {
570 return Wrap(ErrReassignPartitions, errs...)
571 }
572
573 return nil
574 })
575}
576
577func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
578 if topic == "" {
579 return nil, ErrInvalidTopic
580 }
581
582 request := &ListPartitionReassignmentsRequest{
583 TimeoutMs: int32(60000),
584 Version: int16(0),
585 }
586
587 request.AddBlock(topic, partitions)
588
589 var rsp *ListPartitionReassignmentsResponse
590 err = ca.retryOnError(isRetriableControllerError, func() error {
591 b, err := ca.Controller()
592 if err != nil {
593 return err
594 }
595 _ = b.Open(ca.client.Config())
596
597 rsp, err = b.ListPartitionReassignments(request)
598 if isRetriableControllerError(err) {
599 _, _ = ca.refreshController()
600 }
601 return err
602 })
603
604 if err == nil && rsp != nil {
605 return rsp.TopicStatus, nil
606 } else {
607 return nil, err
608 }
609}
610
611func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
612 if topic == "" {
613 return ErrInvalidTopic
614 }
615 errs := make([]error, 0)
616 partitionPerBroker := make(map[*Broker][]int32)
617 for partition := range partitionOffsets {
618 broker, err := ca.client.Leader(topic, partition)
619 if err != nil {
620 errs = append(errs, err)
621 continue
622 }
623 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
624 }
625 for broker, partitions := range partitionPerBroker {
626 recordsToDelete := make(map[int32]int64, len(partitions))
627 for _, p := range partitions {
628 recordsToDelete[p] = partitionOffsets[p]
629 }
630 topics := map[string]*DeleteRecordsRequestTopic{
631 topic: {
632 PartitionOffsets: recordsToDelete,
633 },
634 }
635 request := &DeleteRecordsRequest{
636 Topics: topics,
637 Timeout: ca.conf.Admin.Timeout,
638 }
639 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
640 request.Version = 1
641 }
642 rsp, err := broker.DeleteRecords(request)
643 if err != nil {
644 errs = append(errs, err)
645 continue
646 }
647
648 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
649 if !ok {
650 errs = append(errs, ErrIncompleteResponse)
651 continue
652 }
653
654 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
655 if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
656 errs = append(errs, deleteRecordsResponsePartition.Err)
657 continue
658 }
659 }
660 }
661 if len(errs) > 0 {
662 return Wrap(ErrDeleteRecords, errs...)
663 }
664 // todo since we are dealing with couple of partitions it would be good if we return slice of errors
665 // for each partition instead of one error
666 return nil
667}
668
669// Returns a bool indicating whether the resource request needs to go to a
670// specific broker
671func dependsOnSpecificNode(resource ConfigResource) bool {
672 return (resource.Type == BrokerResource && resource.Name != "") ||
673 resource.Type == BrokerLoggerResource
674}
675
676func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
677 var entries []ConfigEntry
678 var resources []*ConfigResource
679 resources = append(resources, &resource)
680
681 request := &DescribeConfigsRequest{
682 Resources: resources,
683 }
684
685 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
686 request.Version = 1
687 }
688
689 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
690 request.Version = 2
691 }
692
693 var (
694 b *Broker
695 err error
696 )
697
698 // DescribeConfig of broker/broker logger must be sent to the broker in question
699 if dependsOnSpecificNode(resource) {
700 var id int64
701 id, err = strconv.ParseInt(resource.Name, 10, 32)
702 if err != nil {
703 return nil, err
704 }
705 b, err = ca.findBroker(int32(id))
706 } else {
707 b, err = ca.findAnyBroker()
708 }
709 if err != nil {
710 return nil, err
711 }
712
713 _ = b.Open(ca.client.Config())
714 rsp, err := b.DescribeConfigs(request)
715 if err != nil {
716 return nil, err
717 }
718
719 for _, rspResource := range rsp.Resources {
720 if rspResource.Name == resource.Name {
721 if rspResource.ErrorCode != 0 {
722 return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
723 }
724 for _, cfgEntry := range rspResource.Configs {
725 entries = append(entries, *cfgEntry)
726 }
727 }
728 }
729 return entries, nil
730}
731
732func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
733 var resources []*AlterConfigsResource
734 resources = append(resources, &AlterConfigsResource{
735 Type: resourceType,
736 Name: name,
737 ConfigEntries: entries,
738 })
739
740 request := &AlterConfigsRequest{
741 Resources: resources,
742 ValidateOnly: validateOnly,
743 }
744 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
745 request.Version = 1
746 }
747
748 var (
749 b *Broker
750 err error
751 )
752
753 // AlterConfig of broker/broker logger must be sent to the broker in question
754 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
755 var id int64
756 id, err = strconv.ParseInt(name, 10, 32)
757 if err != nil {
758 return err
759 }
760 b, err = ca.findBroker(int32(id))
761 } else {
762 b, err = ca.findAnyBroker()
763 }
764 if err != nil {
765 return err
766 }
767
768 _ = b.Open(ca.client.Config())
769 rsp, err := b.AlterConfigs(request)
770 if err != nil {
771 return err
772 }
773
774 for _, rspResource := range rsp.Resources {
775 if rspResource.Name == name {
776 if rspResource.ErrorCode != 0 {
777 return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
778 }
779 }
780 }
781 return nil
782}
783
784func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
785 var resources []*IncrementalAlterConfigsResource
786 resources = append(resources, &IncrementalAlterConfigsResource{
787 Type: resourceType,
788 Name: name,
789 ConfigEntries: entries,
790 })
791
792 request := &IncrementalAlterConfigsRequest{
793 Resources: resources,
794 ValidateOnly: validateOnly,
795 }
796
797 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
798 request.Version = 1
799 }
800
801 var (
802 b *Broker
803 err error
804 )
805
806 // AlterConfig of broker/broker logger must be sent to the broker in question
807 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
808 var id int64
809 id, err = strconv.ParseInt(name, 10, 32)
810 if err != nil {
811 return err
812 }
813 b, err = ca.findBroker(int32(id))
814 } else {
815 b, err = ca.findAnyBroker()
816 }
817 if err != nil {
818 return err
819 }
820
821 _ = b.Open(ca.client.Config())
822 rsp, err := b.IncrementalAlterConfigs(request)
823 if err != nil {
824 return err
825 }
826
827 for _, rspResource := range rsp.Resources {
828 if rspResource.Name == name {
829 if rspResource.ErrorMsg != "" {
830 return errors.New(rspResource.ErrorMsg)
831 }
832 if rspResource.ErrorCode != 0 {
833 return KError(rspResource.ErrorCode)
834 }
835 }
836 }
837 return nil
838}
839
840func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
841 var acls []*AclCreation
842 acls = append(acls, &AclCreation{resource, acl})
843 request := &CreateAclsRequest{AclCreations: acls}
844
845 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
846 request.Version = 1
847 }
848
849 b, err := ca.Controller()
850 if err != nil {
851 return err
852 }
853
854 _, err = b.CreateAcls(request)
855 return err
856}
857
858func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
859 var acls []*AclCreation
860 for _, resourceACL := range resourceACLs {
861 for _, acl := range resourceACL.Acls {
862 acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
863 }
864 }
865 request := &CreateAclsRequest{AclCreations: acls}
866
867 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
868 request.Version = 1
869 }
870
871 b, err := ca.Controller()
872 if err != nil {
873 return err
874 }
875
876 _, err = b.CreateAcls(request)
877 return err
878}
879
880func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
881 request := &DescribeAclsRequest{AclFilter: filter}
882
883 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
884 request.Version = 1
885 }
886
887 b, err := ca.Controller()
888 if err != nil {
889 return nil, err
890 }
891
892 rsp, err := b.DescribeAcls(request)
893 if err != nil {
894 return nil, err
895 }
896
897 var lAcls []ResourceAcls
898 for _, rAcl := range rsp.ResourceAcls {
899 lAcls = append(lAcls, *rAcl)
900 }
901 return lAcls, nil
902}
903
904func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
905 var filters []*AclFilter
906 filters = append(filters, &filter)
907 request := &DeleteAclsRequest{Filters: filters}
908
909 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
910 request.Version = 1
911 }
912
913 b, err := ca.Controller()
914 if err != nil {
915 return nil, err
916 }
917
918 rsp, err := b.DeleteAcls(request)
919 if err != nil {
920 return nil, err
921 }
922
923 var mAcls []MatchingAcl
924 for _, fr := range rsp.FilterResponses {
925 for _, mACL := range fr.MatchingAcls {
926 mAcls = append(mAcls, *mACL)
927 }
928 }
929 return mAcls, nil
930}
931
932func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
933 request := &ElectLeadersRequest{
934 Type: electionType,
935 TopicPartitions: partitions,
936 TimeoutMs: int32(60000),
937 }
938
939 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
940 request.Version = 2
941 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
942 request.Version = 1
943 }
944
945 var res *ElectLeadersResponse
946 if err := ca.retryOnError(isRetriableControllerError, func() error {
947 b, err := ca.Controller()
948 if err != nil {
949 return err
950 }
951 _ = b.Open(ca.client.Config())
952
953 res, err = b.ElectLeaders(request)
954 if err != nil {
955 return err
956 }
957 if !errors.Is(res.ErrorCode, ErrNoError) {
958 if isRetriableControllerError(res.ErrorCode) {
959 _, _ = ca.refreshController()
960 }
961 return res.ErrorCode
962 }
963 return nil
964 }); err != nil {
965 return nil, err
966 }
967 return res.ReplicaElectionResults, nil
968}
969
970func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
971 groupsPerBroker := make(map[*Broker][]string)
972
973 for _, group := range groups {
974 coordinator, err := ca.client.Coordinator(group)
975 if err != nil {
976 return nil, err
977 }
978 groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
979 }
980
981 for broker, brokerGroups := range groupsPerBroker {
982 describeReq := &DescribeGroupsRequest{
983 Groups: brokerGroups,
984 }
985
986 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
987 // Starting in version 4, the response will include group.instance.id info for members.
988 // Starting in version 5, the response uses flexible encoding
989 describeReq.Version = 5
990 } else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
991 // Starting in version 3, authorized operations can be requested.
992 describeReq.Version = 3
993 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
994 // Version 2 is the same as version 0.
995 describeReq.Version = 2
996 } else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
997 // Version 1 is the same as version 0.
998 describeReq.Version = 1
999 }
1000 response, err := broker.DescribeGroups(describeReq)
1001 if err != nil {
1002 return nil, err
1003 }
1004
1005 result = append(result, response.Groups...)
1006 }
1007 return result, nil
1008}
1009
1010func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
1011 allGroups = make(map[string]string)
1012
1013 // Query brokers in parallel, since we have to query *all* brokers
1014 brokers := ca.client.Brokers()
1015 groupMaps := make(chan map[string]string, len(brokers))
1016 errChan := make(chan error, len(brokers))
1017 wg := sync.WaitGroup{}
1018
1019 for _, b := range brokers {
1020 wg.Add(1)
1021 go func(b *Broker, conf *Config) {
1022 defer wg.Done()
1023 _ = b.Open(conf) // Ensure that broker is opened
1024
1025 request := &ListGroupsRequest{}
1026 if ca.conf.Version.IsAtLeast(V3_8_0_0) {
1027 // Version 5 adds the TypesFilter field (KIP-848).
1028 request.Version = 5
1029 } else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
1030 // Version 4 adds the StatesFilter field (KIP-518).
1031 request.Version = 4
1032 } else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
1033 // Version 3 is the first flexible version.
1034 request.Version = 3
1035 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
1036 // Version 2 is the same as version 0.
1037 request.Version = 2
1038 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
1039 // Version 1 is the same as version 0.
1040 request.Version = 1
1041 }
1042
1043 response, err := b.ListGroups(request)
1044 if err != nil {
1045 errChan <- err
1046 return
1047 }
1048
1049 groupMaps <- maps.Clone(response.Groups)
1050 }(b, ca.conf)
1051 }
1052
1053 wg.Wait()
1054 close(groupMaps)
1055 close(errChan)
1056
1057 for groupMap := range groupMaps {
1058 maps.Copy(allGroups, groupMap)
1059 }
1060
1061 // Intentionally return only the first error for simplicity
1062 err = <-errChan
1063 return
1064}
1065
1066func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
1067 var response *OffsetFetchResponse
1068 request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
1069 err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
1070 defer func() {
1071 if err != nil && isRetriableGroupCoordinatorError(err) {
1072 _ = ca.client.RefreshCoordinator(group)
1073 }
1074 }()
1075
1076 coordinator, err := ca.client.Coordinator(group)
1077 if err != nil {
1078 return err
1079 }
1080
1081 response, err = coordinator.FetchOffset(request)
1082 if err != nil {
1083 return err
1084 }
1085 if !errors.Is(response.Err, ErrNoError) {
1086 return response.Err
1087 }
1088
1089 return nil
1090 })
1091
1092 return response, err
1093}
1094
1095func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
1096 var response *DeleteOffsetsResponse
1097 request := &DeleteOffsetsRequest{
1098 Group: group,
1099 partitions: map[string][]int32{
1100 topic: {partition},
1101 },
1102 }
1103
1104 return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
1105 defer func() {
1106 if err != nil && isRetriableGroupCoordinatorError(err) {
1107 _ = ca.client.RefreshCoordinator(group)
1108 }
1109 }()
1110
1111 coordinator, err := ca.client.Coordinator(group)
1112 if err != nil {
1113 return err
1114 }
1115
1116 response, err = coordinator.DeleteOffsets(request)
1117 if err != nil {
1118 return err
1119 }
1120 if !errors.Is(response.ErrorCode, ErrNoError) {
1121 return response.ErrorCode
1122 }
1123 if !errors.Is(response.Errors[topic][partition], ErrNoError) {
1124 return response.Errors[topic][partition]
1125 }
1126
1127 return nil
1128 })
1129}
1130
1131func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
1132 var response *DeleteGroupsResponse
1133 request := &DeleteGroupsRequest{
1134 Groups: []string{group},
1135 }
1136
1137 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
1138 request.Version = 2
1139 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
1140 request.Version = 1
1141 }
1142
1143 return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
1144 defer func() {
1145 if err != nil && isRetriableGroupCoordinatorError(err) {
1146 _ = ca.client.RefreshCoordinator(group)
1147 }
1148 }()
1149
1150 coordinator, err := ca.client.Coordinator(group)
1151 if err != nil {
1152 return err
1153 }
1154
1155 response, err = coordinator.DeleteGroups(request)
1156 if err != nil {
1157 return err
1158 }
1159
1160 groupErr, ok := response.GroupErrorCodes[group]
1161 if !ok {
1162 return ErrIncompleteResponse
1163 }
1164
1165 if !errors.Is(groupErr, ErrNoError) {
1166 return groupErr
1167 }
1168
1169 return nil
1170 })
1171}
1172
1173func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
1174 type result struct {
1175 id int32
1176 logdirs []DescribeLogDirsResponseDirMetadata
1177 }
1178 // Query brokers in parallel, since we may have to query multiple brokers
1179 logDirsResults := make(chan result, len(brokerIds))
1180 errChan := make(chan error, len(brokerIds))
1181 wg := sync.WaitGroup{}
1182
1183 for _, b := range brokerIds {
1184 broker, err := ca.findBroker(b)
1185 if err != nil {
1186 Logger.Printf("Unable to find broker with ID = %v\n", b)
1187 continue
1188 }
1189 wg.Add(1)
1190 go func(b *Broker, conf *Config) {
1191 defer wg.Done()
1192 _ = b.Open(conf) // Ensure that broker is opened
1193
1194 request := &DescribeLogDirsRequest{}
1195 if ca.conf.Version.IsAtLeast(V3_3_0_0) {
1196 request.Version = 4
1197 } else if ca.conf.Version.IsAtLeast(V3_2_0_0) {
1198 request.Version = 3
1199 } else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
1200 request.Version = 2
1201 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
1202 request.Version = 1
1203 }
1204 response, err := b.DescribeLogDirs(request)
1205 if err != nil {
1206 errChan <- err
1207 return
1208 }
1209 if !errors.Is(response.ErrorCode, ErrNoError) {
1210 errChan <- response.ErrorCode
1211 return
1212 }
1213 logDirsResults <- result{id: b.ID(), logdirs: response.LogDirs}
1214 }(broker, ca.conf)
1215 }
1216
1217 wg.Wait()
1218 close(logDirsResults)
1219 close(errChan)
1220
1221 allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
1222 for logDirsResult := range logDirsResults {
1223 allLogDirs[logDirsResult.id] = logDirsResult.logdirs
1224 }
1225
1226 // Intentionally return only the first error for simplicity
1227 err = <-errChan
1228 return
1229}
1230
1231func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
1232 req := &DescribeUserScramCredentialsRequest{}
1233 for _, u := range users {
1234 req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
1235 Name: u,
1236 })
1237 }
1238
1239 b, err := ca.Controller()
1240 if err != nil {
1241 return nil, err
1242 }
1243
1244 rsp, err := b.DescribeUserScramCredentials(req)
1245 if err != nil {
1246 return nil, err
1247 }
1248
1249 return rsp.Results, nil
1250}
1251
1252func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
1253 res, err := ca.AlterUserScramCredentials(upsert, nil)
1254 if err != nil {
1255 return nil, err
1256 }
1257
1258 return res, nil
1259}
1260
1261func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1262 res, err := ca.AlterUserScramCredentials(nil, delete)
1263 if err != nil {
1264 return nil, err
1265 }
1266
1267 return res, nil
1268}
1269
1270func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1271 req := &AlterUserScramCredentialsRequest{
1272 Deletions: d,
1273 Upsertions: u,
1274 }
1275
1276 var rsp *AlterUserScramCredentialsResponse
1277 err := ca.retryOnError(isRetriableControllerError, func() error {
1278 b, err := ca.Controller()
1279 if err != nil {
1280 return err
1281 }
1282
1283 rsp, err = b.AlterUserScramCredentials(req)
1284 return err
1285 })
1286 if err != nil {
1287 return nil, err
1288 }
1289
1290 return rsp.Results, nil
1291}
1292
1293// Describe All : use an empty/nil components slice + strict = false
1294// Contains components: strict = false
1295// Contains only components: strict = true
1296func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
1297 request := NewDescribeClientQuotasRequest(
1298 ca.conf.Version,
1299 components,
1300 strict,
1301 )
1302
1303 b, err := ca.Controller()
1304 if err != nil {
1305 return nil, err
1306 }
1307
1308 rsp, err := b.DescribeClientQuotas(request)
1309 if err != nil {
1310 return nil, err
1311 }
1312
1313 if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
1314 return nil, errors.New(*rsp.ErrorMsg)
1315 }
1316 if !errors.Is(rsp.ErrorCode, ErrNoError) {
1317 return nil, rsp.ErrorCode
1318 }
1319
1320 return rsp.Entries, nil
1321}
1322
1323func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
1324 entry := AlterClientQuotasEntry{
1325 Entity: entity,
1326 Ops: []ClientQuotasOp{op},
1327 }
1328
1329 request := &AlterClientQuotasRequest{
1330 Entries: []AlterClientQuotasEntry{entry},
1331 ValidateOnly: validateOnly,
1332 }
1333
1334 b, err := ca.Controller()
1335 if err != nil {
1336 return err
1337 }
1338
1339 rsp, err := b.AlterClientQuotas(request)
1340 if err != nil {
1341 return err
1342 }
1343
1344 for _, entry := range rsp.Entries {
1345 if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
1346 return errors.New(*entry.ErrorMsg)
1347 }
1348 if !errors.Is(entry.ErrorCode, ErrNoError) {
1349 return entry.ErrorCode
1350 }
1351 }
1352
1353 return nil
1354}
1355
1356func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
1357 if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
1358 return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
1359 }
1360 var response *LeaveGroupResponse
1361 request := &LeaveGroupRequest{
1362 Version: 3,
1363 GroupId: group,
1364 }
1365 for _, instanceId := range groupInstanceIds {
1366 groupInstanceId := instanceId
1367 request.Members = append(request.Members, MemberIdentity{
1368 GroupInstanceId: &groupInstanceId,
1369 })
1370 }
1371 err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
1372 defer func() {
1373 if err != nil && isRetriableGroupCoordinatorError(err) {
1374 _ = ca.client.RefreshCoordinator(group)
1375 }
1376 }()
1377
1378 coordinator, err := ca.client.Coordinator(group)
1379 if err != nil {
1380 return err
1381 }
1382
1383 response, err = coordinator.LeaveGroup(request)
1384 if err != nil {
1385 return err
1386 }
1387 if !errors.Is(response.Err, ErrNoError) {
1388 return response.Err
1389 }
1390
1391 return nil
1392 })
1393
1394 return response, err
1395}