(VOL-4959) Introduce retry mechanism when read/write to KAFKA for openonu-adapter
Change-Id: I5c3f566b144ecec9d71d0b1461ca1823468b4845
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/VERSION b/VERSION
index 02d7139..2e21b72 100755
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.12.30
+2.12.31
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index a21629f..9bccc93 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -106,7 +106,7 @@
go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
- if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
+ if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
@@ -244,19 +244,19 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
+func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
switch clientType {
case "sarama":
return kafka.NewSaramaClient(
- kafka.Address(addr),
+ kafka.Address(config.KafkaClusterAddress),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
+ kafka.ProducerMaxRetries(config.ProducerRetryMax),
kafka.ProducerRetryBackoff(time.Millisecond*30),
- kafka.MetadatMaxRetries(15)), nil
+ kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
}
return nil, errors.New("unsupported-client-type")
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 04e4293..e951584 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -25,8 +25,10 @@
// Open ONU default constants
const (
- KVStoreName = "etcd"
- OnuVendorIds = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG,LEOX,ZYXE"
+ KVStoreName = "etcd"
+ OnuVendorIds = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG,LEOX,ZYXE"
+ defaultProducerRetryMax = 10
+ defaultMetadataRetryMax = 15
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -77,6 +79,8 @@
ExtendedOmciSupportEnabled bool
SkipOnuConfig bool
CheckDeviceTechProfOnReboot bool
+ ProducerRetryMax int
+ MetadataRetryMax int
}
// ParseCommandArguments parses the arguments when running read-write adaptercore service
@@ -301,6 +305,14 @@
"max_grpc_client_retry",
0,
"The maximum number of times olt adaptor will retry in case grpc request timeouts")
+ fs.IntVar(&so.ProducerRetryMax,
+ "producer_retry_max",
+ defaultProducerRetryMax,
+ "This option specifies the maximum number of times the producer will retry sending messages before giving up")
+ fs.IntVar(&so.MetadataRetryMax,
+ "metadata_retry_max",
+ defaultMetadataRetryMax,
+ "This option specifies the maximum number of times retry to receive messages before giving up")
_ = fs.Parse(args)
containerName := getContainerInfo()
if len(containerName) > 0 {