(VOL-4959) Introduce retry mechanism when read/write to KAFKA
Change-Id: Ie082afa8f2caff446e13751d4447fb793ab9cf76
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a7dc890..ba0e564 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -63,6 +63,8 @@
defaultCheckOnuDevExistenceAtOnuDiscovery = false
defaultForceOnuDiscIndProcessing = false
defaultMaxRetries = 10
+ defaultProducerRetryMax = 10
+ defaultMetadataRetryMax = 15
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -103,6 +105,8 @@
EnableGEMStats bool
CheckOnuDevExistenceAtOnuDiscovery bool
ForceOnuDiscIndProcessing bool
+ ProducerRetryMax int
+ MetadataRetryMax int
}
// NewAdapterFlags returns a new RWCore config
@@ -139,6 +143,8 @@
CheckOnuDevExistenceAtOnuDiscovery: defaultCheckOnuDevExistenceAtOnuDiscovery,
ForceOnuDiscIndProcessing: defaultForceOnuDiscIndProcessing,
MaxRetries: defaultMaxRetries,
+ ProducerRetryMax: defaultProducerRetryMax,
+ MetadataRetryMax: defaultMetadataRetryMax,
}
return &adapterFlags
}
@@ -308,6 +314,14 @@
"force_onu_disc_processing",
defaultForceOnuDiscIndProcessing,
"Skip the check for onu device existence on onu discovery")
+ flag.IntVar(&(so.ProducerRetryMax),
+ "producer_retry_max",
+ defaultProducerRetryMax,
+ "This option specifies the maximum number of times the producer will retry sending messages before giving up")
+ flag.IntVar(&(so.MetadataRetryMax),
+ "metadata_retry_max",
+ defaultMetadataRetryMax,
+ "This option specifies the maximum number of times retry to receive messages before giving up")
flag.Parse()
containerName := getContainerInfo()