(VOL-4959) Introduce retry mechanism when reading from/writing to Kafka for openonu-adapter

Change-Id: I5c3f566b144ecec9d71d0b1461ca1823468b4845
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index a21629f..9bccc93 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -106,7 +106,7 @@
 	go conf.StartLogLevelConfigProcessing(cm, ctx)
 
 	// Setup Kafka Client
-	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
+	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
 		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
 	}
 
@@ -244,19 +244,19 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
+func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
 
 	logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
 
 	switch clientType {
 	case "sarama":
 		return kafka.NewSaramaClient(
-			kafka.Address(addr),
+			kafka.Address(config.KafkaClusterAddress),
 			kafka.ProducerReturnOnErrors(true),
 			kafka.ProducerReturnOnSuccess(true),
-			kafka.ProducerMaxRetries(6),
+			kafka.ProducerMaxRetries(config.ProducerRetryMax),
 			kafka.ProducerRetryBackoff(time.Millisecond*30),
-			kafka.MetadatMaxRetries(15)), nil
+			kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
 	}
 
 	return nil, errors.New("unsupported-client-type")