[VOL-4959] kvstore: add bounded retries with a per-attempt timeout to etcd Get/Put/Delete
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
Change-Id: Iaf4aa96d29fdbfdbe9bf3c32cb6f91b36b1c73f5
diff --git a/VERSION b/VERSION
index bfe365e..fca8580 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.7.1
\ No newline at end of file
+7.7.2
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 6828700..b1080ef 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -36,8 +36,10 @@
)
const (
- defaultMaxPoolCapacity = 1000 // Default size of an Etcd Client pool
- defaultMaxPoolUsage = 100 // Maximum concurrent request an Etcd Client is allowed to process
+ defaultMaxPoolCapacity = 1000 // Default size of an Etcd Client pool
+ defaultMaxPoolUsage = 100 // Maximum number of concurrent requests an Etcd Client is allowed to process
+ defaultMaxAttempts = 10 // Maximum number of retries after the initial attempt before an operation gives up
+ defaultOperationContextTimeout = 3 * time.Second // Timeout applied to each individual attempt of an operation, not to the whole retry loop
)
// EtcdClient represents the Etcd KV store client
@@ -152,35 +154,49 @@
defer c.pool.Put(client)
attempt := 0
-
startLoop:
for {
- resp, err := client.Get(ctx, key)
+ retryCtx, cancel := context.WithTimeout(ctx, defaultOperationContextTimeout)
+ resp, err := client.Get(retryCtx, key)
+ cancel()
+ if attempt >= defaultMaxAttempts {
+ logger.Warnw(ctx, "get-retries-exceeded", log.Fields{"key": key, "error": err, "attempt": attempt})
+ return nil, err
+ }
if err != nil {
switch err {
case context.Canceled:
+ // If the parent context is already done (cancelled or expired), do not retry
+ if ctx.Err() != nil {
+ logger.Warnw(ctx, "parent-context-cancelled", log.Fields{"error": err})
+ return nil, err
+ }
+ // Otherwise retry
logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "attempt": attempt})
case v3rpcTypes.ErrEmptyKey:
logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ return nil, err
case v3rpcTypes.ErrLeaderChanged,
v3rpcTypes.ErrGRPCNoLeader,
v3rpcTypes.ErrTimeout,
v3rpcTypes.ErrTimeoutDueToLeaderFail,
v3rpcTypes.ErrTimeoutDueToConnectionLost:
// Retry for these server errors
- attempt += 1
- if er := backoff(ctx, attempt); er != nil {
- logger.Warnw(ctx, "get-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
- return nil, err
- }
- logger.Warnw(ctx, "retrying-get", log.Fields{"key": key, "error": err, "attempt": attempt})
- goto startLoop
- default:
logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ default:
+ logger.Warnw(ctx, "etcd-unknown-error", log.Fields{"error": err})
}
- return nil, err
+
+ // Common retry logic for every error case that did not return above
+ attempt++
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "get-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return nil, err
+ }
+ logger.Warnw(ctx, "retrying-get", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
}
for _, ev := range resp.Kvs {
@@ -246,7 +262,6 @@
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) Put(ctx context.Context, key string, value interface{}) error {
-
// Validate that we can convert value to a string as etcd API expects a string
var val string
var err error
@@ -263,32 +278,47 @@
attempt := 0
startLoop:
for {
- _, err = client.Put(ctx, key, val)
+ retryCtx, cancel := context.WithTimeout(ctx, defaultOperationContextTimeout)
+ _, err = client.Put(retryCtx, key, val)
+ cancel()
+ if attempt >= defaultMaxAttempts {
+ logger.Warnw(ctx, "put-retries-exceeded", log.Fields{"key": key, "error": err, "attempt": attempt})
+ return err
+ }
if err != nil {
switch err {
case context.Canceled:
+ // If the parent context is already done (cancelled or expired), do not retry
+ if ctx.Err() != nil {
+ logger.Warnw(ctx, "parent-context-cancelled", log.Fields{"error": err})
+ return err
+ }
+ // Otherwise retry
logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "attempt": attempt})
case v3rpcTypes.ErrEmptyKey:
logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ return err
case v3rpcTypes.ErrLeaderChanged,
v3rpcTypes.ErrGRPCNoLeader,
v3rpcTypes.ErrTimeout,
v3rpcTypes.ErrTimeoutDueToLeaderFail,
v3rpcTypes.ErrTimeoutDueToConnectionLost:
// Retry for these server errors
- attempt += 1
- if er := backoff(ctx, attempt); er != nil {
- logger.Warnw(ctx, "put-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
- return err
- }
- logger.Warnw(ctx, "retrying-put", log.Fields{"key": key, "error": err, "attempt": attempt})
- goto startLoop
- default:
logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ default:
+ logger.Warnw(ctx, "etcd-unknown-error", log.Fields{"error": err})
}
- return err
+
+ // Common retry logic for every error case that did not return above
+ attempt++
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "put-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-put", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
}
return nil
}
@@ -306,32 +336,47 @@
attempt := 0
startLoop:
for {
- _, err = client.Delete(ctx, key)
+ retryCtx, cancel := context.WithTimeout(ctx, defaultOperationContextTimeout)
+ _, err = client.Delete(retryCtx, key)
+ cancel()
+ if attempt >= defaultMaxAttempts {
+ logger.Warnw(ctx, "delete-retries-exceeded", log.Fields{"key": key, "error": err, "attempt": attempt})
+ return err
+ }
if err != nil {
switch err {
case context.Canceled:
+ // If the parent context is already done (cancelled or expired), do not retry
+ if ctx.Err() != nil {
+ logger.Warnw(ctx, "parent-context-cancelled", log.Fields{"error": err})
+ return err
+ }
+ // Otherwise retry
logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "attempt": attempt})
case v3rpcTypes.ErrEmptyKey:
logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ return err
case v3rpcTypes.ErrLeaderChanged,
v3rpcTypes.ErrGRPCNoLeader,
v3rpcTypes.ErrTimeout,
v3rpcTypes.ErrTimeoutDueToLeaderFail,
v3rpcTypes.ErrTimeoutDueToConnectionLost:
// Retry for these server errors
- attempt += 1
- if er := backoff(ctx, attempt); er != nil {
- logger.Warnw(ctx, "delete-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
- return err
- }
- logger.Warnw(ctx, "retrying-delete", log.Fields{"key": key, "error": err, "attempt": attempt})
- goto startLoop
- default:
logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ default:
+ logger.Warnw(ctx, "etcd-unknown-error", log.Fields{"error": err})
}
- return err
+
+ // Common retry logic for every error case that did not return above
+ attempt++
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "delete-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-delete", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
}
logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
return nil