(VOL-4959) Introduce retry mechanism when reading from/writing to Kafka for voltha-go

Change-Id: I21a63fd2f0969c04f6cbbf567ff631a25caef197
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/VERSION b/VERSION
index 33e5103..f9fb0e4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.6.19
+3.6.20
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 8c53006..4356f1f 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,7 +24,9 @@
 
 // RW Core service default constants
 const (
-	KVStoreName = "etcd"
+	KVStoreName             = "etcd"
+	defaultProducerRetryMax = 10 // default value of the producer_retry_max flag
+	defaultMetadataRetryMax = 15 // default value of the metadata_retry_max flag
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
@@ -62,6 +64,8 @@
 	DisplayVersionOnly          bool
 	TraceEnabled                bool
 	LogCorrelationEnabled       bool
+	ProducerRetryMax            int
+	MetadataRetryMax            int
 }
 
 // ParseCommandArguments parses the arguments when running read-write core service
@@ -211,5 +215,13 @@
 		"max_grpc_client_retry",
 		0,
 		"The maximum number of times olt adaptor will retry in case grpc request timeouts")
+	fs.IntVar(&cf.ProducerRetryMax,
+		"producer_retry_max",
+		defaultProducerRetryMax,
+		"This option specifies the maximum number of times the producer will retry sending messages before giving up")
+	fs.IntVar(&cf.MetadataRetryMax,
+		"metadata_retry_max",
+		defaultMetadataRetryMax,
+		"This option specifies the maximum number of times retry to receive messages before giving up")
 	_ = fs.Parse(args)
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a8490bf..0979592 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -75,10 +75,10 @@
 		kafka.Address(cf.KafkaClusterAddress),
 		kafka.ProducerReturnOnErrors(true),
 		kafka.ProducerReturnOnSuccess(true),
-		kafka.ProducerMaxRetries(6),
+		kafka.ProducerMaxRetries(cf.ProducerRetryMax),
 		kafka.ProducerRetryBackoff(time.Millisecond*30),
 		kafka.AutoCreateTopic(true),
-		kafka.MetadatMaxRetries(15),
+		kafka.MetadatMaxRetries(cf.MetadataRetryMax),
 	)
 
 	// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called