(VOL-4959) Introduce retry mechanism when read/write to KAFKA

Change-Id: Ie082afa8f2caff446e13751d4447fb793ab9cf76
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a7dc890..ba0e564 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -63,6 +63,8 @@
 	defaultCheckOnuDevExistenceAtOnuDiscovery = false
 	defaultForceOnuDiscIndProcessing          = false
 	defaultMaxRetries                         = 10
+	defaultProducerRetryMax                   = 10
+	defaultMetadataRetryMax                   = 15
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -103,6 +105,8 @@
 	EnableGEMStats                     bool
 	CheckOnuDevExistenceAtOnuDiscovery bool
 	ForceOnuDiscIndProcessing          bool
+	ProducerRetryMax                   int
+	MetadataRetryMax                   int
 }
 
 // NewAdapterFlags returns a new RWCore config
@@ -139,6 +143,8 @@
 		CheckOnuDevExistenceAtOnuDiscovery: defaultCheckOnuDevExistenceAtOnuDiscovery,
 		ForceOnuDiscIndProcessing:          defaultForceOnuDiscIndProcessing,
 		MaxRetries:                         defaultMaxRetries,
+		ProducerRetryMax:                   defaultProducerRetryMax,
+		MetadataRetryMax:                   defaultMetadataRetryMax,
 	}
 	return &adapterFlags
 }
@@ -308,6 +314,14 @@
 		"force_onu_disc_processing",
 		defaultForceOnuDiscIndProcessing,
 		"Skip the check for onu device existence on onu discovery")
+	flag.IntVar(&(so.ProducerRetryMax),
+		"producer_retry_max",
+		defaultProducerRetryMax,
+		"This option specifies the maximum number of times the producer will retry sending messages before giving up")
+	flag.IntVar(&(so.MetadataRetryMax),
+		"metadata_retry_max",
+		defaultMetadataRetryMax,
+		"This option specifies the maximum number of times retry to receive messages before giving up")
 
 	flag.Parse()
 	containerName := getContainerInfo()