(VOL-4959) Introduce retry mechanism when read/write to KAFKA for openonu-adapter

Change-Id: I5c3f566b144ecec9d71d0b1461ca1823468b4845
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 04e4293..e951584 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -25,8 +25,10 @@
 
 // Open ONU default constants
 const (
-	KVStoreName  = "etcd"
-	OnuVendorIds = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG,LEOX,ZYXE"
+	KVStoreName             = "etcd"
+	OnuVendorIds            = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG,LEOX,ZYXE"
+	defaultProducerRetryMax = 10
+	defaultMetadataRetryMax = 15
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -77,6 +79,8 @@
 	ExtendedOmciSupportEnabled  bool
 	SkipOnuConfig               bool
 	CheckDeviceTechProfOnReboot bool
+	ProducerRetryMax            int
+	MetadataRetryMax            int
 }
 
 // ParseCommandArguments parses the arguments when running read-write adaptercore service
@@ -301,6 +305,14 @@
 		"max_grpc_client_retry",
 		0,
 		"The maximum number of times olt adaptor will retry in case grpc request timeouts")
+	fs.IntVar(&so.ProducerRetryMax,
+		"producer_retry_max",
+		defaultProducerRetryMax,
+		"This option specifies the maximum number of times the producer will retry sending messages before giving up")
+	fs.IntVar(&so.MetadataRetryMax,
+		"metadata_retry_max",
+		defaultMetadataRetryMax,
+		"This option specifies the maximum number of times retry to receive messages before giving up")
 	_ = fs.Parse(args)
 	containerName := getContainerInfo()
 	if len(containerName) > 0 {