1package sarama
2
3import (
4 "context"
5 "errors"
6 "math"
7 "math/rand"
8 "net"
9 "slices"
10 "sort"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 "golang.org/x/net/proxy"
17)
18
19// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
20// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
21// automatically when it passes out of scope. It is safe to share a client amongst many
22// users; however, Kafka will process requests from a single client strictly serially, so it
23// is generally more efficient to use the default of one client per producer/consumer. (An illustrative usage sketch follows this interface definition.)
24type Client interface {
25 // Config returns the Config struct of the client. This struct should not be
26 // altered after it has been created.
27 Config() *Config
28
29 // Controller returns the cluster controller broker. It will return a
30 // locally cached value if it's available. You can call RefreshController
31 // to update the cached value. Requires Kafka 0.10 or higher.
32 Controller() (*Broker, error)
33
34 // RefreshController retrieves the cluster controller from fresh metadata
35 // and stores it in the local cache. Requires Kafka 0.10 or higher.
36 RefreshController() (*Broker, error)
37
38 // Brokers returns the current set of active brokers as retrieved from cluster metadata.
39 Brokers() []*Broker
40
41 // Broker returns the active Broker, if available, for the given broker ID.
42 Broker(brokerID int32) (*Broker, error)
43
44 // Topics returns the set of available topics as retrieved from cluster metadata.
45 Topics() ([]string, error)
46
47 // Partitions returns the sorted list of all partition IDs for the given topic.
48 Partitions(topic string) ([]int32, error)
49
50 // WritablePartitions returns the sorted list of all writable partition IDs for
51 // the given topic, where "writable" means "having a valid leader accepting
52 // writes".
53 WritablePartitions(topic string) ([]int32, error)
54
55 // Leader returns the broker object that is the leader of the given
56 // topic/partition, as determined by querying the cluster metadata.
57 Leader(topic string, partitionID int32) (*Broker, error)
58
59 // LeaderAndEpoch returns the leader and its epoch for the given
60 // topic/partition, as determined by querying the cluster metadata.
61 LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)
62
63 // Replicas returns the set of all replica IDs for the given partition.
64 Replicas(topic string, partitionID int32) ([]int32, error)
65
66 // InSyncReplicas returns the set of all in-sync replica IDs for the given
67 // partition. In-sync replicas are replicas which are fully caught up with
68 // the partition leader.
69 InSyncReplicas(topic string, partitionID int32) ([]int32, error)
70
71 // OfflineReplicas returns the set of all offline replica IDs for the given
72 // partition. Offline replicas are assigned replicas that are not currently available to serve the partition.
73 OfflineReplicas(topic string, partitionID int32) ([]int32, error)
74
75 // RefreshBrokers takes a list of addresses to be used as seed brokers.
76 // Existing broker connections are closed and the updated list of seed brokers
77 // will be used for the next metadata fetch.
78 RefreshBrokers(addrs []string) error
79
80 // RefreshMetadata takes a list of topics and queries the cluster to refresh the
81 // available metadata for those topics. If no topics are provided, it will refresh
82 // metadata for all topics.
83 RefreshMetadata(topics ...string) error
84
85 // GetOffset queries the cluster to get the most recent available offset at the
86 // given time (in milliseconds) on the topic/partition combination.
87 // Time should be OffsetOldest for the earliest available offset,
88 // OffsetNewest for the offset of the message that will be produced next, or a millisecond timestamp to look up the earliest offset recorded at or after that time.
89 GetOffset(topic string, partitionID int32, time int64) (int64, error)
90
91 // Coordinator returns the coordinating broker for a consumer group. It will
92 // return a locally cached value if it's available. You can call
93 // RefreshCoordinator to update the cached value. This function only works on
94 // Kafka 0.8.2 and higher.
95 Coordinator(consumerGroup string) (*Broker, error)
96
97 // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
98 // in local cache. This function only works on Kafka 0.8.2 and higher.
99 RefreshCoordinator(consumerGroup string) error
100
101 // TransactionCoordinator returns the coordinating broker for a transaction id. It will
102 // return a locally cached value if it's available. You can call
103 // RefreshTransactionCoordinator to update the cached value. This function only works on
104 // Kafka 0.11.0.0 and higher.
105 TransactionCoordinator(transactionID string) (*Broker, error)
106
107 // RefreshTransactionCoordinator retrieves the coordinator for a transaction id and stores it
108 // in local cache. This function only works on Kafka 0.11.0.0 and higher.
109 RefreshTransactionCoordinator(transactionID string) error
110
111 // InitProducerID retrieves the producer ID and epoch required for an idempotent producer
112 InitProducerID() (*InitProducerIDResponse, error)
113
114 // LeastLoadedBroker retrieves the broker with the fewest pending responses
115 LeastLoadedBroker() *Broker
116
117 // PartitionNotReadable reports whether the given partition has no cached metadata or no live leader and is therefore not readable
118 PartitionNotReadable(topic string, partition int32) bool
119
120 // Close shuts down all broker connections managed by this client. It is required
121 // to call this function before a client object passes out of scope, as it will
122 // otherwise leak memory. You must close any Producers or Consumers using a client
123 // before you close the client.
124 Close() error
125
126 // Closed returns true if the client has already had Close called on it
127 Closed() bool
128}
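
// Illustrative sketch (not part of the original source): a minimal Client
// lifecycle as described by the interface above. The broker address and topic
// name are placeholders supplied by the caller.
func exampleClientLifecycle() error {
	conf := NewConfig()

	c, err := NewClient([]string{"localhost:9092"}, conf)
	if err != nil {
		return err
	}
	// Clients are not cleaned up by the garbage collector; Close must be called.
	defer func() { _ = c.Close() }()

	partitions, err := c.Partitions("example-topic")
	if err != nil {
		return err
	}
	for _, p := range partitions {
		leader, err := c.Leader("example-topic", p)
		if err != nil {
			return err
		}
		Logger.Printf("partition %d is led by broker %d at %s\n", p, leader.ID(), leader.Addr())
	}
	return nil
}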
129
130const (
131 // OffsetNewest stands for the log head offset, i.e. the offset that will be
132 // assigned to the next message that will be produced to the partition. You
133 // can send this to a client's GetOffset method to get this offset, or when
134 // calling ConsumePartition to start consuming new messages.
135 OffsetNewest int64 = -1
136 // OffsetOldest stands for the oldest offset available on the broker for a
137 // partition. You can send this to a client's GetOffset method to get this
138 // offset, or when calling ConsumePartition to start consuming from the
139 // oldest offset that is still available on the broker.
140 OffsetOldest int64 = -2
141)
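
// Illustrative sketch (not part of the original source): using OffsetOldest and
// OffsetNewest with GetOffset to estimate how many messages a partition
// currently retains. The topic name and partition are placeholders.
func examplePartitionDepth(c Client, topic string, partition int32) (int64, error) {
	oldest, err := c.GetOffset(topic, partition, OffsetOldest)
	if err != nil {
		return 0, err
	}
	newest, err := c.GetOffset(topic, partition, OffsetNewest)
	if err != nil {
		return 0, err
	}
	// OffsetNewest is the offset that the next produced message will receive,
	// so the difference between the two is the current depth of the partition.
	return newest - oldest, nil
}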
142
143type client struct {
144 // updateMetadataMs stores the time at which metadata was last updated.
145 // Note: this is accessed atomically, so it must be the first word in the struct
146 // as per golang/go#41970
147 updateMetadataMs atomic.Int64
148
149 conf *Config
150 closer, closed chan none // for shutting down background metadata updater
151
152 // the broker addresses given to us through the constructor are not guaranteed to be returned in
153 // the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
154 // so we store them separately
155 seedBrokers []*Broker
156 deadSeeds []*Broker
157
158 controllerID int32 // cluster controller broker id
159 brokers map[int32]*Broker // maps broker ids to brokers
160 metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
161 metadataTopics map[string]none // topics that need to collect metadata
162 coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
163 transactionCoordinators map[string]int32 // Maps transaction ids to coordinating broker IDs
164
165 // If the number of partitions is large, we can get some churn calling cachedPartitions,
166 // so the result is cached. It is important to update this value whenever metadata is changed
167 cachedPartitionsResults map[string][maxPartitionIndex][]int32
168
169 lock sync.RWMutex // protects access to the maps that hold cluster state.
170
171 metadataRefresh metadataRefresh
172}
173
174// NewClient creates a new Client. It connects to one of the given broker addresses
175// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
176// be retrieved from any of the given broker addresses, the client is not created.
177func NewClient(addrs []string, conf *Config) (Client, error) {
178 DebugLogger.Println("Initializing new client")
179
180 if conf == nil {
181 conf = NewConfig()
182 }
183
184 if err := conf.Validate(); err != nil {
185 return nil, err
186 }
187
188 if len(addrs) < 1 {
189 return nil, ConfigurationError("You must provide at least one broker address")
190 }
191
192 if strings.Contains(addrs[0], ".servicebus.windows.net") {
193 if conf.Version.IsAtLeast(V1_1_0_0) || !conf.Version.IsAtLeast(V0_11_0_0) {
194 Logger.Println("Connecting to Azure Event Hubs, forcing version to V1_0_0_0 for compatibility")
195 conf.Version = V1_0_0_0
196 }
197 }
198 client := &client{
199 conf: conf,
200 closer: make(chan none),
201 closed: make(chan none),
202 brokers: make(map[int32]*Broker),
203 metadata: make(map[string]map[int32]*PartitionMetadata),
204 metadataTopics: make(map[string]none),
205 cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
206 coordinators: make(map[string]int32),
207 transactionCoordinators: make(map[string]int32),
208 }
209 refresh := func(topics []string) error {
210 deadline := time.Time{}
211 if client.conf.Metadata.Timeout > 0 {
212 deadline = time.Now().Add(client.conf.Metadata.Timeout)
213 }
214 return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
215 }
216 if conf.Metadata.SingleFlight {
217 client.metadataRefresh = newSingleFlightRefresher(refresh)
218 } else {
219 client.metadataRefresh = refresh
220 }
221
222 if conf.Net.ResolveCanonicalBootstrapServers {
223 var err error
224 addrs, err = client.resolveCanonicalNames(addrs)
225 if err != nil {
226 return nil, err
227 }
228 }
229
230 client.randomizeSeedBrokers(addrs)
231
232 if conf.Metadata.Full {
233 // do an initial fetch of all cluster metadata by specifying an empty list of topics
234 err := client.RefreshMetadata()
235 if err == nil {
236 } else if errors.Is(err, ErrLeaderNotAvailable) || errors.Is(err, ErrReplicaNotAvailable) || errors.Is(err, ErrTopicAuthorizationFailed) || errors.Is(err, ErrClusterAuthorizationFailed) {
237 // indicates that maybe part of the cluster is down, but is not fatal to creating the client
238 Logger.Println(err)
239 } else {
240 close(client.closed) // we haven't started the background updater yet, so we have to do this manually
241 _ = client.Close()
242 return nil, err
243 }
244 }
245 go withRecover(client.backgroundMetadataUpdater)
246
247 DebugLogger.Println("Successfully initialized new client")
248
249 return client, nil
250}
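
// Illustrative sketch (not part of the original source): the metadata-related
// configuration consulted by NewClient and the refresh logic below. The values
// shown are arbitrary examples, not recommendations.
func exampleMetadataConfig() *Config {
	conf := NewConfig()
	conf.Metadata.Full = true                         // fetch metadata for all topics up front
	conf.Metadata.Retry.Max = 3                       // attempts passed to tryRefreshMetadata
	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
	conf.Metadata.Timeout = 10 * time.Second          // overall deadline for one refresh
	conf.Metadata.RefreshFrequency = 10 * time.Minute // background updater interval; 0 disables it
	return conf
}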
251
252func (client *client) Config() *Config {
253 return client.conf
254}
255
256func (client *client) Brokers() []*Broker {
257 client.lock.RLock()
258 defer client.lock.RUnlock()
259 brokers := make([]*Broker, 0, len(client.brokers))
260 for _, broker := range client.brokers {
261 brokers = append(brokers, broker)
262 }
263 return brokers
264}
265
266func (client *client) Broker(brokerID int32) (*Broker, error) {
267 client.lock.RLock()
268 defer client.lock.RUnlock()
269 broker, ok := client.brokers[brokerID]
270 if !ok {
271 return nil, ErrBrokerNotFound
272 }
273 _ = broker.Open(client.conf)
274 return broker, nil
275}
276
277func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
278 // FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superseded by transaction_manager.go?
279 brokerErrors := make([]error, 0)
280 for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
281 request := &InitProducerIDRequest{}
282
283 if client.conf.Version.IsAtLeast(V2_7_0_0) {
284 // Version 4 adds the support for new error code PRODUCER_FENCED.
285 request.Version = 4
286 } else if client.conf.Version.IsAtLeast(V2_5_0_0) {
287 // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
288 request.Version = 3
289 } else if client.conf.Version.IsAtLeast(V2_4_0_0) {
290 // Version 2 is the first flexible version.
291 request.Version = 2
292 } else if client.conf.Version.IsAtLeast(V2_0_0_0) {
293 // Version 1 is the same as version 0.
294 request.Version = 1
295 }
296
297 response, err := broker.InitProducerID(request)
298 if err == nil {
299 return response, nil
300 } else {
301 // some error, remove that broker and try again
302 Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
303 _ = broker.Close()
304 brokerErrors = append(brokerErrors, err)
305 client.deregisterBroker(broker)
306 }
307 }
308
309 return nil, Wrap(ErrOutOfBrokers, brokerErrors...)
310}
311
312func (client *client) Close() error {
313 if client.Closed() {
314 // Chances are this is being called from a defer() and the error will go unobserved
315 // so we go ahead and log the event in this case.
316 Logger.Printf("Close() called on already closed client")
317 return ErrClosedClient
318 }
319
320 // shutdown and wait for the background thread before we take the lock, to avoid races
321 close(client.closer)
322 <-client.closed
323
324 client.lock.Lock()
325 defer client.lock.Unlock()
326 DebugLogger.Println("Closing Client")
327
328 for _, broker := range client.brokers {
329 safeAsyncClose(broker)
330 }
331
332 for _, broker := range client.seedBrokers {
333 safeAsyncClose(broker)
334 }
335
336 client.brokers = nil
337 client.metadata = nil
338 client.metadataTopics = nil
339
340 return nil
341}
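
// Illustrative sketch (not part of the original source): the shutdown order
// required by Close. Consumers (and producers) built on top of a shared client
// must be closed before the client itself.
func exampleShutdownOrder(c Client) error {
	consumer, err := NewConsumerFromClient(c)
	if err != nil {
		return err
	}
	// Close the consumer first...
	if err := consumer.Close(); err != nil {
		return err
	}
	// ...and only then close the client it was built on.
	return c.Close()
}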
342
343func (client *client) Closed() bool {
344 client.lock.RLock()
345 defer client.lock.RUnlock()
346
347 return client.brokers == nil
348}
349
350func (client *client) Topics() ([]string, error) {
351 if client.Closed() {
352 return nil, ErrClosedClient
353 }
354
355 client.lock.RLock()
356 defer client.lock.RUnlock()
357
358 ret := make([]string, 0, len(client.metadata))
359 for topic := range client.metadata {
360 ret = append(ret, topic)
361 }
362
363 return ret, nil
364}
365
366func (client *client) MetadataTopics() ([]string, error) {
367 if client.Closed() {
368 return nil, ErrClosedClient
369 }
370
371 client.lock.RLock()
372 defer client.lock.RUnlock()
373
374 ret := make([]string, 0, len(client.metadataTopics))
375 for topic := range client.metadataTopics {
376 ret = append(ret, topic)
377 }
378
379 return ret, nil
380}
381
382func (client *client) Partitions(topic string) ([]int32, error) {
383 return client.getPartitions(topic, allPartitions)
384}
385
386func (client *client) WritablePartitions(topic string) ([]int32, error) {
387 return client.getPartitions(topic, writablePartitions)
388}
389
390func (client *client) getPartitions(topic string, pt partitionType) ([]int32, error) {
391 if client.Closed() {
392 return nil, ErrClosedClient
393 }
394
395 partitions := client.cachedPartitions(topic, pt)
396
397 // len==0 catches when it's nil (no such topic) and the odd case when every single
398 // partition is undergoing leader election simultaneously. Callers have to be able to handle
399 // this function returning an empty slice (which is a valid return value) but catching it
400 // here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
401 // a metadata refresh as a nicety so callers can just try again and don't have to manually
402 // trigger a refresh (otherwise they'd just keep getting a stale cached copy).
403 if len(partitions) == 0 {
404 err := client.RefreshMetadata(topic)
405 if err != nil {
406 return nil, err
407 }
408 partitions = client.cachedPartitions(topic, pt)
409 }
410
411 if partitions == nil {
412 return nil, ErrUnknownTopicOrPartition
413 }
414
415 return partitions, nil
416}
417
418func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
419 return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
420 return metadata.Replicas
421 })
422}
423
424func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
425 return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
426 return metadata.Isr
427 })
428}
429
430func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
431 return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
432 return metadata.OfflineReplicas
433 })
434}
435
436func (client *client) getReplicas(topic string, partitionID int32, extractor func(metadata *PartitionMetadata) []int32) ([]int32, error) {
437 if client.Closed() {
438 return nil, ErrClosedClient
439 }
440
441 metadata := client.cachedMetadata(topic, partitionID)
442
443 if metadata == nil {
444 err := client.RefreshMetadata(topic)
445 if err != nil {
446 return nil, err
447 }
448 metadata = client.cachedMetadata(topic, partitionID)
449 }
450
451 if metadata == nil {
452 return nil, ErrUnknownTopicOrPartition
453 }
454
455 replicas := extractor(metadata)
456 if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
457 return dupInt32Slice(replicas), metadata.Err
458 }
459 return dupInt32Slice(replicas), nil
460}
461
462func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
463 leader, _, err := client.LeaderAndEpoch(topic, partitionID)
464 return leader, err
465}
466
467func (client *client) LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error) {
468 if client.Closed() {
469 return nil, -1, ErrClosedClient
470 }
471
472 leader, epoch, err := client.cachedLeader(topic, partitionID)
473 if leader == nil {
474 err = client.RefreshMetadata(topic)
475 if err != nil {
476 return nil, -1, err
477 }
478 leader, epoch, err = client.cachedLeader(topic, partitionID)
479 }
480
481 return leader, epoch, err
482}
483
484func (client *client) RefreshBrokers(addrs []string) error {
485 if client.Closed() {
486 return ErrClosedClient
487 }
488
489 client.lock.Lock()
490 defer client.lock.Unlock()
491
492 for _, broker := range client.brokers {
493 safeAsyncClose(broker)
494 }
495 client.brokers = make(map[int32]*Broker)
496
497 for _, broker := range client.seedBrokers {
498 safeAsyncClose(broker)
499 }
500
501 for _, broker := range client.deadSeeds {
502 safeAsyncClose(broker)
503 }
504
505 client.seedBrokers = nil
506 client.deadSeeds = nil
507
508 client.randomizeSeedBrokers(addrs)
509
510 return nil
511}
512
513func (client *client) RefreshMetadata(topics ...string) error {
514 if client.Closed() {
515 return ErrClosedClient
516 }
517
518 // Prior to 0.8.2, Kafka would throw an exception on an empty topic rather than returning a proper
519 // error. This handles that case by returning an error instead of sending the request
520 // off to Kafka. See: https://github.com/IBM/sarama/pull/38#issuecomment-26362310
521 if slices.Contains(topics, "") {
522 return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
523 }
524 return client.metadataRefresh(topics)
525}
526
527func (client *client) GetOffset(topic string, partitionID int32, timestamp int64) (int64, error) {
528 if client.Closed() {
529 return -1, ErrClosedClient
530 }
531
532 offset, err := client.getOffset(topic, partitionID, timestamp)
533 if err != nil {
534 if err := client.RefreshMetadata(topic); err != nil {
535 return -1, err
536 }
537 return client.getOffset(topic, partitionID, timestamp)
538 }
539
540 return offset, err
541}
542
543func (client *client) Controller() (*Broker, error) {
544 if client.Closed() {
545 return nil, ErrClosedClient
546 }
547
548 if !client.conf.Version.IsAtLeast(V0_10_0_0) {
549 return nil, ErrUnsupportedVersion
550 }
551
552 controller := client.cachedController()
553 if controller == nil {
554 if err := client.refreshMetadata(); err != nil {
555 return nil, err
556 }
557 controller = client.cachedController()
558 }
559
560 if controller == nil {
561 return nil, ErrControllerNotAvailable
562 }
563
564 _ = controller.Open(client.conf)
565 return controller, nil
566}
567
568// deregisterController removes the cached controllerID
569func (client *client) deregisterController() {
570 client.lock.Lock()
571 defer client.lock.Unlock()
572 if controller, ok := client.brokers[client.controllerID]; ok {
573 _ = controller.Close()
574 delete(client.brokers, client.controllerID)
575 }
576}
577
578// RefreshController retrieves the cluster controller from fresh metadata
579// and stores it in the local cache. Requires Kafka 0.10 or higher.
580func (client *client) RefreshController() (*Broker, error) {
581 if client.Closed() {
582 return nil, ErrClosedClient
583 }
584
585 client.deregisterController()
586
587 if err := client.refreshMetadata(); err != nil {
588 return nil, err
589 }
590
591 controller := client.cachedController()
592 if controller == nil {
593 return nil, ErrControllerNotAvailable
594 }
595
596 _ = controller.Open(client.conf)
597 return controller, nil
598}
599
600func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
601 if client.Closed() {
602 return nil, ErrClosedClient
603 }
604
605 coordinator := client.cachedCoordinator(consumerGroup)
606
607 if coordinator == nil {
608 if err := client.RefreshCoordinator(consumerGroup); err != nil {
609 return nil, err
610 }
611 coordinator = client.cachedCoordinator(consumerGroup)
612 }
613
614 if coordinator == nil {
615 return nil, ErrConsumerCoordinatorNotAvailable
616 }
617
618 _ = coordinator.Open(client.conf)
619 return coordinator, nil
620}
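
// Illustrative sketch (not part of the original source): the caller-side
// pattern for recovering when the cached group coordinator goes stale (for
// example after the broker reports that it is no longer the coordinator):
// refresh the cache, then resolve the coordinator again.
func exampleReresolveCoordinator(c Client, group string) (*Broker, error) {
	if err := c.RefreshCoordinator(group); err != nil {
		return nil, err
	}
	return c.Coordinator(group)
}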
621
622func (client *client) RefreshCoordinator(consumerGroup string) error {
623 if client.Closed() {
624 return ErrClosedClient
625 }
626
627 response, err := client.findCoordinator(consumerGroup, CoordinatorGroup, client.conf.Metadata.Retry.Max)
628 if err != nil {
629 return err
630 }
631
632 client.lock.Lock()
633 defer client.lock.Unlock()
634 client.registerBroker(response.Coordinator)
635 client.coordinators[consumerGroup] = response.Coordinator.ID()
636 return nil
637}
638
639func (client *client) TransactionCoordinator(transactionID string) (*Broker, error) {
640 if client.Closed() {
641 return nil, ErrClosedClient
642 }
643
644 coordinator := client.cachedTransactionCoordinator(transactionID)
645
646 if coordinator == nil {
647 if err := client.RefreshTransactionCoordinator(transactionID); err != nil {
648 return nil, err
649 }
650 coordinator = client.cachedTransactionCoordinator(transactionID)
651 }
652
653 if coordinator == nil {
654 return nil, ErrConsumerCoordinatorNotAvailable
655 }
656
657 _ = coordinator.Open(client.conf)
658 return coordinator, nil
659}
660
661func (client *client) RefreshTransactionCoordinator(transactionID string) error {
662 if client.Closed() {
663 return ErrClosedClient
664 }
665
666 response, err := client.findCoordinator(transactionID, CoordinatorTransaction, client.conf.Metadata.Retry.Max)
667 if err != nil {
668 return err
669 }
670
671 client.lock.Lock()
672 defer client.lock.Unlock()
673 client.registerBroker(response.Coordinator)
674 client.transactionCoordinators[transactionID] = response.Coordinator.ID()
675 return nil
676}
677
678// private broker management helpers
679
680func (client *client) randomizeSeedBrokers(addrs []string) {
681 random := rand.New(rand.NewSource(time.Now().UnixNano()))
682 for _, index := range random.Perm(len(addrs)) {
683 client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
684 }
685}
686
687func (client *client) updateBroker(brokers []*Broker) {
688 currentBroker := make(map[int32]*Broker, len(brokers))
689
690 for _, broker := range brokers {
691 currentBroker[broker.ID()] = broker
692 if client.brokers[broker.ID()] == nil { // add new broker
693 client.brokers[broker.ID()] = broker
694 DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
695 } else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
696 safeAsyncClose(client.brokers[broker.ID()])
697 client.brokers[broker.ID()] = broker
698 Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
699 }
700 }
701
702 for id, broker := range client.brokers {
703 if _, exist := currentBroker[id]; !exist { // remove old broker
704 safeAsyncClose(broker)
705 delete(client.brokers, id)
706 Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
707 }
708 }
709}
710
711// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
712// in the brokers map. If a broker with the same ID is already registered under a different address,
713// the old one is closed and replaced. You must hold the write lock before calling this function.
714func (client *client) registerBroker(broker *Broker) {
715 if client.brokers == nil {
716 Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
717 return
718 }
719
720 if client.brokers[broker.ID()] == nil {
721 client.brokers[broker.ID()] = broker
722 DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
723 } else if broker.Addr() != client.brokers[broker.ID()].Addr() {
724 safeAsyncClose(client.brokers[broker.ID()])
725 client.brokers[broker.ID()] = broker
726 Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
727 }
728}
729
730// deregisterBroker removes a broker from the broker list, and if it's
731// not in the broker list but is the current (first) seed broker, moves it to the dead seeds list.
732func (client *client) deregisterBroker(broker *Broker) {
733 client.lock.Lock()
734 defer client.lock.Unlock()
735
736 _, ok := client.brokers[broker.ID()]
737 if ok {
738 Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
739 delete(client.brokers, broker.ID())
740 return
741 }
742 if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
743 client.deadSeeds = append(client.deadSeeds, broker)
744 client.seedBrokers = client.seedBrokers[1:]
745 }
746}
747
748func (client *client) resurrectDeadBrokers() {
749 client.lock.Lock()
750 defer client.lock.Unlock()
751
752 Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
753 client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
754 client.deadSeeds = nil
755}
756
757// LeastLoadedBroker returns the broker with the fewest pending requests.
758// It first chooses from the cached broker list; if that list is empty, it falls back to the seed brokers.
759func (client *client) LeastLoadedBroker() *Broker {
760 client.lock.RLock()
761 defer client.lock.RUnlock()
762
763 var leastLoadedBroker *Broker
764 pendingRequests := math.MaxInt
765 for _, broker := range client.brokers {
766 if pendingRequests > broker.ResponseSize() {
767 pendingRequests = broker.ResponseSize()
768 leastLoadedBroker = broker
769 }
770 }
771 if leastLoadedBroker != nil {
772 _ = leastLoadedBroker.Open(client.conf)
773 return leastLoadedBroker
774 }
775
776 if len(client.seedBrokers) > 0 {
777 _ = client.seedBrokers[0].Open(client.conf)
778 return client.seedBrokers[0]
779 }
780
781 return leastLoadedBroker
782}
783
784// private caching/lazy metadata helpers
785
786type partitionType int
787
788const (
789 allPartitions partitionType = iota
790 writablePartitions
791 // If you add any more types, update the partition cache in update()
792
793 // Ensure this is the last partition type value
794 maxPartitionIndex
795)
796
797func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
798 client.lock.RLock()
799 defer client.lock.RUnlock()
800
801 partitions := client.metadata[topic]
802 if partitions != nil {
803 return partitions[partitionID]
804 }
805
806 return nil
807}
808
809func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
810 client.lock.RLock()
811 defer client.lock.RUnlock()
812
813 partitions, exists := client.cachedPartitionsResults[topic]
814
815 if !exists {
816 return nil
817 }
818 return partitions[partitionSet]
819}
820
821func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
822 partitions := client.metadata[topic]
823
824 if partitions == nil {
825 return nil
826 }
827
828 ret := make([]int32, 0, len(partitions))
829 for _, partition := range partitions {
830 if partitionSet == writablePartitions && errors.Is(partition.Err, ErrLeaderNotAvailable) {
831 continue
832 }
833 ret = append(ret, partition.ID)
834 }
835
836 sort.Sort(int32Slice(ret))
837 return ret
838}
839
840func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, int32, error) {
841 client.lock.RLock()
842 defer client.lock.RUnlock()
843
844 partitions := client.metadata[topic]
845 if partitions != nil {
846 metadata, ok := partitions[partitionID]
847 if ok {
848 if errors.Is(metadata.Err, ErrLeaderNotAvailable) {
849 return nil, -1, ErrLeaderNotAvailable
850 }
851 b := client.brokers[metadata.Leader]
852 if b == nil {
853 return nil, -1, ErrLeaderNotAvailable
854 }
855 _ = b.Open(client.conf)
856 return b, metadata.LeaderEpoch, nil
857 }
858 }
859
860 return nil, -1, ErrUnknownTopicOrPartition
861}
862
863func (client *client) getOffset(topic string, partitionID int32, timestamp int64) (int64, error) {
864 broker, err := client.Leader(topic, partitionID)
865 if err != nil {
866 return -1, err
867 }
868
869 request := NewOffsetRequest(client.conf.Version)
870 request.AddBlock(topic, partitionID, timestamp, 1)
871
872 response, err := broker.GetAvailableOffsets(request)
873 if err != nil {
874 _ = broker.Close()
875 return -1, err
876 }
877
878 block := response.GetBlock(topic, partitionID)
879 if block == nil {
880 _ = broker.Close()
881 return -1, ErrIncompleteResponse
882 }
883 if !errors.Is(block.Err, ErrNoError) {
884 return -1, block.Err
885 }
886 if len(block.Offsets) != 1 {
887 return -1, ErrOffsetOutOfRange
888 }
889
890 return block.Offsets[0], nil
891}
892
893// core metadata update logic
894
895func (client *client) backgroundMetadataUpdater() {
896 defer close(client.closed)
897
898 if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
899 return
900 }
901
902 ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
903 defer ticker.Stop()
904
905 for {
906 select {
907 case <-ticker.C:
908 if err := client.refreshMetadata(); err != nil {
909 Logger.Println("Client background metadata update:", err)
910 }
911 case <-client.closer:
912 return
913 }
914 }
915}
916
917func (client *client) refreshMetadata() error {
918 var topics []string
919
920 if !client.conf.Metadata.Full {
921 if specificTopics, err := client.MetadataTopics(); err != nil {
922 return err
923 } else if len(specificTopics) == 0 {
924 return ErrNoTopicsToUpdateMetadata
925 } else {
926 topics = specificTopics
927 }
928 }
929
930 if err := client.RefreshMetadata(topics...); err != nil {
931 return err
932 }
933
934 return nil
935}
936
937func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
938 pastDeadline := func(backoff time.Duration) bool {
939 if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
940 // we are past the deadline
941 return true
942 }
943 return false
944 }
945 retry := func(err error) error {
946 if attemptsRemaining > 0 {
947 backoff := client.computeBackoff(attemptsRemaining)
948 if pastDeadline(backoff) {
949 Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
950 return err
951 }
952 if backoff > 0 {
953 time.Sleep(backoff)
954 }
955
956 t := client.updateMetadataMs.Load()
957 if time.Since(time.UnixMilli(t)) < backoff {
958 return err
959 }
960 attemptsRemaining--
961 Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
962
963 return client.tryRefreshMetadata(topics, attemptsRemaining, deadline)
964 }
965 return err
966 }
967
968 broker := client.LeastLoadedBroker()
969 brokerErrors := make([]error, 0)
970 for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
971 allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
972 if len(topics) > 0 {
973 DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
974 } else {
975 allowAutoTopicCreation = false
976 DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
977 }
978
979 req := NewMetadataRequest(client.conf.Version, topics)
980 req.AllowAutoTopicCreation = allowAutoTopicCreation
981 client.updateMetadataMs.Store(time.Now().UnixMilli())
982
983 response, err := broker.GetMetadata(req)
984 var kerror KError
985 var packetEncodingError PacketEncodingError
986 if err == nil {
987 // When talking to a broker that is still starting up, it is possible to receive an empty metadata set. We should remove that broker and try the next one (https://issues.apache.org/jira/browse/KAFKA-7924).
988 if len(response.Brokers) == 0 {
989 Logger.Printf("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr)
990 _ = broker.Close()
991 client.deregisterBroker(broker)
992 continue
993 }
994 allKnownMetaData := len(topics) == 0
995 // valid response, use it
996 shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
997 if shouldRetry {
998 Logger.Println("client/metadata found some partitions to be leaderless")
999 return retry(err) // note: err can be nil
1000 }
1001 return err
1002 } else if errors.As(err, &packetEncodingError) {
1003 // didn't even send, return the error
1004 return err
1005 } else if errors.As(err, &kerror) {
1006 // if it's a SASL auth error, return immediately as this _should_ be a non-retryable error for all brokers
1007 if errors.Is(err, ErrSASLAuthenticationFailed) {
1008 Logger.Println("client/metadata failed SASL authentication")
1009 return err
1010 }
1011
1012 if errors.Is(err, ErrTopicAuthorizationFailed) {
1013 Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
1014 return err
1015 }
1016 // else remove that broker and try again
1017 Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
1018 _ = broker.Close()
1019 client.deregisterBroker(broker)
1020 } else {
1021 // some other error, remove that broker and try again
1022 Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
1023 brokerErrors = append(brokerErrors, err)
1024 _ = broker.Close()
1025 client.deregisterBroker(broker)
1026 }
1027 }
1028
1029 wrappedErr := Wrap(ErrOutOfBrokers, brokerErrors...)
1030 if broker != nil {
1031 Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
1032 return retry(wrappedErr)
1033 }
1034
1035 Logger.Println("client/metadata no available broker to send metadata request to")
1036 client.resurrectDeadBrokers()
1037 return retry(wrappedErr)
1038}
1039
1040// updateMetadata applies a metadata response to the client's cache. If there is no fatal error, it reports whether a retry is needed because some partitions are leaderless (ErrLeaderNotAvailable)
1041func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
1042 if client.Closed() {
1043 return
1044 }
1045
1046 client.lock.Lock()
1047 defer client.lock.Unlock()
1048
1049 // For all the brokers we received:
1050 // - if it is a new ID, save it
1051 // - if it is an existing ID, but the address we have is stale, discard the old one and save it
1052 // - if a previously known broker is no longer present in the response, remove it
1053 // - otherwise ignore it, since replacing our existing one would just bounce the connection
1054 client.updateBroker(data.Brokers)
1055
1056 client.controllerID = data.ControllerID
1057
1058 if allKnownMetaData {
1059 client.metadata = make(map[string]map[int32]*PartitionMetadata)
1060 client.metadataTopics = make(map[string]none)
1061 client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
1062 }
1063 for _, topic := range data.Topics {
1064 // topics must be added to `metadataTopics` first, to guarantee that all
1065 // requested topics are recorded and remain trackable for the periodic
1066 // metadata refresh.
1067 if _, exists := client.metadataTopics[topic.Name]; !exists {
1068 client.metadataTopics[topic.Name] = none{}
1069 }
1070 delete(client.metadata, topic.Name)
1071 delete(client.cachedPartitionsResults, topic.Name)
1072
1073 switch topic.Err {
1074 case ErrNoError:
1075 // no-op
1076 case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
1077 err = topic.Err
1078 continue
1079 case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
1080 err = topic.Err
1081 retry = true
1082 continue
1083 case ErrLeaderNotAvailable: // retry, but store partial partition results
1084 retry = true
1085 default: // don't retry, don't store partial results
1086 Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
1087 err = topic.Err
1088 continue
1089 }
1090
1091 client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
1092 for _, partition := range topic.Partitions {
1093 client.metadata[topic.Name][partition.ID] = partition
1094 if errors.Is(partition.Err, ErrLeaderNotAvailable) {
1095 retry = true
1096 }
1097 }
1098
1099 var partitionCache [maxPartitionIndex][]int32
1100 partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
1101 partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
1102 client.cachedPartitionsResults[topic.Name] = partitionCache
1103 }
1104
1105 return
1106}
1107
1108func (client *client) cachedCoordinator(consumerGroup string) *Broker {
1109 client.lock.RLock()
1110 defer client.lock.RUnlock()
1111 if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
1112 return client.brokers[coordinatorID]
1113 }
1114 return nil
1115}
1116
1117func (client *client) cachedTransactionCoordinator(transactionID string) *Broker {
1118 client.lock.RLock()
1119 defer client.lock.RUnlock()
1120 if coordinatorID, ok := client.transactionCoordinators[transactionID]; ok {
1121 return client.brokers[coordinatorID]
1122 }
1123 return nil
1124}
1125
1126func (client *client) cachedController() *Broker {
1127 client.lock.RLock()
1128 defer client.lock.RUnlock()
1129
1130 return client.brokers[client.controllerID]
1131}
1132
1133func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
1134 if client.conf.Metadata.Retry.BackoffFunc != nil {
1135 maxRetries := client.conf.Metadata.Retry.Max
1136 retries := maxRetries - attemptsRemaining
1137 return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
1138 }
1139 return client.conf.Metadata.Retry.Backoff
1140}
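
// Illustrative sketch (not part of the original source): a capped exponential
// backoff with the same signature as Metadata.Retry.BackoffFunc, which
// computeBackoff would call instead of the fixed Metadata.Retry.Backoff. The
// base delay and cap are arbitrary; it would be wired up with
// conf.Metadata.Retry.BackoffFunc = exampleExponentialBackoff.
func exampleExponentialBackoff(retries, maxRetries int) time.Duration {
	base := 100 * time.Millisecond
	backoff := base << uint(retries) // double the delay on every retry
	if limit := 2 * time.Second; backoff > limit || backoff <= 0 {
		backoff = limit // cap the delay (and guard against shift overflow)
	}
	return backoff
}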
1141
1142func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
1143 retry := func(err error) (*FindCoordinatorResponse, error) {
1144 if attemptsRemaining > 0 {
1145 backoff := client.computeBackoff(attemptsRemaining)
1146 attemptsRemaining--
1147 Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
1148 time.Sleep(backoff)
1149 return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining)
1150 }
1151 return nil, err
1152 }
1153
1154 brokerErrors := make([]error, 0)
1155 for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
1156 DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())
1157
1158 request := new(FindCoordinatorRequest)
1159 request.CoordinatorKey = coordinatorKey
1160 request.CoordinatorType = coordinatorType
1161
1162 // Version 1 adds KeyType.
1163 if client.conf.Version.IsAtLeast(V0_11_0_0) {
1164 request.Version = 1
1165 }
1166 // Version 2 is the same as version 1.
1167 if client.conf.Version.IsAtLeast(V2_0_0_0) {
1168 request.Version = 2
1169 }
1170
1171 response, err := broker.FindCoordinator(request)
1172 if err != nil {
1173 Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
1174
1175 var packetEncodingError PacketEncodingError
1176 if errors.As(err, &packetEncodingError) {
1177 return nil, err
1178 } else {
1179 _ = broker.Close()
1180 brokerErrors = append(brokerErrors, err)
1181 client.deregisterBroker(broker)
1182 continue
1183 }
1184 }
1185
1186 if errors.Is(response.Err, ErrNoError) {
1187 DebugLogger.Printf("client/coordinator coordinator for %s is #%d (%s)\n", coordinatorKey, response.Coordinator.ID(), response.Coordinator.Addr())
1188 return response, nil
1189 } else if errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable) {
1190 Logger.Printf("client/coordinator coordinator for %s is not available\n", coordinatorKey)
1191
1192 // This is very ugly, but this scenario will only happen once per cluster.
1193 // The __consumer_offsets topic only has to be created one time.
1194 // The number of partitions is not configurable, but partition 0 should always exist.
1195 if _, err := client.Leader("__consumer_offsets", 0); err != nil {
1196 Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
1197 time.Sleep(2 * time.Second)
1198 }
1199 if coordinatorType == CoordinatorTransaction {
1200 if _, err := client.Leader("__transaction_state", 0); err != nil {
1201 Logger.Printf("client/coordinator the __transaction_state topic is not initialized completely yet. Waiting 2 seconds...\n")
1202 time.Sleep(2 * time.Second)
1203 }
1204 }
1205
1206 return retry(ErrConsumerCoordinatorNotAvailable)
1207 } else if errors.Is(response.Err, ErrGroupAuthorizationFailed) {
1208 Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", coordinatorKey)
1209 return retry(ErrGroupAuthorizationFailed)
1210 } else {
1211 return nil, response.Err
1212 }
1213 }
1214
1215 Logger.Println("client/coordinator no available broker to send consumer metadata request to")
1216 client.resurrectDeadBrokers()
1217 return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
1218}
1219
1220func (client *client) resolveCanonicalNames(addrs []string) ([]string, error) {
1221 ctx := context.Background()
1222
1223 dialer := client.Config().getDialer()
1224 resolver := net.Resolver{
1225 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1226 // the dial func should only be called once, so a type switch inside it is acceptable
1227 switch d := dialer.(type) {
1228 case proxy.ContextDialer:
1229 return d.DialContext(ctx, network, address)
1230 default:
1231 // we have no choice but to ignore the context
1232 return d.Dial(network, address)
1233 }
1234 },
1235 }
1236
1237 canonicalAddrs := make(map[string]struct{}, len(addrs)) // dedupe as we go
1238 for _, addr := range addrs {
1239 host, port, err := net.SplitHostPort(addr)
1240 if err != nil {
1241 return nil, err // message includes addr
1242 }
1243
1244 ips, err := resolver.LookupHost(ctx, host)
1245 if err != nil {
1246 return nil, err // message includes host
1247 }
1248 for _, ip := range ips {
1249 ptrs, err := resolver.LookupAddr(ctx, ip)
1250 if err != nil {
1251 return nil, err // message includes ip
1252 }
1253
1254 // unlike the Java client, we do not further check that PTRs resolve
1255 ptr := strings.TrimSuffix(ptrs[0], ".") // trailing dot breaks GSSAPI
1256 canonicalAddrs[net.JoinHostPort(ptr, port)] = struct{}{}
1257 }
1258 }
1259
1260 addrs = make([]string, 0, len(canonicalAddrs))
1261 for addr := range canonicalAddrs {
1262 addrs = append(addrs, addr)
1263 }
1264 return addrs, nil
1265}
1266
1267// nopCloserClient embeds an existing Client, but disables
1268// the Close method (yet all other methods pass
1269// through unchanged). This is for use in larger structs
1270// where it is undesirable to close the client that was
1271// passed in by the caller.
1272type nopCloserClient struct {
1273 Client
1274}
1275
1276// Close intercepts and purposely does not call the underlying
1277// client's Close() method.
1278func (ncc *nopCloserClient) Close() error {
1279 return nil
1280}
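
// Illustrative sketch (not part of the original source): how a struct that
// accepts a caller-owned Client can wrap it so that its own shutdown path may
// unconditionally call Close without tearing down the caller's client.
func exampleWrapCallerOwnedClient(userSupplied Client) Client {
	return &nopCloserClient{Client: userSupplied}
}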
1281
1282func (client *client) PartitionNotReadable(topic string, partition int32) bool {
1283 client.lock.RLock()
1284 defer client.lock.RUnlock()
1285
1286 pm := client.metadata[topic][partition]
1287 if pm == nil {
1288 return true
1289 }
1290 return pm.Leader == -1
1291}
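
// Illustrative sketch (not part of the original source): using
// PartitionNotReadable to filter out partitions that currently have no cached
// metadata or no live leader before dispatching fetches.
func exampleReadablePartitions(c Client, topic string) ([]int32, error) {
	partitions, err := c.Partitions(topic)
	if err != nil {
		return nil, err
	}
	readable := make([]int32, 0, len(partitions))
	for _, p := range partitions {
		if c.PartitionNotReadable(topic, p) {
			continue // skip partitions without a live leader
		}
		readable = append(readable, p)
	}
	return readable, nil
}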