| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 1 | // Copyright (c) 2018-2022 Burak Sezer |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 2 | // All rights reserved. |
| 3 | // |
| 4 | // This code is licensed under the MIT License. |
| 5 | // |
| 6 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
| 7 | // of this software and associated documentation files(the "Software"), to deal |
| 8 | // in the Software without restriction, including without limitation the rights |
| 9 | // to use, copy, modify, merge, publish, distribute, sublicense, and / or sell |
| 10 | // copies of the Software, and to permit persons to whom the Software is |
| 11 | // furnished to do so, subject to the following conditions : |
| 12 | // |
| 13 | // The above copyright notice and this permission notice shall be included in |
| 14 | // all copies or substantial portions of the Software. |
| 15 | // |
| 16 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 17 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 18 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE |
| 19 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 20 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 22 | // THE SOFTWARE. |
| 23 | |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 24 | // Package consistent provides a consistent hashing function with bounded loads. This implementation also adds |
| 25 | // partitioning logic on top of the original algorithm. For more information about the underlying algorithm, |
| 26 | // please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 27 | // |
| 28 | // Example Use: |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 29 | // |
| 30 | // cfg := consistent.Config{ |
| 31 | // PartitionCount: 71, |
| 32 | // ReplicationFactor: 20, |
| 33 | // Load: 1.25, |
| 34 | // Hasher: hasher{}, |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 35 | // } |
| 36 | // |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 37 | // Now you can create a new Consistent instance. This function can take a list of the members. |
| 38 | // |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 39 | // c := consistent.New(members, cfg) |
| 40 | // |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 41 | // In the following sample, you add a new Member to the consistent hash ring. myMember is just a Go struct that |
| 42 | // implements the Member interface. You should know that modifying the consistent hash ring distributes partitions among |
| 43 | // members using the algorithm defined on Google Research Blog. |
| 44 | // |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 45 | // c.Add(myMember) |
| 46 | // |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 47 | // Remove a member from the consistent hash ring: |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 48 | // |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 49 | // c.Remove(member-name) |
| 50 | // |
| 51 | // LocateKey hashes the key and calculates partition ID with this modulo operation: MOD(hash result, partition count) |
| 52 | // The owner of the partition is already calculated by New/Add/Remove. LocateKey just returns the member that is responsible |
| 53 | // for the key. |
| 54 | // |
| 55 | // key := []byte("my-key") |
| 56 | // member := c.LocateKey(key) |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 57 | package consistent |
| 58 | |
| 59 | import ( |
| 60 | "encoding/binary" |
| 61 | "errors" |
| 62 | "fmt" |
| 63 | "math" |
| 64 | "sort" |
| 65 | "sync" |
| 66 | ) |
| 67 | |
// Default values that New substitutes for zero-valued Config fields.
const (
	// DefaultPartitionCount is used when Config.PartitionCount is zero.
	DefaultPartitionCount int = 271
	// DefaultReplicationFactor is used when Config.ReplicationFactor is zero.
	DefaultReplicationFactor int = 20
	// DefaultLoad is used when Config.Load is zero.
	DefaultLoad float64 = 1.25
)

// ErrInsufficientMemberCount is returned by GetClosestN and GetClosestNForPartition
// when more members are requested than the ring currently holds.
var ErrInsufficientMemberCount = errors.New("insufficient member count")

// Hasher produces an unsigned 64-bit hash of a byte slice.
//
// Implementations should minimize collisions (different inputs producing the same
// hash) and should be fast; the FarmHash family is one suitable choice.
type Hasher interface {
	Sum64(data []byte) uint64
}

// Member is a node on the consistent hash ring. Its String value is the member's
// name: it is hashed to place the member on the ring and used as its lookup key.
type Member interface {
	fmt.Stringer
}
| 89 | |
// Config controls how a Consistent ring is built. New substitutes
// DefaultPartitionCount, DefaultReplicationFactor and DefaultLoad for
// zero-valued PartitionCount, ReplicationFactor and Load.
type Config struct {
	// Hasher generates the unsigned 64-bit hashes used to place members on the
	// ring, to place partitions, and to map keys to partitions. It is required:
	// New panics when it is nil.
	Hasher Hasher

	// PartitionCount is the number of partitions keys are distributed among; a
	// key belongs to partition Hasher.Sum64(key) % PartitionCount. Prime numbers
	// help distribute keys uniformly. Choose a larger PartitionCount if you have
	// many keys.
	PartitionCount int

	// ReplicationFactor is the number of points each member occupies on the hash
	// ring (each point is the hash of the member name followed by the replica index).
	ReplicationFactor int

	// Load is the load factor. The maximum number of partitions a single member may
	// own is derived from the average number of partitions per member multiplied by
	// Load (see averageLoad). Background: Google's "Consistent Hashing with Bounded
	// Loads" blog post and paper.
	Load float64
}
| 107 | |
| 108 | // Consistent holds the information about the members of the consistent hash circle. |
| 109 | type Consistent struct { |
| 110 | mu sync.RWMutex |
| 111 | |
| 112 | config Config |
| 113 | hasher Hasher |
| 114 | sortedSet []uint64 |
| 115 | partitionCount uint64 |
| 116 | loads map[string]float64 |
| 117 | members map[string]*Member |
| 118 | partitions map[int]*Member |
| 119 | ring map[uint64]*Member |
| 120 | } |
| 121 | |
| 122 | // New creates and returns a new Consistent object. |
| 123 | func New(members []Member, config Config) *Consistent { |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 124 | if config.Hasher == nil { |
| 125 | panic("Hasher cannot be nil") |
| 126 | } |
| 127 | if config.PartitionCount == 0 { |
| 128 | config.PartitionCount = DefaultPartitionCount |
| 129 | } |
| 130 | if config.ReplicationFactor == 0 { |
| 131 | config.ReplicationFactor = DefaultReplicationFactor |
| 132 | } |
| 133 | if config.Load == 0 { |
| 134 | config.Load = DefaultLoad |
| 135 | } |
| 136 | |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 137 | c := &Consistent{ |
| 138 | config: config, |
| 139 | members: make(map[string]*Member), |
| 140 | partitionCount: uint64(config.PartitionCount), |
| 141 | ring: make(map[uint64]*Member), |
| 142 | } |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 143 | |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 144 | c.hasher = config.Hasher |
| 145 | for _, member := range members { |
| 146 | c.add(member) |
| 147 | } |
| 148 | if members != nil { |
| 149 | c.distributePartitions() |
| 150 | } |
| 151 | return c |
| 152 | } |
| 153 | |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 154 | // GetMembers returns a thread-safe copy of members. If there are no members, it returns an empty slice of Member. |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 155 | func (c *Consistent) GetMembers() []Member { |
| 156 | c.mu.RLock() |
| 157 | defer c.mu.RUnlock() |
| 158 | |
| 159 | // Create a thread-safe copy of member list. |
| 160 | members := make([]Member, 0, len(c.members)) |
| 161 | for _, member := range c.members { |
| 162 | members = append(members, *member) |
| 163 | } |
| 164 | return members |
| 165 | } |
| 166 | |
| 167 | // AverageLoad exposes the current average load. |
| 168 | func (c *Consistent) AverageLoad() float64 { |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 169 | c.mu.RLock() |
| 170 | defer c.mu.RUnlock() |
| 171 | |
| 172 | return c.averageLoad() |
| 173 | } |
| 174 | |
| 175 | func (c *Consistent) averageLoad() float64 { |
| 176 | if len(c.members) == 0 { |
| 177 | return 0 |
| 178 | } |
| 179 | |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 180 | avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load |
| 181 | return math.Ceil(avgLoad) |
| 182 | } |
| 183 | |
| 184 | func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) { |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 185 | avgLoad := c.averageLoad() |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 186 | var count int |
| 187 | for { |
| 188 | count++ |
| 189 | if count >= len(c.sortedSet) { |
| 190 | // User needs to decrease partition count, increase member count or increase load factor. |
| 191 | panic("not enough room to distribute partitions") |
| 192 | } |
| 193 | i := c.sortedSet[idx] |
| 194 | member := *c.ring[i] |
| 195 | load := loads[member.String()] |
| 196 | if load+1 <= avgLoad { |
| 197 | partitions[partID] = &member |
| 198 | loads[member.String()]++ |
| 199 | return |
| 200 | } |
| 201 | idx++ |
| 202 | if idx >= len(c.sortedSet) { |
| 203 | idx = 0 |
| 204 | } |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | func (c *Consistent) distributePartitions() { |
| 209 | loads := make(map[string]float64) |
| 210 | partitions := make(map[int]*Member) |
| 211 | |
| 212 | bs := make([]byte, 8) |
| 213 | for partID := uint64(0); partID < c.partitionCount; partID++ { |
| 214 | binary.LittleEndian.PutUint64(bs, partID) |
| 215 | key := c.hasher.Sum64(bs) |
| 216 | idx := sort.Search(len(c.sortedSet), func(i int) bool { |
| 217 | return c.sortedSet[i] >= key |
| 218 | }) |
| 219 | if idx >= len(c.sortedSet) { |
| 220 | idx = 0 |
| 221 | } |
| 222 | c.distributeWithLoad(int(partID), idx, partitions, loads) |
| 223 | } |
| 224 | c.partitions = partitions |
| 225 | c.loads = loads |
| 226 | } |
| 227 | |
| 228 | func (c *Consistent) add(member Member) { |
| 229 | for i := 0; i < c.config.ReplicationFactor; i++ { |
| 230 | key := []byte(fmt.Sprintf("%s%d", member.String(), i)) |
| 231 | h := c.hasher.Sum64(key) |
| 232 | c.ring[h] = &member |
| 233 | c.sortedSet = append(c.sortedSet, h) |
| 234 | } |
| 235 | // sort hashes ascendingly |
| 236 | sort.Slice(c.sortedSet, func(i int, j int) bool { |
| 237 | return c.sortedSet[i] < c.sortedSet[j] |
| 238 | }) |
| 239 | // Storing member at this map is useful to find backup members of a partition. |
| 240 | c.members[member.String()] = &member |
| 241 | } |
| 242 | |
| 243 | // Add adds a new member to the consistent hash circle. |
| 244 | func (c *Consistent) Add(member Member) { |
| 245 | c.mu.Lock() |
| 246 | defer c.mu.Unlock() |
| 247 | |
| 248 | if _, ok := c.members[member.String()]; ok { |
| 249 | // We already have this member. Quit immediately. |
| 250 | return |
| 251 | } |
| 252 | c.add(member) |
| 253 | c.distributePartitions() |
| 254 | } |
| 255 | |
| 256 | func (c *Consistent) delSlice(val uint64) { |
| 257 | for i := 0; i < len(c.sortedSet); i++ { |
| 258 | if c.sortedSet[i] == val { |
| 259 | c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...) |
| 260 | break |
| 261 | } |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | // Remove removes a member from the consistent hash circle. |
| 266 | func (c *Consistent) Remove(name string) { |
| 267 | c.mu.Lock() |
| 268 | defer c.mu.Unlock() |
| 269 | |
| 270 | if _, ok := c.members[name]; !ok { |
| 271 | // There is no member with that name. Quit immediately. |
| 272 | return |
| 273 | } |
| 274 | |
| 275 | for i := 0; i < c.config.ReplicationFactor; i++ { |
| 276 | key := []byte(fmt.Sprintf("%s%d", name, i)) |
| 277 | h := c.hasher.Sum64(key) |
| 278 | delete(c.ring, h) |
| 279 | c.delSlice(h) |
| 280 | } |
| 281 | delete(c.members, name) |
| 282 | if len(c.members) == 0 { |
| 283 | // consistent hash ring is empty now. Reset the partition table. |
| 284 | c.partitions = make(map[int]*Member) |
| 285 | return |
| 286 | } |
| 287 | c.distributePartitions() |
| 288 | } |
| 289 | |
| 290 | // LoadDistribution exposes load distribution of members. |
| 291 | func (c *Consistent) LoadDistribution() map[string]float64 { |
| 292 | c.mu.RLock() |
| 293 | defer c.mu.RUnlock() |
| 294 | |
| 295 | // Create a thread-safe copy |
| 296 | res := make(map[string]float64) |
| 297 | for member, load := range c.loads { |
| 298 | res[member] = load |
| 299 | } |
| 300 | return res |
| 301 | } |
| 302 | |
| 303 | // FindPartitionID returns partition id for given key. |
| 304 | func (c *Consistent) FindPartitionID(key []byte) int { |
| 305 | hkey := c.hasher.Sum64(key) |
| 306 | return int(hkey % c.partitionCount) |
| 307 | } |
| 308 | |
| 309 | // GetPartitionOwner returns the owner of the given partition. |
| 310 | func (c *Consistent) GetPartitionOwner(partID int) Member { |
| 311 | c.mu.RLock() |
| 312 | defer c.mu.RUnlock() |
| 313 | |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 314 | return c.getPartitionOwner(partID) |
| 315 | } |
| 316 | |
| 317 | // getPartitionOwner returns the owner of the given partition. It's not thread-safe. |
| 318 | func (c *Consistent) getPartitionOwner(partID int) Member { |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 319 | member, ok := c.partitions[partID] |
| 320 | if !ok { |
| 321 | return nil |
| 322 | } |
| 323 | // Create a thread-safe copy of member and return it. |
| 324 | return *member |
| 325 | } |
| 326 | |
| 327 | // LocateKey finds a home for given key |
| 328 | func (c *Consistent) LocateKey(key []byte) Member { |
| 329 | partID := c.FindPartitionID(key) |
| 330 | return c.GetPartitionOwner(partID) |
| 331 | } |
| 332 | |
| 333 | func (c *Consistent) getClosestN(partID, count int) ([]Member, error) { |
| 334 | c.mu.RLock() |
| 335 | defer c.mu.RUnlock() |
| 336 | |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 337 | var res []Member |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 338 | if count > len(c.members) { |
| 339 | return res, ErrInsufficientMemberCount |
| 340 | } |
| 341 | |
| 342 | var ownerKey uint64 |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 343 | owner := c.getPartitionOwner(partID) |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 344 | // Hash and sort all the names. |
| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 345 | var keys []uint64 |
| Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 346 | kmems := make(map[uint64]*Member) |
| 347 | for name, member := range c.members { |
| 348 | key := c.hasher.Sum64([]byte(name)) |
| 349 | if name == owner.String() { |
| 350 | ownerKey = key |
| 351 | } |
| 352 | keys = append(keys, key) |
| 353 | kmems[key] = member |
| 354 | } |
| 355 | sort.Slice(keys, func(i, j int) bool { |
| 356 | return keys[i] < keys[j] |
| 357 | }) |
| 358 | |
| 359 | // Find the key owner |
| 360 | idx := 0 |
| 361 | for idx < len(keys) { |
| 362 | if keys[idx] == ownerKey { |
| 363 | key := keys[idx] |
| 364 | res = append(res, *kmems[key]) |
| 365 | break |
| 366 | } |
| 367 | idx++ |
| 368 | } |
| 369 | |
| 370 | // Find the closest(replica owners) members. |
| 371 | for len(res) < count { |
| 372 | idx++ |
| 373 | if idx >= len(keys) { |
| 374 | idx = 0 |
| 375 | } |
| 376 | key := keys[idx] |
| 377 | res = append(res, *kmems[key]) |
| 378 | } |
| 379 | return res, nil |
| 380 | } |
| 381 | |
| 382 | // GetClosestN returns the closest N member to a key in the hash ring. |
| 383 | // This may be useful to find members for replication. |
| 384 | func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) { |
| 385 | partID := c.FindPartitionID(key) |
| 386 | return c.getClosestN(partID, count) |
| 387 | } |
| 388 | |
| 389 | // GetClosestNForPartition returns the closest N member for given partition. |
| 390 | // This may be useful to find members for replication. |
| 391 | func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) { |
| 392 | return c.getClosestN(partID, count) |
| 393 | } |