blob: f27fc06fcfd93a020eb958b66d1873ade335dae2 [file] [log] [blame]
serkant.uluderyae5afeff2021-02-23 18:00:23 +03001/*
Joey Armstrong9cdee9f2024-01-03 04:56:14 -05002* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
serkant.uluderyae5afeff2021-02-23 18:00:23 +03003
Joey Armstrong7f8436c2023-07-09 20:23:27 -04004* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
serkant.uluderyae5afeff2021-02-23 18:00:23 +03007
Joey Armstrong7f8436c2023-07-09 20:23:27 -04008* http://www.apache.org/licenses/LICENSE-2.0
serkant.uluderyae5afeff2021-02-23 18:00:23 +03009
Joey Armstrong7f8436c2023-07-09 20:23:27 -040010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
serkant.uluderyae5afeff2021-02-23 18:00:23 +030015 */
16package kvstore
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/go-redis/redis/v8"
27 "github.com/opencord/voltha-lib-go/v7/pkg/log"
28)
29
30type RedisClient struct {
31 redisAPI *redis.Client
32 keyReservations map[string]time.Duration
33 watchedChannels sync.Map
34 writeLock sync.Mutex
35 keyReservationsLock sync.RWMutex
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053036 pageSize int
serkant.uluderyae5afeff2021-02-23 18:00:23 +030037}
38
39func NewRedisClient(addr string, timeout time.Duration, useSentinel bool) (*RedisClient, error) {
40 var r *redis.Client
41 if !useSentinel {
42 r = redis.NewClient(&redis.Options{Addr: addr})
43 } else {
44 // Redis Master-Replicas with Sentinel, sentinel masterSet config
45 // should be set to sebaRedis
46 r = redis.NewFailoverClient(&redis.FailoverOptions{
47 MasterName: "sebaRedis",
48 SentinelAddrs: []string{addr},
49 })
50 }
51
52 reservations := make(map[string]time.Duration)
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053053 return &RedisClient{redisAPI: r, keyReservations: reservations, pageSize: 5000}, nil
serkant.uluderyae5afeff2021-02-23 18:00:23 +030054}
55
56func (c *RedisClient) Get(ctx context.Context, key string) (*KVPair, error) {
57
58 val, err := c.redisAPI.Get(ctx, key).Result()
59 valBytes, _ := ToByte(val)
60 if err != nil {
61 return nil, nil
62 }
63 return NewKVPair(key, valBytes, "", 0, 0), nil
64}
65
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053066const (
67 // Sorted set name for storing all keys
68 keysSetName = "voltha:keys"
69)
serkant.uluderyae5afeff2021-02-23 18:00:23 +030070
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053071func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
serkant.uluderyae5afeff2021-02-23 18:00:23 +030072 // Validate that we can convert value to a string as etcd API expects a string
73 var val string
74 var er error
75 if val, er = ToString(value); er != nil {
76 return fmt.Errorf("unexpected-type-%T", value)
77 }
78
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053079 // Use a pipeline for atomic operations
80 pipe := c.redisAPI.TxPipeline()
serkant.uluderyae5afeff2021-02-23 18:00:23 +030081
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053082 // Set the key-value pair
83 pipe.Set(ctx, key, val, 0)
84
85 // Add key to sorted set with score 0 for efficient prefix queries
86 pipe.ZAdd(ctx, keysSetName, &redis.Z{
87 Score: 0,
88 Member: key,
89 })
90
91 // Execute pipeline
92 cmds, err := pipe.Exec(ctx)
serkant.uluderyae5afeff2021-02-23 18:00:23 +030093 if err != nil {
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053094 switch err {
serkant.uluderyae5afeff2021-02-23 18:00:23 +030095 case context.Canceled:
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053096 logger.Warnw(ctx, "redis put failed : context-cancelled", log.Fields{"error": err, "key": key})
serkant.uluderyae5afeff2021-02-23 18:00:23 +030097 case context.DeadlineExceeded:
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +053098 logger.Warnw(ctx, "redis put failed : context-deadline-exceeded", log.Fields{"error": err, "key": key})
serkant.uluderyae5afeff2021-02-23 18:00:23 +030099 default:
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530100 logger.Warnw(ctx, "redis put failed : bad-endpoints", log.Fields{"error": err, "key": key})
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300101 }
102 return err
103 }
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530104
105 // Check individual command errors
106 for _, cmd := range cmds {
107 if cmd.Err() != nil {
108 logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
109 return cmd.Err()
110 }
111 }
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300112 return nil
113}
114
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530115func (c *RedisClient) Delete(ctx context.Context, key string) error {
116 // Use a pipeline for atomic operations
117 pipe := c.redisAPI.TxPipeline()
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300118
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530119 // Delete the key
120 pipe.Del(ctx, key)
121
122 // Remove key from sorted set
123 pipe.ZRem(ctx, keysSetName, key)
124
125 // Execute pipeline
126 cmds, err := pipe.Exec(ctx)
127 if err != nil {
128 logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
129 return err
130 }
131 // Check individual command errors
132 for _, cmd := range cmds {
133 if cmd.Err() != nil && cmd.Err() != redis.Nil {
134 logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": key, "Cmd": cmd.String()})
135 return cmd.Err()
136 }
137 }
138
139 logger.Debugw(ctx, "key-deleted", log.Fields{"key": key})
140 return nil
141}
142
143func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
144 var keys []string
145 var err error
146
147 // Use ZRANGEBYLEX to get keys with prefix efficiently
148 if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefixKey); err != nil {
149 return err
150 }
151
152 if len(keys) == 0 {
153 logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey})
154 return nil
155 }
156
157 // Delete keys in batches using pipeline
158 entryCount := int64(0)
159 start := 0
160 length := len(keys)
161
162 for start < length {
163 end := start + c.pageSize
164 if end >= length {
165 end = length
166 }
167 keysToDelete := keys[start:end]
168
169 pipe := c.redisAPI.TxPipeline()
170
171 // Delete the actual key-value pairs
172 pipe.Del(ctx, keysToDelete...)
173
174 // Remove keys from sorted set
175 members := make([]interface{}, len(keysToDelete))
176 for i, k := range keysToDelete {
177 members[i] = k
178 }
179 pipe.ZRem(ctx, keysSetName, members...)
180
181 // Execute pipeline
182 cmds, err := pipe.Exec(ctx)
183 if err != nil {
184 logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
185 return err
186 }
187 // Check individual command errors
188 for _, cmd := range cmds {
189 if cmd.Err() != nil && cmd.Err() != redis.Nil {
190 logger.Warnw(ctx, "redis-pipeline-command-failed", log.Fields{"error": cmd.Err(), "key": prefixKey, "Cmd": cmd.String()})
191 return cmd.Err()
192 }
193 }
194
195 // First command is Del, get its result
196 if len(cmds) > 0 {
197 if delCmd, ok := cmds[0].(*redis.IntCmd); ok {
198 count, _ := delCmd.Result()
199 entryCount += count
200 }
201 }
202
203 start = end
204 }
205
206 logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
207 return nil
208}
209
210func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
211 var err error
212 var keys []string
213 m := make(map[string]*KVPair)
214 var values []interface{}
215
216 // Use ZRANGEBYLEX to get keys with prefix efficiently
217 if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, prefix); err != nil {
218 return nil, err
219 }
220
221 if len(keys) != 0 {
222 values, err = c.redisAPI.MGet(ctx, keys...).Result()
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300223 if err != nil {
224 return nil, err
225 }
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300226 }
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530227
228 for i, key := range keys {
229 if valBytes, err := ToByte(values[i]); err == nil {
230 m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
231 }
232 }
233 return m, nil
234}
235
236func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
237 // Use ZRANGEBYLEX to get keys with prefix efficiently
238 keys, err := c.getKeysWithPrefixFromSortedSet(ctx, prefix)
239 if err != nil {
240 return nil, fmt.Errorf("failed to get keys with prefix %s: %v", prefix, err)
241 }
242
243 if len(keys) == 0 {
244 logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
245 }
246
247 return keys, nil
248}
249
250// Helper function to retrieve keys with prefix using ZRANGEBYLEX
251func (c *RedisClient) getKeysWithPrefixFromSortedSet(ctx context.Context, prefix string) ([]string, error) {
252 // ZRANGEBYLEX uses lexicographical ordering
253 // min: [prefix - includes prefix
254 // max: [prefix\xff - includes all keys starting with prefix
255 min := "[" + prefix
256 max := "[" + prefix + "\xff"
257
258 keys, err := c.redisAPI.ZRangeByLex(ctx, keysSetName, &redis.ZRangeBy{
259 Min: min,
260 Max: max,
261 }).Result()
262
263 if err != nil {
264 logger.Errorw(ctx, "failed-to-get-keys-with-prefix", log.Fields{"prefix": prefix, "error": err})
265 return nil, err
266 }
267
268 logger.Debugw(ctx, "keys-retrieved-with-prefix", log.Fields{"prefix": prefix, "count": len(keys)})
269 return keys, nil
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300270}
271
Sridhar Ravindra037d7cd2025-10-23 12:52:50 +0530272func (c *RedisClient) KeyExists(ctx context.Context, key string) (bool, error) {
273 var err error
274 var keyCount int64
275
276 if keyCount, err = c.redisAPI.Exists(ctx, key).Result(); err != nil {
277 return false, err
278 }
279 if keyCount > 0 {
280 return true, nil
281 }
282 return false, nil
283}
284
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300285func (c *RedisClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
286 var err error
287 var keys []string
288 m := make(map[string]*KVPair)
289 var values []interface{}
290
Sridhar Ravindra9f26bde2026-03-25 13:13:35 +0530291 // Use ZRANGEBYLEX to get keys with prefix efficiently
292 if keys, err = c.getKeysWithPrefixFromSortedSet(ctx, key); err != nil {
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300293 return nil, err
294 }
295
296 if len(keys) != 0 {
297 values, err = c.redisAPI.MGet(ctx, keys...).Result()
298 if err != nil {
299 return nil, err
300 }
301 }
302 for i, key := range keys {
303 if valBytes, err := ToByte(values[i]); err == nil {
304 m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
305 }
306 }
307 return m, nil
308}
309
serkant.uluderyae5afeff2021-02-23 18:00:23 +0300310func (c *RedisClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
311 var val string
312 var er error
313 if val, er = ToString(value); er != nil {
314 return nil, fmt.Errorf("unexpected-type%T", value)
315 }
316
317 // SetNX -- Only set the key if it does not already exist.
318 c.redisAPI.SetNX(ctx, key, value, ttl)
319
320 // Check if set is successful
321 redisVal := c.redisAPI.Get(ctx, key).Val()
322 if redisVal == "" {
323 println("NULL")
324 return nil, nil
325 }
326
327 if val == redisVal {
328 // set is successful, return new reservation value
329 c.keyReservationsLock.Lock()
330 c.keyReservations[key] = ttl
331 c.keyReservationsLock.Unlock()
332 bytes, _ := ToByte(val)
333 return bytes, nil
334 } else {
335 // set is not successful, return existing reservation value
336 bytes, _ := ToByte(redisVal)
337 return bytes, nil
338 }
339
340}
341
342func (c *RedisClient) ReleaseReservation(ctx context.Context, key string) error {
343
344 redisVal := c.redisAPI.Get(ctx, key).Val()
345 if redisVal == "" {
346 return nil
347 }
348
349 // Override SetNX value with no TTL
350 _, err := c.redisAPI.Set(ctx, key, redisVal, 0).Result()
351 if err != nil {
352 delete(c.keyReservations, key)
353 } else {
354 return err
355 }
356 return nil
357
358}
359
360func (c *RedisClient) ReleaseAllReservations(ctx context.Context) error {
361 c.writeLock.Lock()
362 defer c.writeLock.Unlock()
363 for key := range c.keyReservations {
364 err := c.ReleaseReservation(ctx, key)
365 if err != nil {
366 logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key, "error": err})
367 return err
368 }
369 }
370 return nil
371}
372
373func (c *RedisClient) RenewReservation(ctx context.Context, key string) error {
374 c.writeLock.Lock()
375 defer c.writeLock.Unlock()
376
377 // Verify the key was reserved
378 ttl, ok := c.keyReservations[key]
379 if !ok {
380 return errors.New("key-not-reserved. Key not found")
381 }
382
383 redisVal := c.redisAPI.Get(ctx, key).Val()
384 if redisVal != "" {
385 c.redisAPI.Set(ctx, key, redisVal, ttl)
386 }
387 return nil
388}
389
390func (c *RedisClient) listenForKeyChange(ctx context.Context, redisCh <-chan *redis.Message, ch chan<- *Event, cancel context.CancelFunc) {
391 logger.Debug(ctx, "start-listening-on-channel ...")
392 defer cancel()
393 defer close(ch)
394 for msg := range redisCh {
395 words := strings.Split(msg.Channel, ":")
396 key := words[1]
397 msgType := getMessageType(msg.Payload)
398 var valBytes []byte
399 if msgType == PUT {
400 ev, _ := c.Get(ctx, key)
401 valBytes, _ = ToByte(ev.Value)
402 }
403 ch <- NewEvent(getMessageType(msg.Payload), []byte(key), valBytes, 0)
404 }
405 logger.Debug(ctx, "stop-listening-on-channel ...")
406}
407
408func getMessageType(msg string) int {
409 isPut := strings.HasSuffix(msg, "set")
410 isDel := strings.HasSuffix(msg, "del")
411 if isPut {
412 return PUT
413 } else if isDel {
414 return DELETE
415 } else {
416 return UNKNOWN
417 }
418}
419
420func (c *RedisClient) addChannelMap(key string, channelMap map[chan *Event]*redis.PubSub) []map[chan *Event]*redis.PubSub {
421
422 var channels interface{}
423 var exists bool
424
425 if channels, exists = c.watchedChannels.Load(key); exists {
426 channels = append(channels.([]map[chan *Event]*redis.PubSub), channelMap)
427 } else {
428 channels = []map[chan *Event]*redis.PubSub{channelMap}
429 }
430 c.watchedChannels.Store(key, channels)
431
432 return channels.([]map[chan *Event]*redis.PubSub)
433}
434
435func (c *RedisClient) removeChannelMap(key string, pos int) []map[chan *Event]*redis.PubSub {
436 var channels interface{}
437 var exists bool
438
439 if channels, exists = c.watchedChannels.Load(key); exists {
440 channels = append(channels.([]map[chan *Event]*redis.PubSub)[:pos], channels.([]map[chan *Event]*redis.PubSub)[pos+1:]...)
441 c.watchedChannels.Store(key, channels)
442 }
443
444 return channels.([]map[chan *Event]*redis.PubSub)
445}
446
447func (c *RedisClient) getChannelMaps(key string) ([]map[chan *Event]*redis.PubSub, bool) {
448 var channels interface{}
449 var exists bool
450
451 channels, exists = c.watchedChannels.Load(key)
452
453 if channels == nil {
454 return nil, exists
455 }
456
457 return channels.([]map[chan *Event]*redis.PubSub), exists
458}
459
460func (c *RedisClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
461
462 ctx, cancel := context.WithCancel(ctx)
463
464 var subscribePath string
465 subscribePath = "__key*__:" + key
466 if withPrefix {
467 subscribePath += "*"
468 }
469 pubsub := c.redisAPI.PSubscribe(ctx, subscribePath)
470 redisCh := pubsub.Channel()
471
472 // Create new channel
473 ch := make(chan *Event, maxClientChannelBufferSize)
474
475 // Keep track of the created channels so they can be closed when required
476 channelMap := make(map[chan *Event]*redis.PubSub)
477 channelMap[ch] = pubsub
478
479 channelMaps := c.addChannelMap(key, channelMap)
480 logger.Debugw(ctx, "watched-channels", log.Fields{"len": len(channelMaps)})
481
482 // Launch a go routine to listen for updates
483 go c.listenForKeyChange(ctx, redisCh, ch, cancel)
484 return ch
485}
486
487func (c *RedisClient) CloseWatch(ctx context.Context, key string, ch chan *Event) {
488 // Get the array of channels mapping
489 var watchedChannels []map[chan *Event]*redis.PubSub
490 var ok bool
491
492 if watchedChannels, ok = c.getChannelMaps(key); !ok {
493 logger.Warnw(ctx, "key-has-no-watched-channels", log.Fields{"key": key})
494 return
495 }
496 // Look for the channels
497 var pos = -1
498 for i, chMap := range watchedChannels {
499 if t, ok := chMap[ch]; ok {
500 logger.Debug(ctx, "channel-found")
501 // Close the Redis watcher before the client channel. This should close the etcd channel as well
502 if err := t.Close(); err != nil {
503 logger.Errorw(ctx, "watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
504 }
505 pos = i
506 break
507 }
508 }
509
510 channelMaps, _ := c.getChannelMaps(key)
511 // Remove that entry if present
512 if pos >= 0 {
513 channelMaps = c.removeChannelMap(key, pos)
514 }
515 logger.Infow(ctx, "watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
516}
517func (c *RedisClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
518 return nil
519}
520
521func (c *RedisClient) ReleaseLock(lockName string) error {
522 return nil
523}
524
525func (c *RedisClient) IsConnectionUp(ctx context.Context) bool {
526 if _, err := c.redisAPI.Set(ctx, "connection-check", "1", 0).Result(); err != nil {
527 return false
528 }
529 return true
530
531}
532
533func (c *RedisClient) Close(ctx context.Context) {
534 if err := c.redisAPI.Close(); err != nil {
535 logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
536 }
537}