package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is canceled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop: when a
	// server-side rebalance happens, the consumer session needs to be
	// recreated to get the new claims, as sketched below.
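	//
	// A minimal sketch of the expected call pattern (the handler value and
	// topic name are illustrative placeholders, not part of this API):
	//
	//	for {
	//		if err := group.Consume(ctx, []string{"my-topic"}, handler); err != nil {
	//			if errors.Is(err, ErrClosedConsumerGroup) {
	//				return
	//			}
	//			log.Println(err)
	//		}
	//		// check whether the loop should stop before re-joining
	//		if ctx.Err() != nil {
	//			return
	//		}
	//	}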
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
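	//
	// A short sketch of draining this channel, assuming Consumer.Return.Errors
	// was set to true before the group was created:
	//
	//	go func() {
	//		for err := range group.Errors() {
	//			log.Println("consumer group error:", err)
	//		}
	//	}()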
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
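	//
	// For example (topic name and partition numbers are illustrative only):
	//
	//	group.Pause(map[string][]int32{"my-topic": {0, 1}})
	//	// ... and later, to resume fetching from those partitions:
	//	group.Resume(map[string][]int32{"my-topic": {0, 1}})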
	Pause(partitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(partitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}

type consumerGroup struct {
	client Client

	config          *Config
	consumer        Consumer
	groupID         string
	groupInstanceId *string
	memberID        string
	errors          chan error

	lock       sync.Mutex
	errorsLock sync.RWMutex
	closed     chan none
	closeOnce  sync.Once

	userData []byte

	metricRegistry metrics.Registry
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
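//
// A minimal lifecycle sketch (broker address and group name are placeholders):
//
//	config := NewConfig()
//	config.Version = V2_0_0_0 // consumer groups require at least V0_10_2_0
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // the caller keeps ownership of the client
//
//	group, err := NewConsumerGroupFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//	defer group.Close()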
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	if client == nil {
		return nil, ConfigurationError("client must not be nil")
	}
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := newConsumer(client)
	if err != nil {
		return nil, err
	}

	cg := &consumerGroup{
		client:         client,
		consumer:       consumer,
		config:         config,
		groupID:        groupID,
		errors:         make(chan error, config.ChannelBufferSize),
		closed:         make(chan none),
		userData:       config.Consumer.Group.Member.UserData,
		metricRegistry: newCleanupRegistry(config.MetricRegistry),
	}
	if config.Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) {
		cg.groupInstanceId = &config.Consumer.Group.InstanceId
	}
	return cg, nil
}
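
// Static group membership (KIP-345) is enabled purely through configuration;
// newConsumerGroup picks it up from Config.Consumer.Group.InstanceId above.
// A short sketch, assuming brokers running Kafka 2.3 or newer (the instance
// id value is an illustrative placeholder):
//
//	config := NewConfig()
//	config.Version = V2_3_0_0
//	config.Consumer.Group.InstanceId = "worker-1" // must be stable across restarts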

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		go func() {
			c.errorsLock.Lock()
			defer c.errorsLock.Unlock()
			close(c.errors)
		}()

		// drain errors
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}

		c.metricRegistry.UnregisterAll()
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if errors.Is(err, ErrClosedClient) {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Wait for session exit signal or Close() call
	select {
	case <-c.closed:
	case <-sess.ctx.Done():
	}

	// Gracefully release session claims
	return sess.release(true)
}

// Pause implements ConsumerGroup.
func (c *consumerGroup) Pause(partitions map[string][]int32) {
	c.consumer.Pause(partitions)
}

// Resume implements ConsumerGroup.
func (c *consumerGroup) Resume(partitions map[string][]int32) {
	c.consumer.Resume(partitions)
}

// PauseAll implements ConsumerGroup.
func (c *consumerGroup) PauseAll() {
	c.consumer.PauseAll()
}

// ResumeAll implements ConsumerGroup.
func (c *consumerGroup) ResumeAll() {
	c.consumer.ResumeAll()
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			if retries <= 0 {
				return nil, err
			}
			return c.retryNewSession(ctx, topics, handler, retries-1, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	var (
		metricRegistry          = c.metricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)

	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(join.Err, ErrNoError) {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrMemberIdRequired:
		// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
		// with an empty member id, it needs to get the assigned id from the
		// response and send another join request with that id to actually join the
		// group
		c.memberID = join.MemberId
		return c.newSession(ctx, topics, handler, retries)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, join.Err
	default:
		return nil, join.Err
	}

	var strategy BalanceStrategy
	var ok bool
	if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
		strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
		if !ok {
			// this case shouldn't happen in practice, since the leader will choose the protocol
			// that all the members support
			return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
		}
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	var members map[string]ConsumerGroupMemberMetadata
	var allSubscribedTopicPartitions map[string][]int32
	var allSubscribedTopics []string
	if join.LeaderId == join.MemberId {
		members, err = join.GetMembers()
		if err != nil {
			return nil, err
		}

		allSubscribedTopicPartitions, allSubscribedTopics, plan, err = c.balance(strategy, members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(syncGroupResponse.Err, ErrNoError) {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}

	switch syncGroupResponse.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, syncGroupResponse.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			Logger.Printf("SyncGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, syncGroupResponse.Err
	default:
		return nil, syncGroupResponse.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(syncGroupResponse.MemberAssignment) > 0 {
		members, err := syncGroupResponse.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata, otherwise, reset the statically defined consumer
		// group metadata
		if members.UserData != nil {
			c.userData = members.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	session, err := newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
	if err != nil {
		return nil, err
	}

	// only the leader needs to check whether there are newly-added partitions in order to trigger a rebalance
	if join.LeaderId == join.MemberId {
		go c.loopCheckPartitionNumbers(allSubscribedTopicPartitions, allSubscribedTopics, session)
	}

	return session, err
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}
	if c.config.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 2
	}
	if c.config.Version.IsAtLeast(V2_0_0_0) {
		req.Version = 3
	}
	// from JoinGroupRequest v4 onwards (due to KIP-394) the client will actually
	// send two JoinGroupRequests, once with the empty member id, and then again
	// with the assigned id from the first response. This is handled via the
	// ErrMemberIdRequired case.
	if c.config.Version.IsAtLeast(V2_2_0_0) {
		req.Version = 4
	}
	if c.config.Version.IsAtLeast(V2_3_0_0) {
		req.Version = 5
		req.GroupInstanceId = c.groupInstanceId
		if c.config.Version.IsAtLeast(V2_4_0_0) {
			req.Version = 6
		}
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.userData,
	}
	var strategy BalanceStrategy
	if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
		if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
			return nil, err
		}
	} else {
		for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
			if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
				return nil, err
			}
		}
	}

	return coordinator.JoinGroup(req)
}

// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
	for _, strategy := range groupStrategies {
		if strategy.Name() == name {
			return strategy, true
		}
	}
	return nil, false
}

func (c *consumerGroup) syncGroupRequest(
	coordinator *Broker,
	members map[string]ConsumerGroupMemberMetadata,
	plan BalanceStrategyPlan,
	generationID int32,
	strategy BalanceStrategy,
) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}

	// Versions 1 and 2 are the same as version 0.
	if c.config.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 1
	}
	if c.config.Version.IsAtLeast(V2_0_0_0) {
		req.Version = 2
	}
	// Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
	if c.config.Version.IsAtLeast(V2_3_0_0) {
		req.Version = 3
		req.GroupInstanceId = c.groupInstanceId
		if c.config.Version.IsAtLeast(V2_4_0_0) {
			req.Version = 4
		}
	}

	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
		if err != nil {
			return nil, err
		}
		assignment.UserData = userDataBytes
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
		delete(members, memberID)
	}
	// add empty assignments for any remaining members
	for memberID := range members {
		if err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{}); err != nil {
			return nil, err
		}
	}

	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	// Version 1 and version 2 are the same as version 0.
	if c.config.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 1
	}
	if c.config.Version.IsAtLeast(V2_0_0_0) {
		req.Version = 2
	}
	// Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
	if c.config.Version.IsAtLeast(V2_3_0_0) {
		req.Version = 3
		req.GroupInstanceId = c.groupInstanceId
		// Version 4 is the first flexible version
		if c.config.Version.IsAtLeast(V2_4_0_0) {
			req.Version = 4
		}
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (map[string][]int32, []string, BalanceStrategyPlan, error) {
	topicPartitions := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topicPartitions[topic] = nil
		}
	}

	allSubscribedTopics := make([]string, 0, len(topicPartitions))
	for topic := range topicPartitions {
		allSubscribedTopics = append(allSubscribedTopics, topic)
	}

	// refresh metadata for all the subscribed topics in the consumer group
	// to avoid using stale metadata when assigning partitions
	err := c.client.RefreshMetadata(allSubscribedTopics...)
	if err != nil {
		return nil, nil, nil, err
	}

	for topic := range topicPartitions {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, nil, nil, err
		}
		topicPartitions[topic] = partitions
	}

	plan, err := strategy.Plan(members, topicPartitions)
	return topicPartitions, allSubscribedTopics, plan, err
}

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	// as per KIP-345, if groupInstanceId is set (i.e. static membership is in
	// action), do not leave the group when the consumer is closed; just clear
	// the memberID
	if c.groupInstanceId != nil {
		c.memberID = ""
		return nil
	}
	req := &LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	}
	if c.config.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 1
	}
	if c.config.Version.IsAtLeast(V2_0_0_0) {
		req.Version = 2
	}
	if c.config.Version.IsAtLeast(V2_4_0_0) {
		req.Version = 4
		req.Members = append(req.Members, MemberIdentity{
			MemberId: c.memberID,
		})
	}

	resp, err := coordinator.LeaveGroup(req)
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// clear the memberID
	c.memberID = ""

	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	var consumerError *ConsumerError
	if ok := errors.As(err, &consumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	c.errorsLock.RLock()
	defer c.errorsLock.RUnlock()
	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) {
	if c.config.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	defer session.cancel()

	oldTopicToPartitionNum := make(map[string]int, len(allSubscribedTopicPartitions))
	for topic, partitions := range allSubscribedTopicPartitions {
		oldTopicToPartitionNum[topic] = len(partitions)
	}

	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer pause.Stop()
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					Logger.Printf(
						"consumergroup/%s loop check partition number goroutine found the partition count in topics %s changed from %d to %d\n",
						c.groupID, topics, num, newTopicToPartitionNum[topic])
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf(
				"consumergroup/%s loop check partition number goroutine will exit, topics %s\n",
				c.groupID, topics)
			// the session was closed elsewhere, so exit
			return
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitions, err := c.client.Partitions(topic); err != nil {
			Logger.Printf(
				"consumergroup/%s failed to get the partition number for topic %s due to '%v'\n",
				c.groupID, topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitions)
		}
	}
	return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
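	//
	// For example, after processing msg (a *ConsumerMessage), the conventional
	// call is (this is effectively what MarkMessage does on your behalf):
	//
	//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")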
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel)
	if err != nil {
		return nil, err
	}

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming each topic partition in its own goroutine
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1) // increment wait group before spawning goroutine
			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()
				// cancel the group session as soon as any of the consume calls return
				defer sess.cancel()

				// if the partition is not currently readable, wait for it to become readable
				if sess.parent.client.PartitionNotReadable(topic, partition) {
					timer := time.NewTimer(5 * time.Second)
					defer timer.Stop()

					for sess.parent.client.PartitionNotReadable(topic, partition) {
						select {
						case <-ctx.Done():
							return
						case <-parent.closed:
							return
						case <-timer.C:
							timer.Reset(5 * time.Second)
						}
					}
				}

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	Logger.Printf(
		"consumergroup/session/%s/%d released\n",
		s.MemberID(), s.GenerationID())

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit
	defer func() {
		Logger.Printf(
			"consumergroup/session/%s/%d heartbeat loop stopped\n",
			s.MemberID(), s.GenerationID())
	}()

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
	defer retryBackoff.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}
			retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
			select {
			case <-s.hbDying:
				return
			case <-retryBackoff.C:
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress:
			retries = s.parent.config.Metadata.Retry.Max
			s.cancel()
		case ErrUnknownMemberId, ErrIllegalGeneration:
			return
		case ErrFencedInstancedId:
			if s.parent.groupInstanceId != nil {
				Logger.Printf("Heartbeat failed: group instance id %s has been fenced\n", *s.parent.groupInstanceId)
			}
			s.parent.handleError(resp.Err, "", -1)
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s). A minimal example implementation
// is sketched below the interface definition.
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
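
// A minimal handler sketch (the exampleHandler type is illustrative, not part
// of this package); note the select on the session context so the loop exits
// promptly when a rebalance is due:
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//	func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		for {
//			select {
//			case msg, ok := <-claim.Messages():
//				if !ok {
//					return nil
//				}
//				// process msg, then mark it as consumed
//				sess.MarkMessage(msg, "")
//			case <-sess.Context().Done():
//				return nil
//			}
//		}
//	}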

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high watermark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is; see the
	// sketch after this interface.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
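
// A rough sketch of estimating consumer lag for a claim from the high
// watermark, where msg is the most recent message received from Messages():
//
//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1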

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)

	if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}