[SEBA-618] create protofile and API for importer to expose
	This commit adds
		-protofile
		-grpc server
		-service to register device
		-put event data received from http server to kafka bus
		- server structure for all methods and databse to store device information

Change-Id: Ic56d42553c132305bf485188b4234cb1dfcba511
diff --git a/main.go b/main.go
index 583f668..477568f 100644
--- a/main.go
+++ b/main.go
@@ -16,22 +16,80 @@
 
 import (
 	"fmt"
+	"net"
 	"net/http"
 	"os"
 	"os/signal"
 	"io/ioutil"
 	"github.com/Shopify/sarama"
+	"google.golang.org/grpc"
+	"golang.org/x/net/context"
+	empty "github.com/golang/protobuf/ptypes/empty"
+        importer "./proto"
 )
 
 var (
-//	broker  = [2]string{"voltha-kafka.default.svc.cluster.local","9092"}
 	importerTopic = "importer"
 
 )
 
 var DataProducer sarama.AsyncProducer
 
-func kafkaInit() {
+type device struct  {
+	subscription []string
+	freq uint32
+}
+
+type Server struct {
+	devicemap  map[string]*device
+	gRPCserver   *grpc.Server
+	dataproducer sarama.AsyncProducer
+	devicechan   chan *importer.DeviceInfo
+}
+
+func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
+	d := device {
+		freq:	info.Frequency,
+	}
+	s.devicemap[info.IpAddress] = &d
+	s.devicechan <- info
+	return &empty.Empty{}, nil
+}
+func(s *Server) subscribeevents() {
+	for {
+		select {
+			case info:= <-s.devicechan:
+				ip_address:= info.IpAddress
+			fmt.Println("Configuring  %s ...", ip_address)
+			// call subscription function with info.IpAddress
+		}
+	}
+}
+func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
+        fmt.Println("Listening %s ...", grpcport)
+        g = grpc.NewServer()
+        l, e = net.Listen("tcp", grpcport)
+        return
+}
+func (s *Server) startgrpcserver()error {
+	fmt.Println("starting gRPC Server")
+	grpcport := ":50051"
+	listener, gserver, err := NewGrpcServer(grpcport)
+	if err != nil {
+		fmt.Println("Failed to create gRPC server: %v", err)
+		return err
+	}
+	s.gRPCserver = gserver
+	importer.RegisterDeviceManagementServer(gserver, s)
+	if err := gserver.Serve(listener); err != nil {
+		fmt.Println("Failed to run gRPC server: %v", err)
+		return err
+	}
+	return nil
+
+}
+func (s *Server) kafkaInit() {
+	fmt.Println("Starting kafka init to Connect to broker: ")
 	config := sarama.NewConfig()
         config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Max = 10
@@ -40,7 +98,7 @@
 	if err != nil {
 		panic(err)
 	}
-	DataProducer = producer
+	s.dataproducer = producer
 	defer func() {
 		if err := producer.Close(); err != nil {
 			panic(err)
@@ -48,7 +106,7 @@
 	}()
 }
 
-func handle_events(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, os.Interrupt)
 
@@ -64,17 +122,17 @@
                         Value: sarama.StringEncoder(Body),
                 }
 		select {
-		case DataProducer.Input() <- message:
+		case s.dataproducer.Input() <- message:
 
 		case <-signals:
-	        DataProducer.AsyncClose() // Trigger a shutdown of the producer.
+	        s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
 		}
 	}
 }
 
-func runServer() {
+func (s *Server) runServer() {
 	fmt.Println("Starting HTTP Server")
-	http.HandleFunc("/", handle_events)
+	http.HandleFunc("/", s.handle_events)
 	http.ListenAndServe(":8080", nil)
 }
 
@@ -86,8 +144,14 @@
 
 func main() {
 	fmt.Println("Starting Device-management Container")
-	go kafkaInit()
-	go runServer()
+	s := Server {
+		devicemap:	make(map[string]*device),
+		devicechan:	make(chan *importer.DeviceInfo),
+	}
+	go s.kafkaInit()
+	go s.runServer()
+	go s.startgrpcserver()
+	go s.subscribeevents()
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)