blob: 38e8449b72526a00113e793fb775f1e4b146c891 [file] [log] [blame]
onkarkundargi72cfd362020-02-27 12:34:37 +05301/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// Package main implements a client for Greeter service.
18package main
19
20import (
21 "context"
22 "encoding/json"
23 "fmt"
24 "github.com/Shopify/sarama"
25 "github.com/golang/protobuf/proto"
26 "github.com/google/uuid"
27 pb "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc"
29 "log"
30 "time"
31)
32
33const (
34 //address = "compose_rw_core_1:50057"
35 address = "voltha-rw-core.voltha:50057"
36 //kafka_address = "compose_kafka_1:9092"
37 kafka_address = "voltha-kafka.voltha:9092"
38 onuDeviceId = "world"
39)
40
41func connect(device_id *string) (*pb.Event, error) {
42 // Set up a connection to the server.
43 log.Printf("voltha grpc client started, address=%s ...", address)
44 conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
45 if err != nil {
46 log.Printf("did not connect: %v", err)
47 return nil, err
48 }
49 defer conn.Close()
50 c := pb.NewVolthaServiceClient(conn)
51 id, err := uuid.NewUUID()
52 log.Printf("ID: %s", id.String())
53 if err != nil {
54 log.Printf("did not generate uuid: %v", err)
55 return nil, err
56 }
57 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
58 defer cancel()
59 log.Printf("Calling StartOmciTestAction")
60 r, err := c.StartOmciTestAction(ctx, &pb.OmciTestRequest{Id: *device_id, Uuid: id.String()})
61 if err != nil {
62 return nil, fmt.Errorf("start-omci-test-action-failed: %v", err)
63 }
64 log.Printf("Result: %s", r.Result)
65 djson, _ := json.Marshal(r.Result)
66 result := &pb.Event{}
67 if string(djson) == "0" {
68 config := sarama.NewConfig()
69 config.ClientID = "go-kafka-consumer"
70 config.Consumer.Return.Errors = true
71 // Specify brokers address. This is default one
72 brokers := []string{kafka_address}
73 // Create new consumer
74 master, err := sarama.NewConsumer(brokers, config)
75 if err != nil {
76 panic(err)
77 }
78 defer func() {
79 if err := master.Close(); err != nil {
80 panic(err)
81 }
82 }()
83
84 topic := "voltha.events"
85 consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetNewest)
86 if err != nil {
87 panic(err)
88 }
89 // Get signnal for finish
90 doneCh := make(chan struct{})
91 go func() {
92 for {
93 select {
94 case err := <-consumer.Errors():
95 fmt.Println(err)
96 case msg := <-consumer.Messages():
97 unpackResult := &pb.Event{}
98 var err error
99 if err = proto.Unmarshal(msg.Value, unpackResult); err != nil {
100 fmt.Println("Error while doing unmarshal", err)
101 }
102 kpi_event2 := unpackResult.GetKpiEvent2()
103 if (kpi_event2 != nil) && (kpi_event2.SliceData[0].Metadata.Uuid == id.String()) {
104 result = unpackResult
105 close(doneCh)
106 return
107 }
108 }
109 }
110 }()
111 <-doneCh
112 log.Printf("Result: %s", result)
113 }
114 return result, nil
115}