SEBA-841 importer to parse all status from redfish server for data collection purpose / remove demotest binary
SEBA-856 SendDeviceList- This API will add all devices in the list
SEBA-858 DeleteDeviceList- This API will remove all devices in the list
decouple add/remove event subscription routines and device data file update
remove 'protocol'
SEBA-874 get rid of the 'vendor' argument called by some API's

Change-Id: Icc044dd4661c3cc14f02ad1a5f52e18116da63aa
diff --git a/main.go b/main.go
index 0510876..d01685d 100644
--- a/main.go
+++ b/main.go
@@ -36,7 +36,9 @@
 )
 
 //globals
-const REDFISH_ROOT = "/redfish/v1"
+const RF_DEFAULT_PROTOCOL = "https://"
+const RF_DATA_COLLECT_THRESHOLD = 5
+const RF_DATA_COLLECT_DUMMY_INTERVAL = 1000
 const CONTENT_TYPE = "application/json"
 
 var (
@@ -45,33 +47,30 @@
 
 var DataProducer sarama.AsyncProducer
 
-var vendor_default_events = map[string][]string{
-	"edgecore": {"ResourceAdded", "ResourceRemoved", "Alert"},
-}
-var redfish_services = [...]string{"/Chassis", "/Systems", "/EthernetSwitches"}
+var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems","/redfish/v1/EthernetSwitches"}
 var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
 var subscriptionListPath string
 
-type scheduler struct {
+type scheduler struct  {
 	getdata *time.Ticker
-	quit    chan bool
+	quit chan bool
+	getdataend chan bool
 }
 
-type device struct {
-	Subscriptions map[string]string `json:"ss"`
-	Freq          uint32            `json:"freq"`
-	Datacollector scheduler         `json:"-"`
-	Freqchan      chan uint32       `json:"-"`
-	Vendor        string            `json:"vendor"`
-	Protocol      string            `json:"protocol"`
+type device struct  {
+	Subscriptions	map[string]string	`json:"ss"`
+	Freq		uint32			`json:"freq"`
+	Datacollector	scheduler		`json:"-"`
+	Freqchan	chan uint32		`json:"-"`
+	Eventtypes	[]string		`json:"eventtypes"`
+	Datafile	*os.File		`json:"-"`
 }
 
 type Server struct {
-	devicemap    map[string]*device
-	gRPCserver   *grpc.Server
-	dataproducer sarama.AsyncProducer
-	httpclient   *http.Client
-	devicechan   chan *importer.DeviceInfo
+	devicemap	map[string]*device
+	gRPCserver	*grpc.Server
+	dataproducer	sarama.AsyncProducer
+	httpclient	*http.Client
 }
 
 func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
@@ -81,18 +80,15 @@
 	if !found {
 		return nil, status.Errorf(codes.NotFound, "Device not registered")
 	}
-	f := get_subscription_list(ip_address)
 	for event, _ := range s.devicemap[ip_address].Subscriptions {
-		rtn := s.remove_subscription(ip_address, event, f)
+		rtn := s.remove_subscription(ip_address, event)
 		if !rtn {
 			log.WithFields(log.Fields{
 				"Event": event,
 			}).Info("Error removing event")
 		}
 	}
-	if f != nil {
-		f.Close()
-	}
+	s.update_data_file(ip_address)
 	return &empty.Empty{}, nil
 }
 
@@ -109,14 +105,14 @@
 	return currentevents, nil
 }
 
-func (s *Server) GetEventList(c context.Context, info *importer.VendorInfo) (*importer.EventList, error) {
+func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
 	fmt.Println("Received GetEventList\n")
-	_, found := vendor_default_events[info.Vendor]
-	if !found {
-		return nil, status.Errorf(codes.NotFound, "Invalid Vendor Provided")
-	}
 	eventstobesubscribed := new(importer.EventList)
-	eventstobesubscribed.Events = vendor_default_events[info.Vendor]
+//	eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
+	eventstobesubscribed.Events = s.get_event_types(info.IpAddress)
+	if eventstobesubscribed.Events == nil {
+		return nil, status.Errorf(codes.NotFound, "No events found")
+	}
 	return eventstobesubscribed, nil
 }
 
@@ -126,12 +122,17 @@
 	if !found {
 		return nil, status.Errorf(codes.NotFound, "Device not registered")
 	}
-
+	if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
+		return nil, status.Errorf(codes.InvalidArgument, "Invalid frequency")
+	}
 	s.devicemap[info.IpAddress].Freqchan <- info.Frequency
+	s.devicemap[info.IpAddress].Freq = info.Frequency
+	s.update_data_file(info.IpAddress)
 	return &empty.Empty{}, nil
 }
 
 func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
+	errstring := ""
 	fmt.Println("Received SubsrcribeEvents\n")
 	//Call API to subscribe events
 	ip_address := subeventlist.EventIpAddress
@@ -142,28 +143,31 @@
 	if len(subeventlist.Events) <= 0 {
 		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
 	}
-	f := get_subscription_list(ip_address)
 	for _, event := range subeventlist.Events {
 		if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
-			rtn := s.add_subscription(ip_address, event, f)
+			rtn := s.add_subscription(ip_address, event)
 			if !rtn {
+				errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
 				log.WithFields(log.Fields{
 					"Event": event,
 				}).Info("Error adding  event")
 			}
 		} else {
+			errstring = errstring + "event " + event + " already subscribed\n"
 			log.WithFields(log.Fields{
 				"Event": event,
 			}).Info("Already Subscribed")
 		}
 	}
-	if f != nil {
-		f.Close()
+	s.update_data_file(ip_address)
+	if errstring != "" {
+		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
 	}
 	return &empty.Empty{}, nil
 }
 
 func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
+	errstring := ""
 	fmt.Println("Received UnSubsrcribeEvents\n")
 	ip_address := unsubeventlist.EventIpAddress
 	_, found := s.devicemap[ip_address]
@@ -175,28 +179,50 @@
 		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
 	}
 	//Call API to unsubscribe events
-	f := get_subscription_list(ip_address)
 	for _, event := range unsubeventlist.Events {
 		if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
-			rtn := s.remove_subscription(ip_address, event, f)
+			rtn := s.remove_subscription(ip_address, event)
 			if !rtn {
+				errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
 				log.WithFields(log.Fields{
 					"Event": event,
 				}).Info("Error removing event")
 			}
 		} else {
+			errstring = errstring + "event " + event + " not found\n"
 			log.WithFields(log.Fields{
 				"Event": event,
 			}).Info("was not Subscribed")
 		}
 	}
-	if f != nil {
-		f.Close()
-	}
+	s.update_data_file(ip_address)
 
+	if errstring != "" {
+		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+	}
 	return &empty.Empty{}, nil
 }
 
+func (s *Server) update_data_file(ip_address string) {
+	f := s.devicemap[ip_address].Datafile
+	if f != nil {
+		b, err := json.Marshal(s.devicemap[ip_address])
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			f.Truncate(0)
+			f.Seek(0, 0)
+			n, err := f.Write(b)
+			if err != nil {
+				fmt.Println("err wrote", n, "bytes")
+				fmt.Println(err)
+			}
+		}
+	} else {
+		fmt.Println("file handle is nil", ip_address)
+	}
+}
+
 func (s *Server) collect_data(ip_address string) {
 	freqchan := s.devicemap[ip_address].Freqchan
 	ticker := s.devicemap[ip_address].Datacollector.getdata
@@ -207,83 +233,141 @@
 			ticker.Stop()
 			if freq > 0 {
 				ticker = time.NewTicker(time.Duration(freq) * time.Second)
+				s.devicemap[ip_address].Datacollector.getdata = ticker
 			}
 		case err := <-s.dataproducer.Errors():
 			fmt.Println("Failed to produce message:", err)
 		case <-ticker.C:
-			for _, service := range redfish_services {
-				rtn, data := s.get_status(ip_address, service)
-				if rtn {
-					for _, str := range data {
-						str = "Device IP: " + ip_address + " " + str
-						fmt.Printf("collected data  %s\n ...", str)
-						b := []byte(str)
-						msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
-						select {
-						case s.dataproducer.Input() <- msg:
-							fmt.Println("Produce message")
-						default:
-						}
+			for _, resource := range redfish_resources {
+				data := s.get_status(ip_address, resource)
+				for _, str := range data {
+					str = "Device IP: " + ip_address + " " + str
+					fmt.Printf("collected data  %s\n ...", str)
+					b := []byte(str)
+					msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+					select {
+					case  s.dataproducer.Input() <- msg:
+						fmt.Println("Produce message")
+					default:
 					}
 				}
 			}
 		case <-donechan:
 			ticker.Stop()
 			fmt.Println("getdata ticker stopped")
+			s.devicemap[ip_address].Datacollector.getdataend <- true
 			return
 		}
 	}
 }
 
-func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
-	d := device{
-		Subscriptions: make(map[string]string),
-		Freq:          info.Frequency,
-		Datacollector: scheduler{
-			getdata: time.NewTicker(time.Duration(info.Frequency) * time.Second),
-			quit:    make(chan bool),
-		},
-		Freqchan: make(chan uint32),
-		Vendor:   info.Vendor,
-		Protocol: info.Protocol,
-	}
-	_, found := s.devicemap[info.IpAddress]
-	if found {
-		return nil, status.Errorf(codes.AlreadyExists, "Device Already registered")
-	}
+func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
+	fmt.Println("DeleteDeviceList received")
+	errstring := ""
+	for _, ip := range list.Ip {
+		if _, ok := s.devicemap[ip]; !ok {
+			fmt.Printf("Device not found ", ip)
+			errstring = errstring + "Device " + ip + " not found\n"
+			continue
+		}
+		for event, _ := range s.devicemap[ip].Subscriptions {
+			rtn := s.remove_subscription(ip, event)
+			if !rtn {
+				log.WithFields(log.Fields{
+					"Event": event,
+				}).Info("Error removing event")
+			}
+		}
+		fmt.Println("deleting device", ip)
+		s.devicemap[ip].Datacollector.quit <- true
 
-	_, vendorfound := vendor_default_events[info.Vendor]
-	if !vendorfound {
-		return nil, status.Errorf(codes.NotFound, "Vendor Not Found")
+		f := s.devicemap[ip].Datafile
+		if f != nil {
+			fmt.Println("deleteing file", f.Name())
+			err := f.Close()
+			if err != nil {
+				fmt.Println("error closing file ", f.Name(), err)
+				errstring = errstring + "error closing file " + f.Name() + "\n"
+			}
+			err = os.Remove(f.Name())
+			if err != nil {
+				fmt.Println("error deleting file ", f.Name(), err)
+			}
+		} else {
+			errstring = errstring + "file " + ip + " not found\n"
+		}
+		<-s.devicemap[ip].Datacollector.getdataend
+		delete(s.devicemap, ip)
 	}
-
-	//default_events := [...]string{}
-	s.devicemap[info.IpAddress] = &d
-	fmt.Printf("size of devicemap %d\n", len(s.devicemap))
-	ip_address := info.IpAddress
-	fmt.Printf("Configuring  %s\n", ip_address)
-	// call subscription function with info.IpAddress
-
-	default_events := vendor_default_events[info.Vendor]
-
-	f := get_subscription_list(ip_address)
-	for _, event := range default_events {
-		s.add_subscription(ip_address, event, f)
+	if errstring != "" {
+		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
 	}
-	if f != nil {
-		f.Close()
-	}
-	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
 }
 
-func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceList, error) {
+func (s *Server) SendDeviceList(c context.Context,  list *importer.DeviceList) (*empty.Empty, error) {
+	errstring := ""
+	for _, dev := range list.Device {
+		ip_address:= dev.IpAddress
+		if _, ok := s.devicemap[dev.IpAddress]; ok {
+			fmt.Printf("Device %s already exists", ip_address)
+			errstring = errstring + "Device " + ip_address + " already exists\n"
+			continue
+		}
+
+		if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
+			fmt.Printf("Device %s data collection frequency %d out of range", ip_address, dev.Frequency)
+			errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
+			continue
+		}
+		d := device {
+			Subscriptions: make(map[string]string),
+			Freq: dev.Frequency,
+			Datacollector: scheduler{
+				quit: make(chan bool),
+				getdataend: make(chan bool),
+			},
+			Freqchan: make(chan uint32),
+		}
+		s.devicemap[ip_address] = &d
+		fmt.Printf("Configuring  %s\n", ip_address)
+
+		/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+		freq := dev.Frequency
+		if freq == 0 {
+			freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+		}
+		s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+		if dev.Frequency == 0 {
+			s.devicemap[ip_address].Datacollector.getdata.Stop()
+		}
+
+		eventtypes := s.get_event_types(ip_address)
+		if eventtypes != nil {
+			for _, event := range eventtypes {
+				s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
+				if s.add_subscription(ip_address, event) == false {
+					errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
+				}
+			}
+		}
+		go s.collect_data(ip_address)
+		s.devicemap[ip_address].Datafile = get_data_file(ip_address)
+		s.update_data_file(ip_address)
+	}
+	if errstring != "" {
+		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+	}
+	return &empty.Empty{}, nil
+}
+
+func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
 	fmt.Println("In Received GetCurrentDevices\n")
 
 	if len(s.devicemap) == 0 {
 		return nil, status.Errorf(codes.NotFound, "Devices not registered")
 	}
-	dl := new(importer.DeviceList)
+	dl := new(importer.DeviceListByIp)
 	for k, v := range s.devicemap {
 		if v != nil {
 			fmt.Printf("IpAdd[%s] \n", k)
@@ -362,26 +446,39 @@
 }
 
 func (s *Server) init_data_persistence() {
+	fmt.Println("Retrieving persisted data")
 	subscriptionListPath = pvmount + "/subscriptions"
 	if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
 		fmt.Println(err)
 	} else {
-		lists, err := ioutil.ReadDir(subscriptionListPath)
+		files, err := ioutil.ReadDir(subscriptionListPath)
 		if err != nil {
 			fmt.Println(err)
 		} else {
-			for _, list := range lists {
-				b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+			for _, f := range files {
+				b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
 				if err != nil {
 					fmt.Println(err)
-				} else {
-					ip := list.Name()
+				} else if f.Size() > 0 {
+					ip := f.Name()
 					d := device{}
 					json.Unmarshal(b, &d)
 					s.devicemap[ip] = &d
-					s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+					freq := s.devicemap[ip].Freq
+
+					/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+					if freq == 0 {
+						freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+					}
+					s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+					if s.devicemap[ip].Freq == 0 {
+						s.devicemap[ip].Datacollector.getdata.Stop()
+					}
+
 					s.devicemap[ip].Datacollector.quit = make(chan bool)
+					s.devicemap[ip].Datacollector.getdataend = make(chan bool)
 					s.devicemap[ip].Freqchan = make(chan uint32)
+					s.devicemap[ip].Datafile = get_data_file(ip)
 					go s.collect_data(ip)
 				}
 			}
@@ -401,7 +498,7 @@
 	//sarama.Logger = log.New()
 }
 
-func get_subscription_list(ip string) *os.File {
+func get_data_file(ip string) *os.File {
 	if pvmount == "" {
 		return nil
 	}
@@ -412,6 +509,12 @@
 	return f
 }
 
+func (s *Server) close_data_files() {
+	for ip, _ := range s.devicemap {
+		s.devicemap[ip].Datafile.Close()
+	}
+}
+
 func main() {
 	fmt.Println("Starting Device-management Container")
 
@@ -420,10 +523,9 @@
 		Timeout: 10 * time.Second,
 	}
 
-	s := Server{
-		devicemap:  make(map[string]*device),
-		devicechan: make(chan *importer.DeviceInfo),
-		httpclient: client,
+	s := Server {
+		devicemap:	make(map[string]*device),
+		httpclient:	client,
 	}
 
 	s.kafkaInit()
@@ -441,5 +543,6 @@
 	case sig := <-quit:
 		fmt.Println("Shutting down:", sig)
 		s.kafkaCloseProducer()
+		s.close_data_files()
 	}
 }