VOL-2098 Support for Kafka liveness testing

* Adds liveness channel to sarama_client and kafka_interadapter proxy.
The liveness channel will push true or false to the channel on
each successful or failed Kafka publish.

* Adds support to make a "liveness publish attempt", which publishes
an empty message on a _liveness channel.

* Adds ServiceStatusNotReady to Probe

* Suppresses multiple Probe.UpdateStatus of the same status

* Adds the ability to attach a Probe to the grpc server, so that
when the probe returns NotReady, the Server responds to requests
with UNAVAILABLE.

Change-Id: I996c719570a50f2f6f397887d10d489608269c3f
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 4c95aa1..488d470 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -20,7 +20,9 @@
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/status"
 	"net"
 )
 
@@ -52,12 +54,19 @@
 	s.server.Start(ctx)
 */
 
+// Interface allows probes to be attached to server
+// A probe must support the IsReady() method
+type ReadyProbe interface {
+	IsReady() bool
+}
+
 type GrpcServer struct {
 	gs       *grpc.Server
 	address  string
 	port     int
 	secure   bool
 	services []func(*grpc.Server)
+	probe    ReadyProbe // optional
 
 	*GrpcSecurity
 }
@@ -97,11 +106,12 @@
 		if err != nil {
 			log.Fatalf("could not load TLS keys: %s", err)
 		}
-		s.gs = grpc.NewServer(grpc.Creds(creds))
+		s.gs = grpc.NewServer(grpc.Creds(creds),
+			withServerUnaryInterceptor(s))
 
 	} else {
 		log.Info("starting-insecure-grpc-server")
-		s.gs = grpc.NewServer()
+		s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
 	}
 
 	// Register all required services
@@ -114,6 +124,42 @@
 	}
 }
 
+// Attach a readiness probe to the server.
+// If the probe returns NotReady, the server will return UNAVAILABLE
+func (s *GrpcServer) AttachReadyProbe(p ReadyProbe) {
+	s.probe = p
+}
+
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+	return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// Make a serverInterceptor for the given GrpcServer
+// This interceptor will check whether there is an attached probe,
+// and if that probe indicates NotReady, then an UNAVAILABLE
+// response will be returned.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+	req interface{},
+	info *grpc.UnaryServerInfo,
+	handler grpc.UnaryHandler) (interface{}, error) {
+
+	return func(ctx context.Context,
+		req interface{},
+		info *grpc.UnaryServerInfo,
+		handler grpc.UnaryHandler) (interface{}, error) {
+
+		if (s.probe != nil) && (!s.probe.IsReady()) {
+			log.Warnf("Grpc request received while not ready %v", req)
+			return nil, status.Error(codes.Unavailable, "system is not ready")
+		}
+
+		// Calls the handler
+		h, err := handler(ctx, req)
+
+		return h, err
+	}
+}
+
 /*
 Stop servicing GRPC requests
 */