diff --git a/pkg/kafka/utils.go b/pkg/kafka/utils.go
index bdc615f..608361b 100644
--- a/pkg/kafka/utils.go
+++ b/pkg/kafka/utils.go
@@ -16,8 +16,14 @@
 package kafka
 
 import (
-	"github.com/golang/protobuf/ptypes/any"
+	"context"
+	"errors"
 	"strings"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
 )
 
 const (
@@ -82,3 +88,98 @@
 	}
 	return deviceId
 }
+
+// StartAndWaitUntilKafkaConnectionIsUp calls kClient.Start every connectionRetryInterval until it succeeds, updating the
+// serviceName probe status as it goes. It returns an error if kClient is nil, or ctx.Err() if ctx is done while waiting.
+func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
+	if kClient == nil {
+		return errors.New("kafka-client-is-nil")
+	}
+	for {
+		if err := kClient.Start(ctx); err != nil {
+			probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady) // Start failed: report not-ready before retrying
+			logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval): // wait out the retry interval, then call Start again
+				continue
+			case <-ctx.Done(): // ctx ended while waiting for the next retry
+				return ctx.Err()
+			}
+		}
+		probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+		logger.Info(ctx, "kafka-connection-up")
+		break // Start succeeded; leave the retry loop
+	}
+	return nil
+}
+
+/**
+MonitorKafkaReadiness loops until ctx is done, updating the serviceName probe status from kClient's liveness and
+healthiness channels and calling kClient.SendLiveness whenever neither reports within the current probe interval.
+*/
+func MonitorKafkaReadiness(ctx context.Context,
+	kClient Client,
+	liveProbeInterval, notLiveProbeInterval time.Duration,
+	serviceName string) {
+
+	if kClient == nil {
+		logger.Fatal(ctx, "kafka-client-is-nil") // NOTE(review): presumably terminates the process — confirm against the log package
+	}
+
+	logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
+
+	livelinessChannel := kClient.EnableLivenessChannel(ctx, true)     // reports read here drive the running/not-ready status below
+	healthinessChannel := kClient.EnableHealthinessChannel(ctx, true) // a false report read here marks the service failed
+	timeout := liveProbeInterval                                      // wait before the next re-check; notLiveProbeInterval while not live
+	failed := false                                                   // set on the first unhealthy report and never cleared
+	for {
+		timeoutTimer := time.NewTimer(timeout) // fresh timer each iteration, using the current interval
+
+		select {
+		case healthiness := <-healthinessChannel:
+			if !healthiness {
+				// This will eventually cause K8s to restart the container, and will do
+				// so in a way that allows cleanup to continue, rather than an immediate
+				// panic and exit here.
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
+				logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
+				failed = true
+			}
+			// Stop this iteration's timer; if Stop reports it already fired, drain its channel.
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case liveliness := <-livelinessChannel:
+			if failed {
+				// Failures of the message bus are permanent and can't ever be recovered from,
+				// so make sure we never inadvertently reset a failed state back to unready.
+			} else if !liveliness {
+				// Kafka is not reachable or is down: set the status to not ready and use the not-live interval.
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+				logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
+				timeout = notLiveProbeInterval
+			} else {
+				// Kafka is reachable: set the status to running and go back to the live interval.
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				timeout = liveProbeInterval
+			}
+			// Stop this iteration's timer; if Stop reports it already fired, drain its channel.
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := kClient.SendLiveness(ctx)
+				if err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
+				}
+			}()
+		case <-ctx.Done():
+			return // ctx is done; stop monitoring
+		}
+	}
+}
