package sarama

import (
	"fmt"
	"hash"
	"hash/crc32"
	"hash/fnv"
	"math/rand"
	"time"
)

11// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
12// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
13// as simple default implementations.
14type Partitioner interface {
15 // Partition takes a message and partition count and chooses a partition
16 Partition(message *ProducerMessage, numPartitions int32) (int32, error)
17
18 // RequiresConsistency indicates to the user of the partitioner whether the
19 // mapping of key->partition is consistent or not. Specifically, if a
20 // partitioner requires consistency then it must be allowed to choose from all
21 // partitions (even ones known to be unavailable), and its choice must be
22 // respected by the caller. The obvious example is the HashPartitioner.
23 RequiresConsistency() bool
24}
25
26// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
27// in order to allow more flexibility than is originally allowed by the
28// RequiresConsistency method in the Partitioner interface. This allows
29// partitioners to require consistency sometimes, but not all times. It's useful
30// for, e.g., the HashPartitioner, which does not require consistency if the
31// message key is nil.
32type DynamicConsistencyPartitioner interface {
33 Partitioner
34
35 // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
36 // but takes in the message being partitioned so that the partitioner can
37 // make a per-message determination.
38 MessageRequiresConsistency(message *ProducerMessage) bool
39}
40
41// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
42type PartitionerConstructor func(topic string) Partitioner
43
// manualPartitioner is the stateless Partitioner returned by
// NewManualPartitioner.
type manualPartitioner struct{}

46// HashPartitionerOption lets you modify default values of the partitioner
47type HashPartitionerOption func(*hashPartitioner)
48
49// WithAbsFirst means that the partitioner handles absolute values
50// in the same way as the reference Java implementation
51func WithAbsFirst() HashPartitionerOption {
52 return func(hp *hashPartitioner) {
53 hp.referenceAbs = true
54 }
55}
56
57// WithHashUnsigned means the partitioner treats the hashed value as unsigned when
58// partitioning. This is intended to be combined with the crc32 hash algorithm to
59// be compatible with librdkafka's implementation
60func WithHashUnsigned() HashPartitionerOption {
61 return func(hp *hashPartitioner) {
62 hp.hashUnsigned = true
63 }
64}
65
66// WithCustomHashFunction lets you specify what hash function to use for the partitioning
67func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
68 return func(hp *hashPartitioner) {
69 hp.hasher = hasher()
70 }
71}
72
73// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
74func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
75 return func(hp *hashPartitioner) {
76 hp.random = randomHP
77 }
78}
79
80// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
81// ProducerMessage's Partition field as the partition to produce to.
82func NewManualPartitioner(topic string) Partitioner {
83 return new(manualPartitioner)
84}
85
86func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
87 return message.Partition, nil
88}
89
90func (p *manualPartitioner) RequiresConsistency() bool {
91 return true
92}
93
// randomPartitioner is the Partitioner returned by NewRandomPartitioner.
type randomPartitioner struct {
	// generator is the random source that partitions are drawn from.
	generator *rand.Rand
}

98// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
99func NewRandomPartitioner(topic string) Partitioner {
100 p := new(randomPartitioner)
101 p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
102 return p
103}
104
105func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
106 return int32(p.generator.Intn(int(numPartitions))), nil
107}
108
109func (p *randomPartitioner) RequiresConsistency() bool {
110 return false
111}
112
// roundRobinPartitioner is the Partitioner returned by
// NewRoundRobinPartitioner.
type roundRobinPartitioner struct {
	// partition is the partition that the next Partition call hands out.
	partition int32
}

117// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
118func NewRoundRobinPartitioner(topic string) Partitioner {
119 return &roundRobinPartitioner{}
120}
121
122func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
123 if p.partition >= numPartitions {
124 p.partition = 0
125 }
126 ret := p.partition
127 p.partition++
128 return ret, nil
129}
130
131func (p *roundRobinPartitioner) RequiresConsistency() bool {
132 return false
133}
134
135type hashPartitioner struct {
136 random Partitioner
137 hasher hash.Hash32
138 referenceAbs bool
139 hashUnsigned bool
140}
141
142// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
143// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
144// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
145func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
146 return func(topic string) Partitioner {
147 p := new(hashPartitioner)
148 p.random = NewRandomPartitioner(topic)
149 p.hasher = hasher()
150 p.referenceAbs = false
151 p.hashUnsigned = false
152 return p
153 }
154}
155
156// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
157func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
158 return func(topic string) Partitioner {
159 p := new(hashPartitioner)
160 p.random = NewRandomPartitioner(topic)
161 p.hasher = fnv.New32a()
162 p.referenceAbs = false
163 p.hashUnsigned = false
164 for _, option := range options {
165 option(p)
166 }
167 return p
168 }
169}
170
171// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
172// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
173// modulus the number of partitions. This ensures that messages with the same key always end up on the
174// same partition.
175func NewHashPartitioner(topic string) Partitioner {
176 p := new(hashPartitioner)
177 p.random = NewRandomPartitioner(topic)
178 p.hasher = fnv.New32a()
179 p.referenceAbs = false
180 p.hashUnsigned = false
181 return p
182}
183
184// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
185// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
186// that but it had a mistake and now there are people depending on both behaviors. This will
187// all go away on the next major version bump.
188func NewReferenceHashPartitioner(topic string) Partitioner {
189 p := new(hashPartitioner)
190 p.random = NewRandomPartitioner(topic)
191 p.hasher = fnv.New32a()
192 p.referenceAbs = true
193 p.hashUnsigned = false
194 return p
195}
196
197// NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash
198// of the encoded bytes of the message key modulus the number of partitions. This is compatible with
199// librdkafka's `consistent_random` partitioner
200func NewConsistentCRCHashPartitioner(topic string) Partitioner {
201 p := new(hashPartitioner)
202 p.random = NewRandomPartitioner(topic)
203 p.hasher = crc32.NewIEEE()
204 p.referenceAbs = false
205 p.hashUnsigned = true
206 return p
207}
208
209func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
210 if message.Key == nil {
211 return p.random.Partition(message, numPartitions)
212 }
213 bytes, err := message.Key.Encode()
214 if err != nil {
215 return -1, err
216 }
217 p.hasher.Reset()
218 _, err = p.hasher.Write(bytes)
219 if err != nil {
220 return -1, err
221 }
222 var partition int32
223 // Turns out we were doing our absolute value in a subtly different way from the upstream
224 // implementation, but now we need to maintain backwards compat for people who started using
225 // the old version; if referenceAbs is set we are compatible with the reference java client
226 // but not past Sarama versions
227 if p.referenceAbs {
228 partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
229 } else if p.hashUnsigned {
230 // librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
231 // with librdkafka's `consistent` partitioning but not past Sarama versions
232 partition = int32(p.hasher.Sum32() % uint32(numPartitions))
233 } else {
234 partition = int32(p.hasher.Sum32()) % numPartitions
235 if partition < 0 {
236 partition = -partition
237 }
238 }
239 return partition, nil
240}
241
242func (p *hashPartitioner) RequiresConsistency() bool {
243 return true
244}
245
246func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
247 return message.Key != nil
248}