blob: 59f894867c9c0592ce96939a1dd835a85e15443e [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "container/heap"
5 "errors"
6 "fmt"
7 "maps"
8 "math"
9 "slices"
10 "sort"
11 "strings"
12)
13
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = "sticky"

	// defaultGeneration is not referenced in this part of the file.
	// NOTE(review): presumably the generation assumed for sticky user data
	// that carries no generation of its own — confirm against its users.
	defaultGeneration = -1
)
26
// BalanceStrategyPlan is the outcome of a BalanceStrategy.Plan call.
// It maps every member to the partitions it was given, keyed by topic:
// `memberID -> topic -> partitions`.
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends the given partitions of topic to the assignment of memberID,
// creating the member's entry on first use. Calling it without any
// partitions is a no-op and leaves the plan untouched.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}
	byTopic, known := p[memberID]
	if !known {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
42
43// --------------------------------------------------------------------
44
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan in the form `memberID -> topic -> partitions`.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)

	// AssignmentData returns the serialized assignment data for the specified
	// memberID. Among the implementations in this file, the range and
	// round-robin strategies return nil, while the sticky strategy encodes the
	// topics and generationID it is given.
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
59
60// --------------------------------------------------------------------
61
62// NewBalanceStrategyRange returns a range balance strategy,
63// which is the default and assigns partitions as ranges to consumer group members.
64// This follows the same logic as
65// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
66//
67// Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
68//
69// M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
70// M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
71func NewBalanceStrategyRange() BalanceStrategy {
72 return &balanceStrategy{
73 name: RangeBalanceStrategyName,
74 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
75 partitionsPerConsumer := len(partitions) / len(memberIDs)
76 consumersWithExtraPartition := len(partitions) % len(memberIDs)
77
78 sort.Strings(memberIDs)
79
80 for i, memberID := range memberIDs {
81 min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i)))
82 extra := 0
83 if i < consumersWithExtraPartition {
84 extra = 1
85 }
86 max := min + partitionsPerConsumer + extra
87 plan.Add(memberID, topic, partitions[min:max]...)
88 }
89 },
90 }
91}
92
// BalanceStrategyRange is a package-level strategy instance created once
// through NewBalanceStrategyRange.
//
// Deprecated: use NewBalanceStrategyRange to avoid data race issue
var BalanceStrategyRange = NewBalanceStrategyRange()
95
96// NewBalanceStrategySticky returns a sticky balance strategy,
97// which assigns partitions to members with an attempt to preserve earlier assignments
98// while maintain a balanced partition distribution.
99// Example with topic T with six partitions (0..5) and two members (M1, M2):
100//
101// M1: {T: [0, 2, 4]}
102// M2: {T: [1, 3, 5]}
103//
104// On reassignment with an additional consumer, you might get an assignment plan like:
105//
106// M1: {T: [0, 2]}
107// M2: {T: [1, 3]}
108// M3: {T: [4, 5]}
109func NewBalanceStrategySticky() BalanceStrategy {
110 return &stickyBalanceStrategy{}
111}
112
// BalanceStrategySticky is a package-level strategy instance created once
// through NewBalanceStrategySticky.
//
// Deprecated: use NewBalanceStrategySticky to avoid data race issue
var BalanceStrategySticky = NewBalanceStrategySticky()
115
116// --------------------------------------------------------------------
117
118type balanceStrategy struct {
119 coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
120 name string
121}
122
123// Name implements BalanceStrategy.
124func (s *balanceStrategy) Name() string { return s.name }
125
126// Plan implements BalanceStrategy.
127func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
128 // Build members by topic map
129 mbt := make(map[string][]string)
130 for memberID, meta := range members {
131 for _, topic := range meta.Topics {
132 mbt[topic] = append(mbt[topic], memberID)
133 }
134 }
135
136 // func to sort and de-duplicate a StringSlice
137 uniq := func(ss sort.StringSlice) []string {
138 if ss.Len() < 2 {
139 return ss
140 }
141 sort.Sort(ss)
142 var i, j int
143 for i = 1; i < ss.Len(); i++ {
144 if ss[i] == ss[j] {
145 continue
146 }
147 j++
148 ss.Swap(i, j)
149 }
150 return ss[:j+1]
151 }
152
153 // Assemble plan
154 plan := make(BalanceStrategyPlan, len(members))
155 for topic, memberIDs := range mbt {
156 s.coreFn(plan, uniq(memberIDs), topic, topics[topic])
157 }
158 return plan, nil
159}
160
// AssignmentData implements BalanceStrategy. Simple strategies do not require
// any shared assignment data, so none of the arguments are consulted and the
// result is always nil data with a nil error.
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil
}
165
166type stickyBalanceStrategy struct {
167 movements partitionMovements
168}
169
170// Name implements BalanceStrategy.
171func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
172
// Plan implements BalanceStrategy.
//
// It rebuilds the members' existing assignments via
// prepopulateCurrentAssignments, drops partitions that are absent from topics
// or whose owner is no longer subscribed to the topic, and then lets balance
// distribute the partitions. A member left without partitions is still listed
// in the returned plan with an empty topic map.
//
// The movement tracker on s is replaced at the start of every call. An error
// from prepopulateCurrentAssignments is returned unchanged with a nil plan.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// track partition movements during generation of the partition assignment plan
	s.movements = partitionMovements{
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
	}

	// prepopulate the current assignment state from userdata on the consumer group members
	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
	if err != nil {
		return nil, err
	}

	// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
	// (evaluated before members without state are added to currentAssignment below)
	isFreshAssignment := len(currentAssignment) == 0

	// create a mapping of all current topic partitions and the consumers that can be assigned to them
	partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
	for topic, partitions := range topics {
		for _, partition := range partitions {
			partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
		}
	}

	// create a mapping of all consumers to all potential topic partitions that can be assigned to them
	// also, populate the mapping of partitions to potential consumers
	consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
	for memberID, meta := range members {
		consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
		for _, topicSubscription := range meta.Topics {
			// only evaluate topic subscriptions that are present in the supplied topics map
			if _, found := topics[topicSubscription]; found {
				for _, partition := range topics[topicSubscription] {
					topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
					consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
					partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
				}
			}
		}

		// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
		if _, exists := currentAssignment[memberID]; !exists {
			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
		}
	}

	// create a mapping of each partition to its current consumer, where possible
	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
	// every known partition starts out unvisited; whatever no member currently holds stays in this set
	unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unvisitedPartitions[partition] = true
	}
	var unassignedPartitions []topicPartitionAssignment
	for memberID, partitions := range currentAssignment {
		var keepPartitions []topicPartitionAssignment
		for _, partition := range partitions {
			// If this partition no longer exists at all, likely due to the
			// topic being deleted, we remove the partition from the member.
			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
				continue
			}
			delete(unvisitedPartitions, partition)
			currentPartitionConsumers[partition] = memberID

			// the member no longer subscribes to the partition's topic: take the partition
			// away from it and queue it for assignment to another member
			if !slices.Contains(members[memberID].Topics, partition.Topic) {
				unassignedPartitions = append(unassignedPartitions, partition)
				continue
			}
			keepPartitions = append(keepPartitions, partition)
		}
		currentAssignment[memberID] = keepPartitions
	}
	// partitions that no member held also need an owner
	for unvisited := range unvisitedPartitions {
		unassignedPartitions = append(unassignedPartitions, unvisited)
	}

	// sort the topic partitions in order of priority for reassignment
	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)

	// at this point we have preserved all valid topic partition to consumer assignments and removed
	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
	// to consumers so that the topic partition assignments are as balanced as possible.

	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)

	// Assemble plan
	plan := make(BalanceStrategyPlan, len(currentAssignment))
	for memberID, assignments := range currentAssignment {
		if len(assignments) == 0 {
			// keep members without partitions visible in the plan
			plan[memberID] = make(map[string][]int32)
		} else {
			for _, assignment := range assignments {
				plan.Add(memberID, assignment.Topic, assignment.Partition)
			}
		}
	}
	return plan, nil
}
274
275// AssignmentData serializes the set of topics currently assigned to the
276// specified member as part of the supplied balance plan
277func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
278 return encode(&StickyAssignorUserDataV1{
279 Topics: topics,
280 Generation: generationID,
281 }, nil)
282}
283
284// Balance assignments across consumers for maximum fairness and stickiness.
285func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
286 initializing := len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0
287
288 // assign all unassigned partitions
289 for _, partition := range unassignedPartitions {
290 // skip if there is no potential consumer for the partition
291 if len(partition2AllPotentialConsumers[partition]) == 0 {
292 continue
293 }
294 sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
295 }
296
297 // narrow down the reassignment scope to only those partitions that can actually be reassigned
298 for partition := range partition2AllPotentialConsumers {
299 if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
300 sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
301 }
302 }
303
304 // narrow down the reassignment scope to only those consumers that are subject to reassignment
305 fixedAssignments := make(map[string][]topicPartitionAssignment)
306 for memberID := range consumer2AllPotentialPartitions {
307 if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
308 fixedAssignments[memberID] = currentAssignment[memberID]
309 delete(currentAssignment, memberID)
310 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
311 }
312 }
313
314 // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
315 preBalanceAssignment := deepCopyAssignment(currentAssignment)
316 preBalancePartitionConsumers := maps.Clone(currentPartitionConsumer)
317
318 reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
319
320 // if we are not preserving existing assignments and we have made changes to the current assignment
321 // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
322 if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
323 currentAssignment = deepCopyAssignment(preBalanceAssignment)
324 clear(currentPartitionConsumer)
325 maps.Copy(currentPartitionConsumer, preBalancePartitionConsumers)
326 }
327
328 // add the fixed assignments (those that could not change) back
329 maps.Copy(currentAssignment, fixedAssignments)
330}
331
332// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
333// which assigns partitions to members in alternating order.
334// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
335// M0: [t0p0, t0p2, t1p1]
336// M1: [t0p1, t1p0, t1p2]
337func NewBalanceStrategyRoundRobin() BalanceStrategy {
338 return new(roundRobinBalancer)
339}
340
// BalanceStrategyRoundRobin is a package-level strategy instance created once
// through NewBalanceStrategyRoundRobin.
//
// Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue
var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin()
343
344type roundRobinBalancer struct{}
345
346func (b *roundRobinBalancer) Name() string {
347 return RoundRobinBalanceStrategyName
348}
349
350func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
351 if len(memberAndMetadata) == 0 || len(topics) == 0 {
352 return nil, errors.New("members and topics are not provided")
353 }
354 // sort partitions
355 var topicPartitions []topicAndPartition
356 for topic, partitions := range topics {
357 for _, partition := range partitions {
358 topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
359 }
360 }
361 sort.SliceStable(topicPartitions, func(i, j int) bool {
362 pi := topicPartitions[i]
363 pj := topicPartitions[j]
364 return pi.comparedValue() < pj.comparedValue()
365 })
366
367 // sort members
368 var members []memberAndTopic
369 for memberID, meta := range memberAndMetadata {
370 m := memberAndTopic{
371 memberID: memberID,
372 topics: make(map[string]struct{}),
373 }
374 for _, t := range meta.Topics {
375 m.topics[t] = struct{}{}
376 }
377 members = append(members, m)
378 }
379 sort.SliceStable(members, func(i, j int) bool {
380 mi := members[i]
381 mj := members[j]
382 return mi.memberID < mj.memberID
383 })
384
385 // assign partitions
386 plan := make(BalanceStrategyPlan, len(members))
387 i := 0
388 n := len(members)
389 for _, tp := range topicPartitions {
390 m := members[i%n]
391 for !m.hasTopic(tp.topic) {
392 i++
393 m = members[i%n]
394 }
395 plan.Add(m.memberID, tp.topic, tp.partition)
396 i++
397 }
398 return plan, nil
399}
400
// AssignmentData implements BalanceStrategy. The round-robin strategy keeps no
// assignment data: the arguments are not consulted and the result is always
// nil data with a nil error.
func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil // do nothing for now
}
404
// topicAndPartition names a single partition of a topic.
type topicAndPartition struct {
	topic     string
	partition int32
}

// comparedValue renders the partition as "<topic>-<partition>", the key the
// round-robin balancer orders partitions by.
func (tp *topicAndPartition) comparedValue() string {
	key := fmt.Sprintf("%s-%d", tp.topic, tp.partition)
	return key
}
413
// memberAndTopic pairs a member ID with the set of topics it subscribes to.
type memberAndTopic struct {
	topics   map[string]struct{}
	memberID string
}

// hasTopic reports whether topic is in the member's topic set.
func (m *memberAndTopic) hasTopic(topic string) bool {
	if _, subscribed := m.topics[topic]; subscribed {
		return true
	}
	return false
}
423
424// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
425// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
426// Lower balance score indicates a more balanced assignment.
427func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
428 consumer2AssignmentSize := make(map[string]int, len(assignment))
429 for memberID, partitions := range assignment {
430 consumer2AssignmentSize[memberID] = len(partitions)
431 }
432
433 var score float64
434 for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
435 delete(consumer2AssignmentSize, memberID)
436 for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
437 score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
438 }
439 }
440 return int(score)
441}
442
443// Determine whether the current assignment plan is balanced.
444func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
445 sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
446 min := len(currentAssignment[sortedCurrentSubscriptions[0]])
447 max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
448 if min >= max-1 {
449 // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
450 return true
451 }
452
453 // create a mapping from partitions to the consumer assigned to them
454 allPartitions := make(map[topicPartitionAssignment]string)
455 for memberID, partitions := range currentAssignment {
456 for _, partition := range partitions {
457 if _, exists := allPartitions[partition]; exists {
458 Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
459 }
460 allPartitions[partition] = memberID
461 }
462 }
463
464 // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
465 // could but did not get cannot be moved to it (because that would break the balance)
466 for _, memberID := range sortedCurrentSubscriptions {
467 consumerPartitions := currentAssignment[memberID]
468 consumerPartitionCount := len(consumerPartitions)
469
470 // skip if this consumer already has all the topic partitions it can get
471 if consumerPartitionCount == len(allSubscriptions[memberID]) {
472 continue
473 }
474
475 // otherwise make sure it cannot get any more
476 potentialTopicPartitions := allSubscriptions[memberID]
477 for _, partition := range potentialTopicPartitions {
478 if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
479 otherConsumer := allPartitions[partition]
480 otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
481 if consumerPartitionCount < otherConsumerPartitionCount {
482 return false
483 }
484 }
485 }
486 }
487 return true
488}
489
490// Reassign all topic partitions that need reassignment until balanced.
491func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
492 reassignmentPerformed := false
493 modified := false
494
495 // repeat reassignment until no partition can be moved to improve the balance
496 for {
497 modified = false
498 // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
499 // until the full list is processed or a balance is achieved
500 for _, partition := range reassignablePartitions {
501 if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
502 break
503 }
504
505 // the partition must have at least two consumers
506 if len(partition2AllPotentialConsumers[partition]) <= 1 {
507 Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
508 }
509
510 // the partition must have a consumer
511 consumer := currentPartitionConsumer[partition]
512 if consumer == "" {
513 Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
514 }
515
516 if _, exists := prevAssignment[partition]; exists {
517 if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
518 sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
519 reassignmentPerformed = true
520 modified = true
521 continue
522 }
523 }
524
525 // check if a better-suited consumer exists for the partition; if so, reassign it
526 for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
527 if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
528 sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
529 reassignmentPerformed = true
530 modified = true
531 break
532 }
533 }
534 }
535 if !modified {
536 return reassignmentPerformed
537 }
538 }
539}
540
541// Identify a new consumer for a topic partition and reassign it.
542func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
543 for _, anotherConsumer := range sortedCurrentSubscriptions {
544 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
545 return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
546 }
547 }
548 return sortedCurrentSubscriptions
549}
550
551// Reassign a specific partition to a new consumer
552func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
553 consumer := currentPartitionConsumer[partition]
554 // find the correct partition movement considering the stickiness requirement
555 partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
556 return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
557}
558
559// Track the movement of a topic partition after assignment
560func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
561 oldConsumer := currentPartitionConsumer[partition]
562 s.movements.movePartition(partition, oldConsumer, newConsumer)
563
564 currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
565 currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
566 currentPartitionConsumer[partition] = newConsumer
567 return sortMemberIDsByPartitionAssignments(currentAssignment)
568}
569
570// Determine whether a specific consumer should be considered for topic partition assignment.
571func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
572 currentPartitions := currentAssignment[memberID]
573 currentAssignmentSize := len(currentPartitions)
574 maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
575 if currentAssignmentSize > maxAssignmentSize {
576 Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
577 }
578 if currentAssignmentSize < maxAssignmentSize {
579 // if a consumer is not assigned all its potential partitions it is subject to reassignment
580 return true
581 }
582 for _, partition := range currentPartitions {
583 if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
584 return true
585 }
586 }
587 return false
588}
589
590// Only consider reassigning those topic partitions that have two or more potential consumers.
591func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
592 return len(partition2AllPotentialConsumers[partition]) >= 2
593}
594
595// The assignment should improve the overall balance of the partition assignments to consumers.
596func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
597 for _, memberID := range sortedCurrentSubscriptions {
598 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
599 currentAssignment[memberID] = append(currentAssignment[memberID], partition)
600 currentPartitionConsumer[partition] = memberID
601 break
602 }
603 }
604 return sortMemberIDsByPartitionAssignments(currentAssignment)
605}
606
607// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
608func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
609 userDataV1 := &StickyAssignorUserDataV1{}
610 if err := decode(userDataBytes, userDataV1, nil); err != nil {
611 userDataV0 := &StickyAssignorUserDataV0{}
612 if err := decode(userDataBytes, userDataV0, nil); err != nil {
613 return nil, err
614 }
615 return userDataV0, nil
616 }
617 return userDataV1, nil
618}
619
620// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
621// to those topic partitions currently reported by the Kafka cluster.
622func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
623 assignments := deepCopyAssignment(currentAssignment)
624 for memberID, partitions := range assignments {
625 // perform in-place filtering
626 i := 0
627 for _, partition := range partitions {
628 if _, exists := partition2AllPotentialConsumers[partition]; exists {
629 partitions[i] = partition
630 i++
631 }
632 }
633 assignments[memberID] = partitions[:i]
634 }
635 return assignments
636}
637
638func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
639 for i, assignment := range assignments {
640 if assignment == topic {
641 return append(assignments[:i], assignments[i+1:]...)
642 }
643 }
644 return assignments
645}
646
647func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
648 return slices.Contains(assignments, topic)
649}
650
// sortPartitions returns the partitions in the order in which they are to be
// considered for reassignment.
//
// When this is not a fresh assignment and areSubscriptionsIdentical reports
// true, the partitions are drawn one at a time from the member at the head of a
// priority queue built from the filtered current assignment. For each member a
// partition listed in partitionsWithADifferentPreviousAssignment is taken if
// the member holds one, otherwise its first partition. Partitions that no
// member currently holds are appended at the end, in map iteration order.
//
// In every other case the order is the one produced by
// sortPartitionsByPotentialConsumerAssignments.
func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
	// start with every known partition; entries are removed as partitions are emitted below
	unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unassignedPartitions[partition] = true
	}

	sortedPartitions := make([]topicPartitionAssignment, 0)
	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
		// if this is a reassignment and the subscriptions are identical (all consumers can consume from all topics)
		// then we just need to simply list partitions in a round robin fashion (from consumers with
		// most assigned partitions to those with least)
		assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)

		// use priority-queue to evaluate consumer group members in descending-order based on
		// the number of topic partition assignments (i.e. consumers with most assignments first)
		// NOTE(review): the ordering itself is defined by assignmentPriorityQueue, outside this block
		pq := make(assignmentPriorityQueue, len(assignments))
		i := 0
		for consumerID, consumerAssignments := range assignments {
			pq[i] = &consumerGroupMember{
				id:          consumerID,
				assignments: consumerAssignments,
			}
			i++
		}
		heap.Init(&pq)

		// loop until no consumer-group members remain
		for pq.Len() != 0 {
			member := pq[0]

			// partitions that were assigned to a different consumer last time
			// (stays 0, i.e. the member's first partition, when none is found)
			var prevPartitionIndex int
			for i, partition := range member.assignments {
				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
					prevPartitionIndex = i
					break
				}
			}

			if len(member.assignments) > 0 {
				partition := member.assignments[prevPartitionIndex]
				sortedPartitions = append(sortedPartitions, partition)
				delete(unassignedPartitions, partition)
				if prevPartitionIndex == 0 {
					member.assignments = member.assignments[1:]
				} else {
					member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
				}
				// the head member now has one partition fewer; restore the heap order
				heap.Fix(&pq, 0)
			} else {
				// the head member has no partitions left: drop it from the queue
				heap.Pop(&pq)
			}
		}

		// whatever was never emitted is held by no member; list it last
		for partition := range unassignedPartitions {
			sortedPartitions = append(sortedPartitions, partition)
		}
	} else {
		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
		sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
	}
	return sortedPartitions
}
714
715func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
716 // sort the members by the number of partition assignments in ascending order
717 sortedMemberIDs := make([]string, 0, len(assignments))
718 for memberID := range assignments {
719 sortedMemberIDs = append(sortedMemberIDs, memberID)
720 }
721 sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
722 ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
723 if ret == 0 {
724 return sortedMemberIDs[i] < sortedMemberIDs[j]
725 }
726 return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
727 })
728 return sortedMemberIDs
729}
730
731func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
732 // sort the members by the number of partition assignments in descending order
733 sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
734 i := 0
735 for partition := range partition2AllPotentialConsumers {
736 sortedPartionIDs[i] = partition
737 i++
738 }
739 sort.Slice(sortedPartionIDs, func(i, j int) bool {
740 if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
741 ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
742 if ret == 0 {
743 return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
744 }
745 return ret < 0
746 }
747 return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
748 })
749 return sortedPartionIDs
750}
751
752func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
753 m := make(map[string][]topicPartitionAssignment, len(assignment))
754 for memberID, subscriptions := range assignment {
755 m[memberID] = append(subscriptions[:0:0], subscriptions...)
756 }
757 return m
758}
759
760func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
761 curMembers := make(map[string]int)
762 for _, cur := range partition2AllPotentialConsumers {
763 if len(curMembers) == 0 {
764 for _, curMembersElem := range cur {
765 curMembers[curMembersElem]++
766 }
767 continue
768 }
769
770 if len(curMembers) != len(cur) {
771 return false
772 }
773
774 yMap := make(map[string]int)
775 for _, yElem := range cur {
776 yMap[yElem]++
777 }
778
779 for curMembersMapKey, curMembersMapVal := range curMembers {
780 if yMap[curMembersMapKey] != curMembersMapVal {
781 return false
782 }
783 }
784 }
785
786 curPartitions := make(map[topicPartitionAssignment]int)
787 for _, cur := range consumer2AllPotentialPartitions {
788 if len(curPartitions) == 0 {
789 for _, curPartitionElem := range cur {
790 curPartitions[curPartitionElem]++
791 }
792 continue
793 }
794
795 if len(curPartitions) != len(cur) {
796 return false
797 }
798
799 yMap := make(map[topicPartitionAssignment]int)
800 for _, yElem := range cur {
801 yMap[yElem]++
802 }
803
804 for curMembersMapKey, curMembersMapVal := range curPartitions {
805 if yMap[curMembersMapKey] != curMembersMapVal {
806 return false
807 }
808 }
809 }
810 return true
811}
812
813// We need to process subscriptions' user data with each consumer's reported generation in mind
814// higher generations overwrite lower generations in case of a conflict
815// note that a conflict could exist only if user data is for different generations
816func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
817 currentAssignment := make(map[string][]topicPartitionAssignment)
818 prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
819
820 // for each partition we create a sorted map of its consumers by generation
821 sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
822 for memberID, meta := range members {
823 consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
824 if err != nil {
825 return nil, nil, err
826 }
827 for _, partition := range consumerUserData.partitions() {
828 if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
829 if consumerUserData.hasGeneration() {
830 if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
831 // same partition is assigned to two consumers during the same rebalance.
832 // log a warning and skip this record
833 Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
834 continue
835 } else {
836 consumers[consumerUserData.generation()] = memberID
837 }
838 } else {
839 consumers[defaultGeneration] = memberID
840 }
841 } else {
842 generation := defaultGeneration
843 if consumerUserData.hasGeneration() {
844 generation = consumerUserData.generation()
845 }
846 sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
847 }
848 }
849 }
850
851 // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
852 // current and previous consumers are the last two consumers of each partition in the above sorted map
853 for partition, consumers := range sortedPartitionConsumersByGeneration {
854 // sort consumers by generation in decreasing order
855 var generations []int
856 for generation := range consumers {
857 generations = append(generations, generation)
858 }
859 sort.Sort(sort.Reverse(sort.IntSlice(generations)))
860
861 consumer := consumers[generations[0]]
862 if _, exists := currentAssignment[consumer]; !exists {
863 currentAssignment[consumer] = []topicPartitionAssignment{partition}
864 } else {
865 currentAssignment[consumer] = append(currentAssignment[consumer], partition)
866 }
867
868 // check for previous assignment, if any
869 if len(generations) > 1 {
870 prevAssignment[partition] = consumerGenerationPair{
871 MemberID: consumers[generations[1]],
872 Generation: generations[1],
873 }
874 }
875 }
876 return currentAssignment, prevAssignment, nil
877}
878
// consumerGenerationPair couples a consumer group member with the generation
// in which that member reported owning a partition.
type consumerGenerationPair struct {
	// MemberID is the ID of the consumer group member.
	MemberID string
	// Generation is the generation the member reported.
	Generation int
}
883
// consumerPair identifies the two Kafka consumers involved in a partition
// reassignment: the member the partition moves away from and the member it
// moves to. It is comparable and is used as a map key.
type consumerPair struct {
	// SrcMemberID is the member the partition is taken from.
	SrcMemberID string
	// DstMemberID is the member the partition is handed to.
	DstMemberID string
}
889
890// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
891type partitionMovements struct {
892 PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
893 Movements map[topicPartitionAssignment]consumerPair
894}
895
896func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
897 pair := p.Movements[partition]
898 delete(p.Movements, partition)
899
900 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
901 delete(partitionMovementsForThisTopic[pair], partition)
902 if len(partitionMovementsForThisTopic[pair]) == 0 {
903 delete(partitionMovementsForThisTopic, pair)
904 }
905 if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
906 delete(p.PartitionMovementsByTopic, partition.Topic)
907 }
908 return pair
909}
910
911func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
912 p.Movements[partition] = pair
913 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
914 p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
915 }
916 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
917 if _, exists := partitionMovementsForThisTopic[pair]; !exists {
918 partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
919 }
920 partitionMovementsForThisTopic[pair][partition] = true
921}
922
923func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
924 pair := consumerPair{
925 SrcMemberID: oldConsumer,
926 DstMemberID: newConsumer,
927 }
928 if _, exists := p.Movements[partition]; exists {
929 // this partition has previously moved
930 existingPair := p.removeMovementRecordOfPartition(partition)
931 if existingPair.DstMemberID != oldConsumer {
932 Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
933 }
934 if existingPair.SrcMemberID != newConsumer {
935 // the partition is not moving back to its previous consumer
936 p.addPartitionMovementRecord(partition, consumerPair{
937 SrcMemberID: existingPair.SrcMemberID,
938 DstMemberID: newConsumer,
939 })
940 }
941 } else {
942 p.addPartitionMovementRecord(partition, pair)
943 }
944}
945
946func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
947 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
948 return partition
949 }
950 if _, exists := p.Movements[partition]; exists {
951 // this partition has previously moved
952 if oldConsumer != p.Movements[partition].DstMemberID {
953 Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
954 }
955 oldConsumer = p.Movements[partition].SrcMemberID
956 }
957
958 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
959 reversePair := consumerPair{
960 SrcMemberID: newConsumer,
961 DstMemberID: oldConsumer,
962 }
963 if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
964 return partition
965 }
966 var reversePairPartition topicPartitionAssignment
967 for otherPartition := range partitionMovementsForThisTopic[reversePair] {
968 reversePairPartition = otherPartition
969 }
970 return reversePairPartition
971}
972
973//lint:ignore U1000 // this is used but only in unittests as a helper (which are excluded by the integration build tag)
974func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
975 if src == dst {
976 return currentPath, false
977 }
978 if len(pairs) == 0 {
979 return currentPath, false
980 }
981 for _, pair := range pairs {
982 if src == pair.SrcMemberID && dst == pair.DstMemberID {
983 currentPath = append(currentPath, src, dst)
984 return currentPath, true
985 }
986 }
987
988 for _, pair := range pairs {
989 if pair.SrcMemberID != src {
990 continue
991 }
992 // create a deep copy of the pairs, excluding the current pair
993 reducedSet := make([]consumerPair, len(pairs)-1)
994 i := 0
995 for _, p := range pairs {
996 if p != pair {
997 reducedSet[i] = pair
998 i++
999 }
1000 }
1001
1002 currentPath = append(currentPath, pair.SrcMemberID)
1003 return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
1004 }
1005 return currentPath, false
1006}
1007
1008//lint:ignore U1000 // this is used but only in unittests as a helper (which are excluded by the integration build tag)
1009func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
1010 superCycle := make([]string, len(cycle)-1)
1011 for i := 0; i < len(cycle)-1; i++ {
1012 superCycle[i] = cycle[i]
1013 }
1014 superCycle = append(superCycle, cycle...)
1015 for _, foundCycle := range cycles {
1016 if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
1017 return true
1018 }
1019 }
1020 return false
1021}
1022
1023//lint:ignore U1000 // this is used but only in unittests as a helper (which are excluded by the integration build tag)
1024func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
1025 cycles := make([][]string, 0)
1026 for _, pair := range pairs {
1027 // create a deep copy of the pairs, excluding the current pair
1028 reducedPairs := make([]consumerPair, len(pairs)-1)
1029 i := 0
1030 for _, p := range pairs {
1031 if p != pair {
1032 reducedPairs[i] = pair
1033 i++
1034 }
1035 }
1036 if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
1037 if !p.in(path, cycles) {
1038 cycles = append(cycles, path)
1039 Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
1040 }
1041 }
1042 }
1043
1044 // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
1045 // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
1046 // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
1047 for _, cycle := range cycles {
1048 if len(cycle) == 3 {
1049 return true
1050 }
1051 }
1052 return false
1053}
1054
1055//lint:ignore U1000 // this is used but only in unittests as a helper (which are excluded by the integration build tag)
1056func (p *partitionMovements) isSticky() bool {
1057 for topic, movements := range p.PartitionMovementsByTopic {
1058 movementPairs := make([]consumerPair, len(movements))
1059 i := 0
1060 for pair := range movements {
1061 movementPairs[i] = pair
1062 i++
1063 }
1064 if p.hasCycles(movementPairs) {
1065 Logger.Printf("Stickiness is violated for topic %s", topic)
1066 Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
1067 return false
1068 }
1069 }
1070 return true
1071}
1072
// indexOfSubList returns the index of the first occurrence of target as a
// contiguous run inside source, or -1 when there is none. An empty target
// matches at index 0.
//
//lint:ignore U1000 // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func indexOfSubList(source []string, target []string) int {
	width := len(target)
	lastStart := len(source) - width
	for start := 0; start <= lastStart; start++ {
		if slices.Equal(source[start:start+width], target) {
			return start
		}
	}
	return -1
}
1092
1093type consumerGroupMember struct {
1094 id string
1095 assignments []topicPartitionAssignment
1096}
1097
1098// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1099// in descending order (most assignments to least assignments).
1100type assignmentPriorityQueue []*consumerGroupMember
1101
1102func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1103
1104func (pq assignmentPriorityQueue) Less(i, j int) bool {
1105 // order assignment priority queue in descending order using assignment-count/member-id
1106 if len(pq[i].assignments) == len(pq[j].assignments) {
1107 return pq[i].id > pq[j].id
1108 }
1109 return len(pq[i].assignments) > len(pq[j].assignments)
1110}
1111
1112func (pq assignmentPriorityQueue) Swap(i, j int) {
1113 pq[i], pq[j] = pq[j], pq[i]
1114}
1115
1116func (pq *assignmentPriorityQueue) Push(x interface{}) {
1117 member := x.(*consumerGroupMember)
1118 *pq = append(*pq, member)
1119}
1120
1121func (pq *assignmentPriorityQueue) Pop() interface{} {
1122 old := *pq
1123 n := len(old)
1124 member := old[n-1]
1125 *pq = old[0 : n-1]
1126 return member
1127}