(VOL-4959) Introduce retry mechanism when read/write to KAFKA

Change-Id: Ie082afa8f2caff446e13751d4447fb793ab9cf76
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/VERSION b/VERSION
index fe10d56..910b97b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.5.18
+4.5.19
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index fbe7880..af8b52b 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -114,7 +114,7 @@
 	go conf.StartLogFeaturesConfigProcessing(cm, ctx)
 
 	// Setup Kafka Client
-	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
+	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
 		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
 	}
 
@@ -312,17 +312,17 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
+func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
 	logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
 	switch clientType {
 	case "sarama":
 		return kafka.NewSaramaClient(
-			kafka.Address(address),
+			kafka.Address(config.KafkaClusterAddress),
 			kafka.ProducerReturnOnErrors(true),
 			kafka.ProducerReturnOnSuccess(true),
-			kafka.ProducerMaxRetries(6),
+			kafka.ProducerMaxRetries(config.ProducerRetryMax),
 			kafka.ProducerRetryBackoff(time.Millisecond*30),
-			kafka.MetadatMaxRetries(15)), nil
+			kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
 	}
 
 	return nil, errors.New("unsupported-client-type")
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index 3f7e52e..5618cfc 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -123,7 +123,7 @@
 	adapter := newMockAdapter()
 	type args struct {
 		clientType string
-		address    string
+		config     *config.AdapterFlags
 	}
 	tests := []struct {
 		name    string
@@ -131,12 +131,12 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
-		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
+		{"newKafkaClient", args{clientType: "sarama", config: adapter.config}, false},
+		{"newKafkaClient", args{clientType: "sarama", config: adapter.config}, false},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, err := newKafkaClient(context.Background(), tt.args.clientType, tt.args.address)
+			_, err := newKafkaClient(context.Background(), tt.args.clientType, tt.args.config)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("newKafkaClient() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a7dc890..ba0e564 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -63,6 +63,8 @@
 	defaultCheckOnuDevExistenceAtOnuDiscovery = false
 	defaultForceOnuDiscIndProcessing          = false
 	defaultMaxRetries                         = 10
+	defaultProducerRetryMax                   = 10
+	defaultMetadataRetryMax                   = 15
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -103,6 +105,8 @@
 	EnableGEMStats                     bool
 	CheckOnuDevExistenceAtOnuDiscovery bool
 	ForceOnuDiscIndProcessing          bool
+	ProducerRetryMax                   int
+	MetadataRetryMax                   int
 }
 
 // NewAdapterFlags returns a new RWCore config
@@ -139,6 +143,8 @@
 		CheckOnuDevExistenceAtOnuDiscovery: defaultCheckOnuDevExistenceAtOnuDiscovery,
 		ForceOnuDiscIndProcessing:          defaultForceOnuDiscIndProcessing,
 		MaxRetries:                         defaultMaxRetries,
+		ProducerRetryMax:                   defaultProducerRetryMax,
+		MetadataRetryMax:                   defaultMetadataRetryMax,
 	}
 	return &adapterFlags
 }
@@ -308,6 +314,14 @@
 		"force_onu_disc_processing",
 		defaultForceOnuDiscIndProcessing,
 		"Skip the check for onu device existence on onu discovery")
+	flag.IntVar(&(so.ProducerRetryMax),
+		"producer_retry_max",
+		defaultProducerRetryMax,
+		"This option specifies the maximum number of times the producer will retry sending messages before giving up")
+	flag.IntVar(&(so.MetadataRetryMax),
+		"metadata_retry_max",
+		defaultMetadataRetryMax,
+		"This option specifies the maximum number of times retry to receive messages before giving up")
 
 	flag.Parse()
 	containerName := getContainerInfo()