blob: 071dc8a677fedc11e8312686e096e75af5eee485 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001// Copyright (c) 2018-2022 Burak Sezer
Matteo Scandolod525ae32020-04-02 17:27:29 -07002// All rights reserved.
3//
4// This code is licensed under the MIT License.
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files(the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions :
12//
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22// THE SOFTWARE.
23
// Package consistent provides a consistent hashing function with bounded loads. This implementation also adds
// partitioning logic on top of the original algorithm. For more information about the underlying algorithm,
// please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
//
// Example usage:
//
//	cfg := consistent.Config{
//		PartitionCount:    71,
//		ReplicationFactor: 20,
//		Load:              1.25,
//		Hasher:            hasher{},
//	}
//
// Now you can create a new Consistent instance. New takes the initial list of members.
//
//	c := consistent.New(members, cfg)
//
// The following adds a new member to the consistent hash ring. myMember is any Go value that
// implements the Member interface. Every change to the ring redistributes partitions among
// members using the bounded-load algorithm described on the Google Research Blog.
//
//	c.Add(myMember)
//
// Remove a member from the consistent hash ring by its name (the value returned by its String method):
//
//	c.Remove(myMember.String())
//
// LocateKey hashes the key and computes the partition ID as MOD(hash result, partition count).
// Partition owners are already computed by New/Add/Remove, so LocateKey just returns the member
// responsible for the key.
//
//	key := []byte("my-key")
//	member := c.LocateKey(key)
Matteo Scandolod525ae32020-04-02 17:27:29 -070057package consistent
58
59import (
60 "encoding/binary"
61 "errors"
62 "fmt"
63 "math"
64 "sort"
65 "sync"
66)
67
// Default values that New substitutes for zero-valued Config fields.
const (
	// DefaultPartitionCount is used when Config.PartitionCount is 0.
	DefaultPartitionCount int = 271
	// DefaultReplicationFactor is used when Config.ReplicationFactor is 0.
	DefaultReplicationFactor int = 20
	// DefaultLoad is used when Config.Load is 0.
	DefaultLoad float64 = 1.25
)
73
Abhay Kumara2ae5992025-11-10 14:02:24 +000074// ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
75var ErrInsufficientMemberCount = errors.New("insufficient member count")
76
// Hasher turns a byte slice into an unsigned 64-bit hash. The ring uses it
// to place members, partitions and keys.
//
// Implementations should keep collisions (distinct inputs sharing a hash)
// rare and should be fast; the FarmHash family is a reasonable choice.
type Hasher interface {
	Sum64([]byte) uint64
}
84
// Member is a participant in the consistent hash ring. Its String value is
// its identity: it is hashed to derive ring positions and used as the key
// for de-duplication in Add and for Remove.
type Member interface {
	String() string
}
89
90// Config represents a structure to control consistent package.
91type Config struct {
Abhay Kumara2ae5992025-11-10 14:02:24 +000092 // Hasher is responsible for generating unsigned, 64-bit hash of provided byte slice.
Matteo Scandolod525ae32020-04-02 17:27:29 -070093 Hasher Hasher
94
95 // Keys are distributed among partitions. Prime numbers are good to
96 // distribute keys uniformly. Select a big PartitionCount if you have
97 // too many keys.
98 PartitionCount int
99
100 // Members are replicated on consistent hash ring. This number means that a member
101 // how many times replicated on the ring.
102 ReplicationFactor int
103
104 // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
105 Load float64
106}
107
108// Consistent holds the information about the members of the consistent hash circle.
109type Consistent struct {
110 mu sync.RWMutex
111
112 config Config
113 hasher Hasher
114 sortedSet []uint64
115 partitionCount uint64
116 loads map[string]float64
117 members map[string]*Member
118 partitions map[int]*Member
119 ring map[uint64]*Member
120}
121
122// New creates and returns a new Consistent object.
123func New(members []Member, config Config) *Consistent {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000124 if config.Hasher == nil {
125 panic("Hasher cannot be nil")
126 }
127 if config.PartitionCount == 0 {
128 config.PartitionCount = DefaultPartitionCount
129 }
130 if config.ReplicationFactor == 0 {
131 config.ReplicationFactor = DefaultReplicationFactor
132 }
133 if config.Load == 0 {
134 config.Load = DefaultLoad
135 }
136
Matteo Scandolod525ae32020-04-02 17:27:29 -0700137 c := &Consistent{
138 config: config,
139 members: make(map[string]*Member),
140 partitionCount: uint64(config.PartitionCount),
141 ring: make(map[uint64]*Member),
142 }
Abhay Kumara2ae5992025-11-10 14:02:24 +0000143
Matteo Scandolod525ae32020-04-02 17:27:29 -0700144 c.hasher = config.Hasher
145 for _, member := range members {
146 c.add(member)
147 }
148 if members != nil {
149 c.distributePartitions()
150 }
151 return c
152}
153
Abhay Kumara2ae5992025-11-10 14:02:24 +0000154// GetMembers returns a thread-safe copy of members. If there are no members, it returns an empty slice of Member.
Matteo Scandolod525ae32020-04-02 17:27:29 -0700155func (c *Consistent) GetMembers() []Member {
156 c.mu.RLock()
157 defer c.mu.RUnlock()
158
159 // Create a thread-safe copy of member list.
160 members := make([]Member, 0, len(c.members))
161 for _, member := range c.members {
162 members = append(members, *member)
163 }
164 return members
165}
166
167// AverageLoad exposes the current average load.
168func (c *Consistent) AverageLoad() float64 {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000169 c.mu.RLock()
170 defer c.mu.RUnlock()
171
172 return c.averageLoad()
173}
174
175func (c *Consistent) averageLoad() float64 {
176 if len(c.members) == 0 {
177 return 0
178 }
179
Matteo Scandolod525ae32020-04-02 17:27:29 -0700180 avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
181 return math.Ceil(avgLoad)
182}
183
184func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
Abhay Kumara2ae5992025-11-10 14:02:24 +0000185 avgLoad := c.averageLoad()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700186 var count int
187 for {
188 count++
189 if count >= len(c.sortedSet) {
190 // User needs to decrease partition count, increase member count or increase load factor.
191 panic("not enough room to distribute partitions")
192 }
193 i := c.sortedSet[idx]
194 member := *c.ring[i]
195 load := loads[member.String()]
196 if load+1 <= avgLoad {
197 partitions[partID] = &member
198 loads[member.String()]++
199 return
200 }
201 idx++
202 if idx >= len(c.sortedSet) {
203 idx = 0
204 }
205 }
206}
207
208func (c *Consistent) distributePartitions() {
209 loads := make(map[string]float64)
210 partitions := make(map[int]*Member)
211
212 bs := make([]byte, 8)
213 for partID := uint64(0); partID < c.partitionCount; partID++ {
214 binary.LittleEndian.PutUint64(bs, partID)
215 key := c.hasher.Sum64(bs)
216 idx := sort.Search(len(c.sortedSet), func(i int) bool {
217 return c.sortedSet[i] >= key
218 })
219 if idx >= len(c.sortedSet) {
220 idx = 0
221 }
222 c.distributeWithLoad(int(partID), idx, partitions, loads)
223 }
224 c.partitions = partitions
225 c.loads = loads
226}
227
228func (c *Consistent) add(member Member) {
229 for i := 0; i < c.config.ReplicationFactor; i++ {
230 key := []byte(fmt.Sprintf("%s%d", member.String(), i))
231 h := c.hasher.Sum64(key)
232 c.ring[h] = &member
233 c.sortedSet = append(c.sortedSet, h)
234 }
235 // sort hashes ascendingly
236 sort.Slice(c.sortedSet, func(i int, j int) bool {
237 return c.sortedSet[i] < c.sortedSet[j]
238 })
239 // Storing member at this map is useful to find backup members of a partition.
240 c.members[member.String()] = &member
241}
242
243// Add adds a new member to the consistent hash circle.
244func (c *Consistent) Add(member Member) {
245 c.mu.Lock()
246 defer c.mu.Unlock()
247
248 if _, ok := c.members[member.String()]; ok {
249 // We already have this member. Quit immediately.
250 return
251 }
252 c.add(member)
253 c.distributePartitions()
254}
255
256func (c *Consistent) delSlice(val uint64) {
257 for i := 0; i < len(c.sortedSet); i++ {
258 if c.sortedSet[i] == val {
259 c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
260 break
261 }
262 }
263}
264
265// Remove removes a member from the consistent hash circle.
266func (c *Consistent) Remove(name string) {
267 c.mu.Lock()
268 defer c.mu.Unlock()
269
270 if _, ok := c.members[name]; !ok {
271 // There is no member with that name. Quit immediately.
272 return
273 }
274
275 for i := 0; i < c.config.ReplicationFactor; i++ {
276 key := []byte(fmt.Sprintf("%s%d", name, i))
277 h := c.hasher.Sum64(key)
278 delete(c.ring, h)
279 c.delSlice(h)
280 }
281 delete(c.members, name)
282 if len(c.members) == 0 {
283 // consistent hash ring is empty now. Reset the partition table.
284 c.partitions = make(map[int]*Member)
285 return
286 }
287 c.distributePartitions()
288}
289
290// LoadDistribution exposes load distribution of members.
291func (c *Consistent) LoadDistribution() map[string]float64 {
292 c.mu.RLock()
293 defer c.mu.RUnlock()
294
295 // Create a thread-safe copy
296 res := make(map[string]float64)
297 for member, load := range c.loads {
298 res[member] = load
299 }
300 return res
301}
302
303// FindPartitionID returns partition id for given key.
304func (c *Consistent) FindPartitionID(key []byte) int {
305 hkey := c.hasher.Sum64(key)
306 return int(hkey % c.partitionCount)
307}
308
309// GetPartitionOwner returns the owner of the given partition.
310func (c *Consistent) GetPartitionOwner(partID int) Member {
311 c.mu.RLock()
312 defer c.mu.RUnlock()
313
Abhay Kumara2ae5992025-11-10 14:02:24 +0000314 return c.getPartitionOwner(partID)
315}
316
317// getPartitionOwner returns the owner of the given partition. It's not thread-safe.
318func (c *Consistent) getPartitionOwner(partID int) Member {
Matteo Scandolod525ae32020-04-02 17:27:29 -0700319 member, ok := c.partitions[partID]
320 if !ok {
321 return nil
322 }
323 // Create a thread-safe copy of member and return it.
324 return *member
325}
326
327// LocateKey finds a home for given key
328func (c *Consistent) LocateKey(key []byte) Member {
329 partID := c.FindPartitionID(key)
330 return c.GetPartitionOwner(partID)
331}
332
333func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
334 c.mu.RLock()
335 defer c.mu.RUnlock()
336
Abhay Kumara2ae5992025-11-10 14:02:24 +0000337 var res []Member
Matteo Scandolod525ae32020-04-02 17:27:29 -0700338 if count > len(c.members) {
339 return res, ErrInsufficientMemberCount
340 }
341
342 var ownerKey uint64
Abhay Kumara2ae5992025-11-10 14:02:24 +0000343 owner := c.getPartitionOwner(partID)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700344 // Hash and sort all the names.
Abhay Kumara2ae5992025-11-10 14:02:24 +0000345 var keys []uint64
Matteo Scandolod525ae32020-04-02 17:27:29 -0700346 kmems := make(map[uint64]*Member)
347 for name, member := range c.members {
348 key := c.hasher.Sum64([]byte(name))
349 if name == owner.String() {
350 ownerKey = key
351 }
352 keys = append(keys, key)
353 kmems[key] = member
354 }
355 sort.Slice(keys, func(i, j int) bool {
356 return keys[i] < keys[j]
357 })
358
359 // Find the key owner
360 idx := 0
361 for idx < len(keys) {
362 if keys[idx] == ownerKey {
363 key := keys[idx]
364 res = append(res, *kmems[key])
365 break
366 }
367 idx++
368 }
369
370 // Find the closest(replica owners) members.
371 for len(res) < count {
372 idx++
373 if idx >= len(keys) {
374 idx = 0
375 }
376 key := keys[idx]
377 res = append(res, *kmems[key])
378 }
379 return res, nil
380}
381
382// GetClosestN returns the closest N member to a key in the hash ring.
383// This may be useful to find members for replication.
384func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
385 partID := c.FindPartitionID(key)
386 return c.getClosestN(partID, count)
387}
388
389// GetClosestNForPartition returns the closest N member for given partition.
390// This may be useful to find members for replication.
391func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
392 return c.getClosestN(partID, count)
393}