[VOL-5586] - Implement sorted set in redis-client for faster retrieval
Change-Id: Ie97a4d3dfa7f0cad77e3cbe86e76f09b6f84f9cb
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/VERSION b/VERSION
index 2ecabea..efe4dc6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.8.5
+7.8.6
diff --git a/pkg/db/kvstore/redisclient.go b/pkg/db/kvstore/redisclient.go
index 3a7e512..f27fc06 100644
--- a/pkg/db/kvstore/redisclient.go
+++ b/pkg/db/kvstore/redisclient.go
@@ -33,6 +33,7 @@
watchedChannels sync.Map
writeLock sync.Mutex
keyReservationsLock sync.RWMutex
+ pageSize int
}
func NewRedisClient(addr string, timeout time.Duration, useSentinel bool) (*RedisClient, error) {
@@ -49,7 +50,7 @@
}
reservations := make(map[string]time.Duration)
- return &RedisClient{redisAPI: r, keyReservations: reservations}, nil
+ return &RedisClient{redisAPI: r, keyReservations: reservations, pageSize: 5000}, nil
}
func (c *RedisClient) Get(ctx context.Context, key string) (*KVPair, error) {
@@ -62,8 +63,12 @@
return NewKVPair(key, valBytes, "", 0, 0), nil
}
-func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
+const (
+ // keysSetName is the Redis sorted set that indexes every key written by Put (each member is added with score 0) so prefix lookups can use ZRANGEBYLEX
+ keysSetName = "voltha:keys"
+)
+func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
// Validate that we can convert value to a string as etcd API expects a string
var val string
var er error
@@ -71,53 +76,197 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- // Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
- // that KV key permanent instead of automatically removing it after a lease expiration
- setErr := c.redisAPI.Set(ctx, key, val, 0)
- err := setErr.Err()
+ // Use a pipeline for atomic operations
+ pipe := c.redisAPI.TxPipeline()
+ // Set the key-value pair
+ pipe.Set(ctx, key, val, 0)
+
+ // Add key to sorted set with score 0 for efficient prefix queries
+ pipe.ZAdd(ctx, keysSetName, &redis.Z{
+ Score: 0,
+ Member: key,
+ })
+
+ // Execute pipeline
+ cmds, err := pipe.Exec(ctx)
if err != nil {
- switch setErr.Err() {
+ switch err {
case context.Canceled:
- logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ logger.Warnw(ctx, "redis put failed : context-cancelled", log.Fields{"error": err, "key": key})
case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
+ logger.Warnw(ctx, "redis put failed : context-deadline-exceeded", log.Fields{"error": err, "key": key})
default:
- logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
+ logger.Warnw(ctx, "redis put failed : bad-endpoints", log.Fields{"error": err, "key": key})
}
return err
}
+
+ // Check individual command errors
+ for _, cmd := range cmds {
+ if cmd.Err() != nil {
+ logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
+ return cmd.Err()
+ }
+ }
return nil
}
-func (c *RedisClient) scanAllKeysWithPrefix(ctx context.Context, key string) ([]string, error) {
- var err error
- allkeys := []string{}
- cont := true
- cursor := uint64(0)
- matchPrefix := key + "*"
+func (c *RedisClient) Delete(ctx context.Context, key string) error {
+ // Queue both commands on a transactional pipeline (TxPipeline) so the key and its index entry are removed together
+ pipe := c.redisAPI.TxPipeline()
- for cont {
- // search in the first 10000 entries starting from the point indicated by the cursor
- logger.Debugw(ctx, "redis-scan", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
- var keys []string
- keys, cursor, err = c.redisAPI.Scan(context.Background(), cursor, matchPrefix, 10000).Result()
+ // Delete the stored value for key
+ pipe.Del(ctx, key)
+
+ // Drop key from the keysSetName index so prefix lookups stop returning it
+ pipe.ZRem(ctx, keysSetName, key)
+
+ // Run the queued Del and ZRem
+ cmds, err := pipe.Exec(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
+ return err
+ }
+ // Inspect each command's own result; redis.Nil is tolerated rather than treated as a failure
+ for _, cmd := range cmds {
+ if cmd.Err() != nil && cmd.Err() != redis.Nil {
+ logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
+ return cmd.Err()
+ }
+ }
+
+ logger.Debugw(ctx, "key-deleted", log.Fields{"key": key})
+ return nil
+}
+
+func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
+ var keys []string
+ var err error
+
+ // Resolve the matching keys from the keysSetName sorted-set index (ZRANGEBYLEX)
+ if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefixKey); err != nil {
+ return err
+ }
+
+ if len(keys) == 0 {
+ logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey}) // NOTE(review): passes log.Fields to Warn while sibling calls use Warnw - confirm intended
+ return nil
+ }
+
+ // Delete in batches of at most c.pageSize keys, one transactional pipeline per batch
+ entryCount := int64(0)
+ start := 0
+ length := len(keys)
+
+ for start < length {
+ end := start + c.pageSize
+ if end >= length {
+ end = length
+ }
+ keysToDelete := keys[start:end]
+
+ pipe := c.redisAPI.TxPipeline()
+
+ // Remove the stored values for this batch
+ pipe.Del(ctx, keysToDelete...)
+
+ // ZRem is called with members..., so copy the batch into a []interface{} before dropping it from the index
+ members := make([]interface{}, len(keysToDelete))
+ for i, k := range keysToDelete {
+ members[i] = k
+ }
+ pipe.ZRem(ctx, keysSetName, members...)
+
+ // Run the queued Del and ZRem for this batch
+ cmds, err := pipe.Exec(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
+ return err
+ }
+ // Inspect each command's own result; redis.Nil is tolerated rather than treated as a failure
+ for _, cmd := range cmds {
+ if cmd.Err() != nil && cmd.Err() != redis.Nil {
+ logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": prefixKey, "Cmd": cmd.String()})
+ return cmd.Err()
+ }
+ }
+
+ // cmds[0] is the Del queued first; its integer result is added to the running deleted-entry count
+ if len(cmds) > 0 {
+ if delCmd, ok := cmds[0].(*redis.IntCmd); ok {
+ count, _ := delCmd.Result()
+ entryCount += count
+ }
+ }
+
+ start = end
+ }
+
+ logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
+ return nil
+}
+
+func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
+ var err error
+ var keys []string
+ m := make(map[string]*KVPair)
+ var values []interface{}
+
+ // Resolve the matching keys from the keysSetName sorted-set index (ZRANGEBYLEX)
+ if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefix); err != nil {
+ return nil, err
+ }
+
+ if len(keys) != 0 {
+ values, err = c.redisAPI.MGet(ctx, keys...).Result()
if err != nil {
return nil, err
}
- if cursor == 0 {
- // all data searched. break the loop
- logger.Debugw(ctx, "redis-scan-ended", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
- cont = false
- }
- if len(keys) == 0 {
- // no matched data found in this cycle. Continue to search
- logger.Debugw(ctx, "redis-scan-no-data-found-continue", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
- continue
- }
- allkeys = append(allkeys, keys...)
}
- return allkeys, nil
+
+ for i, key := range keys { // values[i] is the MGet result for keys[i]
+ if valBytes, err := ToByte(values[i]); err == nil { // entries ToByte cannot convert are silently left out of the map
+ m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
+ }
+ }
+ return m, nil
+}
+
+func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
+ // Resolve the matching keys from the keysSetName sorted-set index (ZRANGEBYLEX); values are not fetched
+ keys, err := c.getKeysWithPrefixFromSortedSet(ctx, prefix)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get keys with prefix %s: %v", prefix, err)
+ }
+
+ if len(keys) == 0 {
+ logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
+ }
+
+ return keys, nil
+}
+
+// getKeysWithPrefixFromSortedSet queries the keysSetName sorted set with ZRANGEBYLEX and returns the members inside the range derived from prefix
+func (c *RedisClient) getKeysWithPrefixFromSortedSet(ctx context.Context, prefix string) ([]string, error) {
+ // ZRANGEBYLEX uses lexicographical ordering
+ // min: [prefix - inclusive lower bound, starting at the prefix itself
+ // max: [prefix\xff - upper bound; NOTE(review): a key with a 0xff byte right after prefix plus further bytes would sort past it - confirm keys never contain 0xff
+ min := "[" + prefix
+ max := "[" + prefix + "\xff"
+
+ keys, err := c.redisAPI.ZRangeByLex(ctx, keysSetName, &redis.ZRangeBy{
+ Min: min,
+ Max: max,
+ }).Result()
+
+ if err != nil {
+ logger.Errorw(ctx, "failed-to-get-keys-with-prefix", log.Fields{"prefix": prefix, "error": err})
+ return nil, err
+ }
+
+ logger.Debugw(ctx, "keys-retrieved-with-prefix", log.Fields{"prefix": prefix, "count": len(keys)})
+ return keys, nil
}
func (c *RedisClient) KeyExists(ctx context.Context, key string) (bool, error) {
@@ -139,7 +288,8 @@
m := make(map[string]*KVPair)
var values []interface{}
- if keys, err = c.scanAllKeysWithPrefix(ctx, key); err != nil {
+ // Use ZRANGEBYLEX to get keys with prefix efficiently
+ if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, key); err != nil {
return nil, err
}
@@ -157,49 +307,6 @@
return m, nil
}
-func (c *RedisClient) Delete(ctx context.Context, key string) error {
- // delete the key
- if _, err := c.redisAPI.Del(ctx, key).Result(); err != nil {
- logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
- return err
- }
- logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
- return nil
-}
-
-func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
- var keys []string
- var err error
- if keys, err = c.scanAllKeysWithPrefix(ctx, prefixKey); err != nil {
- return err
- }
- if len(keys) == 0 {
- logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey})
- return nil
- }
- //call delete for keys
- entryCount := int64(0)
- start := 0
- pageSize := 5000
- length := len(keys)
- for start < length {
- end := start + pageSize
- if end >= length {
- end = length
- }
- keysToDelete := keys[start:end]
- count := int64(0)
- if count, err = c.redisAPI.Del(ctx, keysToDelete...).Result(); err != nil {
- logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
- return err
- }
- entryCount += count
- start = end
- }
- logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
- return nil
-}
-
func (c *RedisClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
var val string
var er error
@@ -428,41 +535,3 @@
logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
}
}
-
-func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
- var err error
- var keys []string
- m := make(map[string]*KVPair)
- var values []interface{}
-
- if keys, err = c.scanAllKeysWithPrefix(ctx, prefix); err != nil {
- return nil, err
- }
-
- if len(keys) != 0 {
- values, err = c.redisAPI.MGet(ctx, keys...).Result()
- if err != nil {
- return nil, err
- }
- }
- for i, key := range keys {
- if valBytes, err := ToByte(values[i]); err == nil {
- m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
- }
- }
- return m, nil
-}
-
-func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
- // Use the scanAllKeysWithPrefix function to fetch keys matching the prefix
- keys, err := c.scanAllKeysWithPrefix(ctx, prefix)
- if err != nil {
- return nil, fmt.Errorf("failed to scan keys with prefix %s: %v", prefix, err)
- }
-
- if len(keys) == 0 {
- logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
- }
-
- return keys, nil
-}