(VOL-4959) Introduce retry mechanism when read/write to KAFKA
Change-Id: Ie082afa8f2caff446e13751d4447fb793ab9cf76
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index fbe7880..af8b52b 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -114,7 +114,7 @@
go conf.StartLogFeaturesConfigProcessing(cm, ctx)
// Setup Kafka Client
- if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
+ if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
@@ -312,17 +312,17 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
+func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
switch clientType {
case "sarama":
return kafka.NewSaramaClient(
- kafka.Address(address),
+ kafka.Address(config.KafkaClusterAddress),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
+ kafka.ProducerMaxRetries(config.ProducerRetryMax),
kafka.ProducerRetryBackoff(time.Millisecond*30),
- kafka.MetadatMaxRetries(15)), nil
+ kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
}
return nil, errors.New("unsupported-client-type")
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index 3f7e52e..5618cfc 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -123,7 +123,7 @@
adapter := newMockAdapter()
type args struct {
clientType string
- address string
+ config *config.AdapterFlags
}
tests := []struct {
name string
@@ -131,12 +131,12 @@
wantErr bool
}{
// TODO: Add test cases.
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
+ {"newKafkaClient", args{clientType: "sarama", config: adapter.config}, false},
+ {"newKafkaClient", args{clientType: "sarama", config: adapter.config}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _, err := newKafkaClient(context.Background(), tt.args.clientType, tt.args.address)
+ _, err := newKafkaClient(context.Background(), tt.args.clientType, tt.args.config)
if (err != nil) != tt.wantErr {
t.Errorf("newKafkaClient() error = %v, wantErr %v", err, tt.wantErr)
return