SEBA-841 importer parses all status from the Redfish server for data collection purposes; remove the demotest binary
SEBA-856 SendDeviceList - this API adds all devices in the list
SEBA-858 DeleteDeviceList - this API removes all devices in the list
decouple the add/remove event subscription routines from the device data file update
remove the 'protocol' field from the device struct
SEBA-874 get rid of the 'vendor' argument required by some APIs
Change-Id: Icc044dd4661c3cc14f02ad1a5f52e18116da63aa
diff --git a/main.go b/main.go
index 0510876..d01685d 100644
--- a/main.go
+++ b/main.go
@@ -36,7 +36,9 @@
)
//globals
-const REDFISH_ROOT = "/redfish/v1"
+const RF_DEFAULT_PROTOCOL = "https://"
+const RF_DATA_COLLECT_THRESHOLD = 5
+const RF_DATA_COLLECT_DUMMY_INTERVAL = 1000
const CONTENT_TYPE = "application/json"
var (
@@ -45,33 +47,30 @@
var DataProducer sarama.AsyncProducer
-var vendor_default_events = map[string][]string{
- "edgecore": {"ResourceAdded", "ResourceRemoved", "Alert"},
-}
-var redfish_services = [...]string{"/Chassis", "/Systems", "/EthernetSwitches"}
+var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems","/redfish/v1/EthernetSwitches"}
var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
var subscriptionListPath string
-type scheduler struct {
+type scheduler struct {
getdata *time.Ticker
- quit chan bool
+ quit chan bool
+ getdataend chan bool
}
-type device struct {
- Subscriptions map[string]string `json:"ss"`
- Freq uint32 `json:"freq"`
- Datacollector scheduler `json:"-"`
- Freqchan chan uint32 `json:"-"`
- Vendor string `json:"vendor"`
- Protocol string `json:"protocol"`
+type device struct {
+ Subscriptions map[string]string `json:"ss"`
+ Freq uint32 `json:"freq"`
+ Datacollector scheduler `json:"-"`
+ Freqchan chan uint32 `json:"-"`
+ Eventtypes []string `json:"eventtypes"`
+ Datafile *os.File `json:"-"`
}
type Server struct {
- devicemap map[string]*device
- gRPCserver *grpc.Server
- dataproducer sarama.AsyncProducer
- httpclient *http.Client
- devicechan chan *importer.DeviceInfo
+ devicemap map[string]*device
+ gRPCserver *grpc.Server
+ dataproducer sarama.AsyncProducer
+ httpclient *http.Client
}
func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
@@ -81,18 +80,15 @@
if !found {
return nil, status.Errorf(codes.NotFound, "Device not registered")
}
- f := get_subscription_list(ip_address)
for event, _ := range s.devicemap[ip_address].Subscriptions {
- rtn := s.remove_subscription(ip_address, event, f)
+ rtn := s.remove_subscription(ip_address, event)
if !rtn {
log.WithFields(log.Fields{
"Event": event,
}).Info("Error removing event")
}
}
- if f != nil {
- f.Close()
- }
+ s.update_data_file(ip_address)
return &empty.Empty{}, nil
}
@@ -109,14 +105,14 @@
return currentevents, nil
}
-func (s *Server) GetEventList(c context.Context, info *importer.VendorInfo) (*importer.EventList, error) {
+func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
fmt.Println("Received GetEventList\n")
- _, found := vendor_default_events[info.Vendor]
- if !found {
- return nil, status.Errorf(codes.NotFound, "Invalid Vendor Provided")
- }
eventstobesubscribed := new(importer.EventList)
- eventstobesubscribed.Events = vendor_default_events[info.Vendor]
+// eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
+ eventstobesubscribed.Events = s.get_event_types(info.IpAddress)
+ if eventstobesubscribed.Events == nil {
+ return nil, status.Errorf(codes.NotFound, "No events found")
+ }
return eventstobesubscribed, nil
}
@@ -126,12 +122,17 @@
if !found {
return nil, status.Errorf(codes.NotFound, "Device not registered")
}
-
+ if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
+ return nil, status.Errorf(codes.InvalidArgument, "Invalid frequency")
+ }
s.devicemap[info.IpAddress].Freqchan <- info.Frequency
+ s.devicemap[info.IpAddress].Freq = info.Frequency
+ s.update_data_file(info.IpAddress)
return &empty.Empty{}, nil
}
func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
+ errstring := ""
fmt.Println("Received SubsrcribeEvents\n")
//Call API to subscribe events
ip_address := subeventlist.EventIpAddress
@@ -142,28 +143,31 @@
if len(subeventlist.Events) <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
}
- f := get_subscription_list(ip_address)
for _, event := range subeventlist.Events {
if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
- rtn := s.add_subscription(ip_address, event, f)
+ rtn := s.add_subscription(ip_address, event)
if !rtn {
+ errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Error adding event")
}
} else {
+ errstring = errstring + "event " + event + " already subscribed\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Already Subscribed")
}
}
- if f != nil {
- f.Close()
+ s.update_data_file(ip_address)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
}
return &empty.Empty{}, nil
}
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
+ errstring := ""
fmt.Println("Received UnSubsrcribeEvents\n")
ip_address := unsubeventlist.EventIpAddress
_, found := s.devicemap[ip_address]
@@ -175,28 +179,50 @@
return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
}
//Call API to unsubscribe events
- f := get_subscription_list(ip_address)
for _, event := range unsubeventlist.Events {
if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
- rtn := s.remove_subscription(ip_address, event, f)
+ rtn := s.remove_subscription(ip_address, event)
if !rtn {
+ errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Error removing event")
}
} else {
+ errstring = errstring + "event " + event + " not found\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("was not Subscribed")
}
}
- if f != nil {
- f.Close()
- }
+ s.update_data_file(ip_address)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+ }
return &empty.Empty{}, nil
}
+func (s *Server) update_data_file(ip_address string) {
+ f := s.devicemap[ip_address].Datafile
+ if f != nil {
+ b, err := json.Marshal(s.devicemap[ip_address])
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ f.Truncate(0)
+ f.Seek(0, 0)
+ n, err := f.Write(b)
+ if err != nil {
+ fmt.Println("err wrote", n, "bytes")
+ fmt.Println(err)
+ }
+ }
+ } else {
+ fmt.Println("file handle is nil", ip_address)
+ }
+}
+
func (s *Server) collect_data(ip_address string) {
freqchan := s.devicemap[ip_address].Freqchan
ticker := s.devicemap[ip_address].Datacollector.getdata
@@ -207,83 +233,141 @@
ticker.Stop()
if freq > 0 {
ticker = time.NewTicker(time.Duration(freq) * time.Second)
+ s.devicemap[ip_address].Datacollector.getdata = ticker
}
case err := <-s.dataproducer.Errors():
fmt.Println("Failed to produce message:", err)
case <-ticker.C:
- for _, service := range redfish_services {
- rtn, data := s.get_status(ip_address, service)
- if rtn {
- for _, str := range data {
- str = "Device IP: " + ip_address + " " + str
- fmt.Printf("collected data %s\n ...", str)
- b := []byte(str)
- msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
- select {
- case s.dataproducer.Input() <- msg:
- fmt.Println("Produce message")
- default:
- }
+ for _, resource := range redfish_resources {
+ data := s.get_status(ip_address, resource)
+ for _, str := range data {
+ str = "Device IP: " + ip_address + " " + str
+ fmt.Printf("collected data %s\n ...", str)
+ b := []byte(str)
+ msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ select {
+ case s.dataproducer.Input() <- msg:
+ fmt.Println("Produce message")
+ default:
}
}
}
case <-donechan:
ticker.Stop()
fmt.Println("getdata ticker stopped")
+ s.devicemap[ip_address].Datacollector.getdataend <- true
return
}
}
}
-func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
- d := device{
- Subscriptions: make(map[string]string),
- Freq: info.Frequency,
- Datacollector: scheduler{
- getdata: time.NewTicker(time.Duration(info.Frequency) * time.Second),
- quit: make(chan bool),
- },
- Freqchan: make(chan uint32),
- Vendor: info.Vendor,
- Protocol: info.Protocol,
- }
- _, found := s.devicemap[info.IpAddress]
- if found {
- return nil, status.Errorf(codes.AlreadyExists, "Device Already registered")
- }
+func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
+ fmt.Println("DeleteDeviceList received")
+ errstring := ""
+ for _, ip := range list.Ip {
+ if _, ok := s.devicemap[ip]; !ok {
+ fmt.Printf("Device not found ", ip)
+ errstring = errstring + "Device " + ip + " not found\n"
+ continue
+ }
+ for event, _ := range s.devicemap[ip].Subscriptions {
+ rtn := s.remove_subscription(ip, event)
+ if !rtn {
+ log.WithFields(log.Fields{
+ "Event": event,
+ }).Info("Error removing event")
+ }
+ }
+ fmt.Println("deleting device", ip)
+ s.devicemap[ip].Datacollector.quit <- true
- _, vendorfound := vendor_default_events[info.Vendor]
- if !vendorfound {
- return nil, status.Errorf(codes.NotFound, "Vendor Not Found")
+ f := s.devicemap[ip].Datafile
+ if f != nil {
+ fmt.Println("deleteing file", f.Name())
+ err := f.Close()
+ if err != nil {
+ fmt.Println("error closing file ", f.Name(), err)
+ errstring = errstring + "error closing file " + f.Name() + "\n"
+ }
+ err = os.Remove(f.Name())
+ if err != nil {
+ fmt.Println("error deleting file ", f.Name(), err)
+ }
+ } else {
+ errstring = errstring + "file " + ip + " not found\n"
+ }
+ <-s.devicemap[ip].Datacollector.getdataend
+ delete(s.devicemap, ip)
}
-
- //default_events := [...]string{}
- s.devicemap[info.IpAddress] = &d
- fmt.Printf("size of devicemap %d\n", len(s.devicemap))
- ip_address := info.IpAddress
- fmt.Printf("Configuring %s\n", ip_address)
- // call subscription function with info.IpAddress
-
- default_events := vendor_default_events[info.Vendor]
-
- f := get_subscription_list(ip_address)
- for _, event := range default_events {
- s.add_subscription(ip_address, event, f)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
}
- if f != nil {
- f.Close()
- }
- go s.collect_data(ip_address)
return &empty.Empty{}, nil
}
-func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceList, error) {
+func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
+ errstring := ""
+ for _, dev := range list.Device {
+ ip_address:= dev.IpAddress
+ if _, ok := s.devicemap[dev.IpAddress]; ok {
+ fmt.Printf("Device %s already exists", ip_address)
+ errstring = errstring + "Device " + ip_address + " already exists\n"
+ continue
+ }
+
+ if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
+ fmt.Printf("Device %s data collection frequency %d out of range", ip_address, dev.Frequency)
+ errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
+ continue
+ }
+ d := device {
+ Subscriptions: make(map[string]string),
+ Freq: dev.Frequency,
+ Datacollector: scheduler{
+ quit: make(chan bool),
+ getdataend: make(chan bool),
+ },
+ Freqchan: make(chan uint32),
+ }
+ s.devicemap[ip_address] = &d
+ fmt.Printf("Configuring %s\n", ip_address)
+
+ /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+ freq := dev.Frequency
+ if freq == 0 {
+ freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+ }
+ s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+ if dev.Frequency == 0 {
+ s.devicemap[ip_address].Datacollector.getdata.Stop()
+ }
+
+ eventtypes := s.get_event_types(ip_address)
+ if eventtypes != nil {
+ for _, event := range eventtypes {
+ s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
+ if s.add_subscription(ip_address, event) == false {
+ errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
+ }
+ }
+ }
+ go s.collect_data(ip_address)
+ s.devicemap[ip_address].Datafile = get_data_file(ip_address)
+ s.update_data_file(ip_address)
+ }
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+ }
+ return &empty.Empty{}, nil
+}
+
+func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
fmt.Println("In Received GetCurrentDevices\n")
if len(s.devicemap) == 0 {
return nil, status.Errorf(codes.NotFound, "Devices not registered")
}
- dl := new(importer.DeviceList)
+ dl := new(importer.DeviceListByIp)
for k, v := range s.devicemap {
if v != nil {
fmt.Printf("IpAdd[%s] \n", k)
@@ -362,26 +446,39 @@
}
func (s *Server) init_data_persistence() {
+ fmt.Println("Retrieving persisted data")
subscriptionListPath = pvmount + "/subscriptions"
if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
fmt.Println(err)
} else {
- lists, err := ioutil.ReadDir(subscriptionListPath)
+ files, err := ioutil.ReadDir(subscriptionListPath)
if err != nil {
fmt.Println(err)
} else {
- for _, list := range lists {
- b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+ for _, f := range files {
+ b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
if err != nil {
fmt.Println(err)
- } else {
- ip := list.Name()
+ } else if f.Size() > 0 {
+ ip := f.Name()
d := device{}
json.Unmarshal(b, &d)
s.devicemap[ip] = &d
- s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+ freq := s.devicemap[ip].Freq
+
+ /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+ if freq == 0 {
+ freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+ }
+ s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+ if s.devicemap[ip].Freq == 0 {
+ s.devicemap[ip].Datacollector.getdata.Stop()
+ }
+
s.devicemap[ip].Datacollector.quit = make(chan bool)
+ s.devicemap[ip].Datacollector.getdataend = make(chan bool)
s.devicemap[ip].Freqchan = make(chan uint32)
+ s.devicemap[ip].Datafile = get_data_file(ip)
go s.collect_data(ip)
}
}
@@ -401,7 +498,7 @@
//sarama.Logger = log.New()
}
-func get_subscription_list(ip string) *os.File {
+func get_data_file(ip string) *os.File {
if pvmount == "" {
return nil
}
@@ -412,6 +509,12 @@
return f
}
+func (s *Server) close_data_files() {
+ for ip, _ := range s.devicemap {
+ s.devicemap[ip].Datafile.Close()
+ }
+}
+
func main() {
fmt.Println("Starting Device-management Container")
@@ -420,10 +523,9 @@
Timeout: 10 * time.Second,
}
- s := Server{
- devicemap: make(map[string]*device),
- devicechan: make(chan *importer.DeviceInfo),
- httpclient: client,
+ s := Server {
+ devicemap: make(map[string]*device),
+ httpclient: client,
}
s.kafkaInit()
@@ -441,5 +543,6 @@
case sig := <-quit:
fmt.Println("Shutting down:", sig)
s.kafkaCloseProducer()
+ s.close_data_files()
}
}