(VOL-4959) Introduce retry mechanism when read/write to KAFKA for voltha-go
Change-Id: I21a63fd2f0969c04f6cbbf567ff631a25caef197
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/VERSION b/VERSION
index 33e5103..f9fb0e4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.6.19
+3.6.20
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 8c53006..4356f1f 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,7 +24,9 @@
// RW Core service default constants
const (
- KVStoreName = "etcd"
+ KVStoreName = "etcd"
+ defaultProducerRetryMax = 10
+ defaultMetadataRetryMax = 15
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -62,6 +64,8 @@
DisplayVersionOnly bool
TraceEnabled bool
LogCorrelationEnabled bool
+ ProducerRetryMax int
+ MetadataRetryMax int
}
// ParseCommandArguments parses the arguments when running read-write core service
@@ -211,5 +215,13 @@
"max_grpc_client_retry",
0,
"The maximum number of times olt adaptor will retry in case grpc request timeouts")
+ fs.IntVar(&cf.ProducerRetryMax,
+ "producer_retry_max",
+ defaultProducerRetryMax,
+ "This option specifies the maximum number of times the producer will retry sending messages before giving up")
+ fs.IntVar(&cf.MetadataRetryMax,
+ "metadata_retry_max",
+ defaultMetadataRetryMax,
+ "This option specifies the maximum number of times retry to receive messages before giving up")
_ = fs.Parse(args)
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a8490bf..0979592 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -75,10 +75,10 @@
kafka.Address(cf.KafkaClusterAddress),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
+ kafka.ProducerMaxRetries(cf.ProducerRetryMax),
kafka.ProducerRetryBackoff(time.Millisecond*30),
kafka.AutoCreateTopic(true),
- kafka.MetadatMaxRetries(15),
+ kafka.MetadatMaxRetries(cf.MetadataRetryMax),
)
// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called