[VOL-5586] - Implement sorted set in redis-client for faster retrieval

Change-Id: Ie97a4d3dfa7f0cad77e3cbe86e76f09b6f84f9cb
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/VERSION b/VERSION
index 2ecabea..efe4dc6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.8.5
+7.8.6
diff --git a/pkg/db/kvstore/redisclient.go b/pkg/db/kvstore/redisclient.go
index 3a7e512..f27fc06 100644
--- a/pkg/db/kvstore/redisclient.go
+++ b/pkg/db/kvstore/redisclient.go
@@ -33,6 +33,7 @@
 	watchedChannels     sync.Map
 	writeLock           sync.Mutex
 	keyReservationsLock sync.RWMutex
+	pageSize            int
 }
 
 func NewRedisClient(addr string, timeout time.Duration, useSentinel bool) (*RedisClient, error) {
@@ -49,7 +50,7 @@
 	}
 
 	reservations := make(map[string]time.Duration)
-	return &RedisClient{redisAPI: r, keyReservations: reservations}, nil
+	return &RedisClient{redisAPI: r, keyReservations: reservations, pageSize: 5000}, nil
 }
 
 func (c *RedisClient) Get(ctx context.Context, key string) (*KVPair, error) {
@@ -62,8 +63,12 @@
 	return NewKVPair(key, valBytes, "", 0, 0), nil
 }
 
-func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
+const (
+	// Sorted set name for storing all keys
+	keysSetName = "voltha:keys"
+)
 
+func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
 	var er error
@@ -71,53 +76,197 @@
 		return fmt.Errorf("unexpected-type-%T", value)
 	}
 
-	// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
-	// that KV key permanent instead of automatically removing it after a lease expiration
-	setErr := c.redisAPI.Set(ctx, key, val, 0)
-	err := setErr.Err()
+	// Use a pipeline for atomic operations
+	pipe := c.redisAPI.TxPipeline()
 
+	// Set the key-value pair
+	pipe.Set(ctx, key, val, 0)
+
+	// Add key to sorted set with score 0 for efficient prefix queries
+	pipe.ZAdd(ctx, keysSetName, &redis.Z{
+		Score:  0,
+		Member: key,
+	})
+
+	// Execute pipeline
+	cmds, err := pipe.Exec(ctx)
 	if err != nil {
-		switch setErr.Err() {
+		switch err {
 		case context.Canceled:
-			logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+			logger.Warnw(ctx, "redis put failed : context-cancelled", log.Fields{"error": err, "key": key})
 		case context.DeadlineExceeded:
-			logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
+			logger.Warnw(ctx, "redis put failed : context-deadline-exceeded", log.Fields{"error": err, "key": key})
 		default:
-			logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
+			logger.Warnw(ctx, "redis put failed : bad-endpoints", log.Fields{"error": err, "key": key})
 		}
 		return err
 	}
+
+	// Check individual command errors
+	for _, cmd := range cmds {
+		if cmd.Err() != nil {
+			logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
+			return cmd.Err()
+		}
+	}
 	return nil
 }
 
-func (c *RedisClient) scanAllKeysWithPrefix(ctx context.Context, key string) ([]string, error) {
-	var err error
-	allkeys := []string{}
-	cont := true
-	cursor := uint64(0)
-	matchPrefix := key + "*"
+func (c *RedisClient) Delete(ctx context.Context, key string) error {
+	// Use a pipeline for atomic operations
+	pipe := c.redisAPI.TxPipeline()
 
-	for cont {
-		// search in the first 10000 entries starting from the point indicated by the cursor
-		logger.Debugw(ctx, "redis-scan", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
-		var keys []string
-		keys, cursor, err = c.redisAPI.Scan(context.Background(), cursor, matchPrefix, 10000).Result()
+	// Delete the key
+	pipe.Del(ctx, key)
+
+	// Remove key from sorted set
+	pipe.ZRem(ctx, keysSetName, key)
+
+	// Execute pipeline
+	cmds, err := pipe.Exec(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
+		return err
+	}
+	// Check individual command errors
+	for _, cmd := range cmds {
+		if cmd.Err() != nil && cmd.Err() != redis.Nil {
+			logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
+			return cmd.Err()
+		}
+	}
+
+	logger.Debugw(ctx, "key-deleted", log.Fields{"key": key})
+	return nil
+}
+
+func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
+	var keys []string
+	var err error
+
+	// Use ZRANGEBYLEX to get keys with prefix efficiently
+	if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefixKey); err != nil {
+		return err
+	}
+
+	if len(keys) == 0 {
+		logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey})
+		return nil
+	}
+
+	// Delete keys in batches using pipeline
+	entryCount := int64(0)
+	start := 0
+	length := len(keys)
+
+	for start < length {
+		end := start + c.pageSize
+		if end >= length {
+			end = length
+		}
+		keysToDelete := keys[start:end]
+
+		pipe := c.redisAPI.TxPipeline()
+
+		// Delete the actual key-value pairs
+		pipe.Del(ctx, keysToDelete...)
+
+		// Remove keys from sorted set
+		members := make([]interface{}, len(keysToDelete))
+		for i, k := range keysToDelete {
+			members[i] = k
+		}
+		pipe.ZRem(ctx, keysSetName, members...)
+
+		// Execute pipeline
+		cmds, err := pipe.Exec(ctx)
+		if err != nil {
+			logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
+			return err
+		}
+		// Check individual command errors
+		for _, cmd := range cmds {
+			if cmd.Err() != nil && cmd.Err() != redis.Nil {
+				logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": prefixKey, "Cmd": cmd.String()})
+				return cmd.Err()
+			}
+		}
+
+		// First command is Del, get its result
+		if len(cmds) > 0 {
+			if delCmd, ok := cmds[0].(*redis.IntCmd); ok {
+				count, _ := delCmd.Result()
+				entryCount += count
+			}
+		}
+
+		start = end
+	}
+
+	logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
+	return nil
+}
+
+func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
+	var err error
+	var keys []string
+	m := make(map[string]*KVPair)
+	var values []interface{}
+
+	// Use ZRANGEBYLEX to get keys with prefix efficiently
+	if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefix); err != nil {
+		return nil, err
+	}
+
+	if len(keys) != 0 {
+		values, err = c.redisAPI.MGet(ctx, keys...).Result()
 		if err != nil {
 			return nil, err
 		}
-		if cursor == 0 {
-			// all data searched. break the loop
-			logger.Debugw(ctx, "redis-scan-ended", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
-			cont = false
-		}
-		if len(keys) == 0 {
-			// no matched data found in this cycle. Continue to search
-			logger.Debugw(ctx, "redis-scan-no-data-found-continue", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
-			continue
-		}
-		allkeys = append(allkeys, keys...)
 	}
-	return allkeys, nil
+
+	for i, key := range keys {
+		if valBytes, err := ToByte(values[i]); err == nil {
+			m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
+		}
+	}
+	return m, nil
+}
+
+func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
+	// Use ZRANGEBYLEX to get keys with prefix efficiently
+	keys, err := c.getKeysWithPrefixFromSortedSet(ctx, prefix)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get keys with prefix %s: %v", prefix, err)
+	}
+
+	if len(keys) == 0 {
+		logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
+	}
+
+	return keys, nil
+}
+
+// Helper function to retrieve keys with prefix using ZRANGEBYLEX
+func (c *RedisClient) getKeysWithPrefixFromSortedSet(ctx context.Context, prefix string) ([]string, error) {
+	// ZRANGEBYLEX uses lexicographical ordering
+	// min: [prefix - includes prefix
+	// max: [prefix\xff - includes all keys starting with prefix
+	min := "[" + prefix
+	max := "[" + prefix + "\xff"
+
+	keys, err := c.redisAPI.ZRangeByLex(ctx, keysSetName, &redis.ZRangeBy{
+		Min: min,
+		Max: max,
+	}).Result()
+
+	if err != nil {
+		logger.Errorw(ctx, "failed-to-get-keys-with-prefix", log.Fields{"prefix": prefix, "error": err})
+		return nil, err
+	}
+
+	logger.Debugw(ctx, "keys-retrieved-with-prefix", log.Fields{"prefix": prefix, "count": len(keys)})
+	return keys, nil
 }
 
 func (c *RedisClient) KeyExists(ctx context.Context, key string) (bool, error) {
@@ -139,7 +288,8 @@
 	m := make(map[string]*KVPair)
 	var values []interface{}
 
-	if keys, err = c.scanAllKeysWithPrefix(ctx, key); err != nil {
+	// Use ZRANGEBYLEX to get keys with prefix efficiently
+	if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, key); err != nil {
 		return nil, err
 	}
 
@@ -157,49 +307,6 @@
 	return m, nil
 }
 
-func (c *RedisClient) Delete(ctx context.Context, key string) error {
-	// delete the key
-	if _, err := c.redisAPI.Del(ctx, key).Result(); err != nil {
-		logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
-		return err
-	}
-	logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
-	return nil
-}
-
-func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
-	var keys []string
-	var err error
-	if keys, err = c.scanAllKeysWithPrefix(ctx, prefixKey); err != nil {
-		return err
-	}
-	if len(keys) == 0 {
-		logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey})
-		return nil
-	}
-	//call delete for keys
-	entryCount := int64(0)
-	start := 0
-	pageSize := 5000
-	length := len(keys)
-	for start < length {
-		end := start + pageSize
-		if end >= length {
-			end = length
-		}
-		keysToDelete := keys[start:end]
-		count := int64(0)
-		if count, err = c.redisAPI.Del(ctx, keysToDelete...).Result(); err != nil {
-			logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
-			return err
-		}
-		entryCount += count
-		start = end
-	}
-	logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
-	return nil
-}
-
 func (c *RedisClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
 	var val string
 	var er error
@@ -428,41 +535,3 @@
 		logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
 	}
 }
-
-func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
-	var err error
-	var keys []string
-	m := make(map[string]*KVPair)
-	var values []interface{}
-
-	if keys, err = c.scanAllKeysWithPrefix(ctx, prefix); err != nil {
-		return nil, err
-	}
-
-	if len(keys) != 0 {
-		values, err = c.redisAPI.MGet(ctx, keys...).Result()
-		if err != nil {
-			return nil, err
-		}
-	}
-	for i, key := range keys {
-		if valBytes, err := ToByte(values[i]); err == nil {
-			m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
-		}
-	}
-	return m, nil
-}
-
-func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
-	// Use the scanAllKeysWithPrefix function to fetch keys matching the prefix
-	keys, err := c.scanAllKeysWithPrefix(ctx, prefix)
-	if err != nil {
-		return nil, fmt.Errorf("failed to scan keys with prefix %s: %v", prefix, err)
-	}
-
-	if len(keys) == 0 {
-		logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
-	}
-
-	return keys, nil
-}