[VOL-5567] Update protos; migrate to google.golang.org/protobuf APIs
Change-Id: Ib4ec57241aab48d918fc33448020c876882d5abc
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/pkg/flows/flow_utils.go b/pkg/flows/flow_utils.go
index 8a6333f..7b65b4e 100755
--- a/pkg/flows/flow_utils.go
+++ b/pkg/flows/flow_utils.go
@@ -27,9 +27,10 @@
"sync"
"github.com/cevaris/ordered_map"
- "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "google.golang.org/protobuf/encoding/prototext"
+ "google.golang.org/protobuf/proto"
)
var (
@@ -1019,7 +1020,7 @@
bandStats = append(bandStats, band)
}
meter.Stats.BandStats = bandStats
- logger.Debugw(ctx, "Allocated meter entry", log.Fields{"meter": *meter})
+ logger.Debugw(ctx, "Allocated meter entry", log.Fields{"meter": meter})
return meter
}
@@ -1040,8 +1041,8 @@
func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
oxmFields := make([]*ofp.OfpOxmField, 0)
- for _, matchField := range matchFields {
- oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
+ for i := range matchFields {
+ oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchFields[i].Field}
oxmFields = append(oxmFields, &oxmField)
}
return oxmFields
@@ -1278,7 +1279,7 @@
for kv, ok := iter(); ok; kv, ok = iter() {
if protoMsg, isMsg := kv.Value.(*ofp.OfpFlowStats); isMsg {
buffer.WriteString("\nFlow:\n")
- buffer.WriteString(proto.MarshalTextString(protoMsg))
+ buffer.WriteString(prototext.Format(protoMsg))
buffer.WriteString("\n")
}
}
@@ -1286,7 +1287,7 @@
for kv, ok := iter(); ok; kv, ok = iter() {
if protoMsg, isMsg := kv.Value.(*ofp.OfpGroupEntry); isMsg {
buffer.WriteString("\nGroup:\n")
- buffer.WriteString(proto.MarshalTextString(protoMsg))
+ buffer.WriteString(prototext.Format(protoMsg))
buffer.WriteString("\n")
}
}
diff --git a/pkg/flows/flow_utils_test.go b/pkg/flows/flow_utils_test.go
index 9704600..dcd6b25 100644
--- a/pkg/flows/flow_utils_test.go
+++ b/pkg/flows/flow_utils_test.go
@@ -256,12 +256,12 @@
fg.AddGroup(group)
str = fg.String()
- assert.True(t, strings.Contains(str, "id: 11819684229970388353"))
- assert.True(t, strings.Contains(str, "group_id: 10"))
- assert.True(t, strings.Contains(str, "oxm_class: OFPXMC_OPENFLOW_BASIC"))
- assert.True(t, strings.Contains(str, "type: OFPXMT_OFB_VLAN_VID"))
- assert.True(t, strings.Contains(str, "vlan_vid: 4096"))
- assert.True(t, strings.Contains(str, "buckets:"))
+ assert.True(t, strings.Contains(str, "id: 11819684229970388353"))
+ assert.True(t, strings.Contains(str, "group_id: 10"))
+ assert.True(t, strings.Contains(str, "oxm_class: OFPXMC_OPENFLOW_BASIC"))
+ assert.True(t, strings.Contains(str, "type: OFPXMT_OFB_VLAN_VID"))
+ assert.True(t, strings.Contains(str, "vlan_vid: 4096"))
+ assert.True(t, strings.Contains(str, "buckets: {"))
}
func TestFlowsAndGroups_AddFrom(t *testing.T) {
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index 6d8a93c..5fcc8dc 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -37,7 +37,7 @@
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+ "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
@@ -291,7 +291,7 @@
// Get a new client using reflection. The server can implement any grpc service, but it
// needs to also implement the "StartKeepAliveStream" API
- grpcReflectClient := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(conn))
+ grpcReflectClient := grpcreflect.NewClientAuto(ctx, conn)
if grpcReflectClient == nil {
logger.Errorw(ctx, "grpc-reflect-client-nil", log.Fields{"api-endpoint": c.serverEndPoint, "client": c.clientEndpoint})
return
@@ -621,8 +621,8 @@
if len(retry_interceptor) > 0 {
interceptor_opts = append(interceptor_opts, retry_interceptor...)
}
- conn, err := grpc.Dial(c.serverEndPoint,
- grpc.WithInsecure(),
+ conn, err := grpc.NewClient(c.serverEndPoint,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcRecvMsgSizeLimit*1024*1024)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
diff --git a/pkg/grpc/mock_core_service.go b/pkg/grpc/mock_core_service.go
index 5c78dff..e695102 100644
--- a/pkg/grpc/mock_core_service.go
+++ b/pkg/grpc/mock_core_service.go
@@ -21,17 +21,18 @@
"strconv"
"time"
- "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/core_service"
"github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/protobuf/types/known/emptypb"
)
// MockCoreServiceHandler implements the methods in the core service
type MockCoreServiceHandler struct {
+ core_service.UnimplementedCoreServiceServer
exitChannel chan struct{}
}
@@ -48,25 +49,25 @@
close(handler.exitChannel)
}
-func (handler *MockCoreServiceHandler) RegisterAdapter(ctx context.Context, reg *ca.AdapterRegistration) (*empty.Empty, error) {
+func (handler *MockCoreServiceHandler) RegisterAdapter(ctx context.Context, reg *ca.AdapterRegistration) (*emptypb.Empty, error) {
//logger.Debugw(ctx, "registration-received", log.Fields{"input": reg})
- return &empty.Empty{}, nil
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) DeviceUpdate(context.Context, *voltha.Device) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) DeviceUpdate(context.Context, *voltha.Device) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) PortCreated(context.Context, *voltha.Port) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) PortCreated(context.Context, *voltha.Port) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) PortsStateUpdate(context.Context, *ca.PortStateFilter) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) PortsStateUpdate(context.Context, *ca.PortStateFilter) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) DeleteAllPorts(context.Context, *common.ID) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) DeleteAllPorts(context.Context, *common.ID) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
func (handler *MockCoreServiceHandler) GetDevicePort(context.Context, *ca.PortFilter) (*voltha.Port, error) {
@@ -77,25 +78,25 @@
return &voltha.Ports{}, nil
}
-func (handler *MockCoreServiceHandler) DeviceStateUpdate(context.Context, *ca.DeviceStateFilter) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) DeviceStateUpdate(context.Context, *ca.DeviceStateFilter) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) DevicePMConfigUpdate(context.Context, *voltha.PmConfigs) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) DevicePMConfigUpdate(context.Context, *voltha.PmConfigs) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
func (handler *MockCoreServiceHandler) ChildDeviceDetected(context.Context, *ca.DeviceDiscovery) (*voltha.Device, error) {
return &voltha.Device{}, nil
}
-func (handler *MockCoreServiceHandler) ChildDevicesLost(context.Context, *common.ID) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) ChildDevicesLost(context.Context, *common.ID) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) ChildDevicesDetected(context.Context, *common.ID) (*empty.Empty, error) {
+func (handler *MockCoreServiceHandler) ChildDevicesDetected(context.Context, *common.ID) (*emptypb.Empty, error) {
time.Sleep(50 * time.Millisecond)
- return &empty.Empty{}, nil
+ return &emptypb.Empty{}, nil
}
func (handler *MockCoreServiceHandler) GetDevice(ctx context.Context, id *common.ID) (*voltha.Device, error) {
@@ -116,21 +117,21 @@
return &voltha.Devices{}, nil
}
-func (handler *MockCoreServiceHandler) SendPacketIn(context.Context, *ca.PacketIn) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) SendPacketIn(context.Context, *ca.PacketIn) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) DeviceReasonUpdate(context.Context, *ca.DeviceReason) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) DeviceReasonUpdate(context.Context, *ca.DeviceReason) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) PortStateUpdate(context.Context, *ca.PortState) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) PortStateUpdate(context.Context, *ca.PortState) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
// Additional API found in the Core - unused?
-func (handler *MockCoreServiceHandler) ReconcileChildDevices(context.Context, *common.ID) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) ReconcileChildDevices(context.Context, *common.ID) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
func (handler *MockCoreServiceHandler) GetChildDeviceWithProxyAddress(context.Context, *voltha.Device_ProxyAddress) (*voltha.Device, error) {
@@ -141,12 +142,12 @@
return &voltha.Ports{}, nil
}
-func (handler *MockCoreServiceHandler) ChildrenStateUpdate(context.Context, *ca.DeviceStateFilter) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) ChildrenStateUpdate(context.Context, *ca.DeviceStateFilter) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) UpdateImageDownload(context.Context, *voltha.ImageDownload) (*empty.Empty, error) {
- return &empty.Empty{}, nil
+func (handler *MockCoreServiceHandler) UpdateImageDownload(context.Context, *voltha.ImageDownload) (*emptypb.Empty, error) {
+ return &emptypb.Empty{}, nil
}
func (handler *MockCoreServiceHandler) GetHealthStatus(stream core_service.CoreService_GetHealthStatusServer) error {
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 77c529b..5b77001 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -25,7 +25,7 @@
"context"
"time"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
const (
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 75025f2..c47efe9 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -26,9 +26,9 @@
"github.com/IBM/sarama"
"github.com/eapache/go-resiliency/breaker"
- "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/protobuf/proto"
)
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
diff --git a/pkg/kafka/utils.go b/pkg/kafka/utils.go
index fba621f..622513f 100644
--- a/pkg/kafka/utils.go
+++ b/pkg/kafka/utils.go
@@ -21,9 +21,9 @@
"strings"
"time"
- "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "google.golang.org/protobuf/types/known/anypb"
)
const (
@@ -59,10 +59,10 @@
type RpcResponse struct {
MType RpcMType
Err error
- Reply *any.Any
+ Reply *anypb.Any
}
-func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+func NewResponse(messageType RpcMType, err error, body *anypb.Any) *RpcResponse {
return &RpcResponse{
MType: messageType,
Err: err,
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index 6d2bd1c..092ae84 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -22,11 +22,11 @@
"sync"
"time"
- "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
)
const (
diff --git a/pkg/techprofile/tech_profile.go b/pkg/techprofile/tech_profile.go
index c3e202d..1bfb964 100644
--- a/pkg/techprofile/tech_profile.go
+++ b/pkg/techprofile/tech_profile.go
@@ -17,7 +17,6 @@
package techprofile
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -27,12 +26,12 @@
"sync"
"time"
- "github.com/golang/protobuf/jsonpb"
- "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-lib-go/v7/pkg/db"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
)
// Interface to pon resource manager APIs
@@ -284,7 +283,7 @@
if t.resourceMgr.GetTechnology() == epon {
tp := t.getEponTPFromKVStore(ctx, tpID)
if tp != nil {
- if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
+ if err := t.validateInstanceControlAttr(ctx, tp.InstanceControl); err != nil {
logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
tp = t.getDefaultEponProfile(ctx)
} else {
@@ -318,7 +317,7 @@
logger.Infow(ctx, "epon-tp-instance-created-successfully",
log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
- if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
+ if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, &resInst); err != nil {
logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, resInst.AllocId)
@@ -334,7 +333,7 @@
} else {
tp := t.getTPFromKVStore(ctx, tpID)
if tp != nil {
- if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
+ if err := t.validateInstanceControlAttr(ctx, tp.InstanceControl); err != nil {
logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
tp = t.getDefaultTechProfile(ctx)
} else {
@@ -369,7 +368,7 @@
logger.Infow(ctx, "tp-instance-created-successfully",
log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
- if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
+ if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, &resInst); err != nil {
logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, resInst.AllocId)
@@ -523,22 +522,22 @@
if tech == xgspon || tech == xgpon || tech == gpon {
t.tpInstanceMapLock.RLock()
defer t.tpInstanceMapLock.RUnlock()
- tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
+ tpInstancesTech := make([]*tp_pb.TechProfileInstance, 0)
for i := 0; i < MaxUniPortPerOnu; i++ {
key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
if tpInst, ok := t.tpInstanceMap[key]; ok {
- tpInstancesTech = append(tpInstancesTech, *tpInst)
+ tpInstancesTech = append(tpInstancesTech, tpInst)
}
}
return tpInstancesTech
} else if tech == epon {
t.epontpInstanceMapLock.RLock()
defer t.epontpInstanceMapLock.RUnlock()
- tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
+ tpInstancesTech := make([]*tp_pb.EponTechProfileInstance, 0)
for i := 0; i < MaxUniPortPerOnu; i++ {
key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
if tpInst, ok := t.eponTpInstanceMap[key]; ok {
- tpInstancesTech = append(tpInstancesTech, *tpInst)
+ tpInstancesTech = append(tpInstancesTech, tpInst)
}
}
return tpInstancesTech
@@ -698,7 +697,7 @@
return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
}
-func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
+func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl *tp_pb.InstanceControl) error {
if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
return errors.New("invalid-onu-instance-ctl-attr")
@@ -1144,11 +1143,11 @@
(isMulticastAttrValue == "True" || isMulticastAttrValue == "true" || isMulticastAttrValue == "TRUE")
}
-func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
- logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
- val, err := proto.Marshal(&resInst)
+func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst *tp_pb.ResourceInstance) error {
+ logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
+ val, err := proto.Marshal(resInst)
if err != nil {
- logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
+ logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
return err
}
err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
@@ -1184,13 +1183,12 @@
/* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
if value, err := kvstore.ToByte(kvresult.Value); err == nil {
lTp := &tp_pb.TechProfile{}
- reader := bytes.NewReader(value)
- if err = jsonpb.Unmarshal(reader, lTp); err != nil {
+ if err = protojson.Unmarshal(value, lTp); err != nil {
logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
return nil
}
- logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
+ logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID})
return lTp
} else {
logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
@@ -1221,13 +1219,12 @@
/* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
if value, err := kvstore.ToByte(kvresult.Value); err == nil {
lEponTp := &tp_pb.EponTechProfile{}
- reader := bytes.NewReader(value)
- if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
+ if err = protojson.Unmarshal(value, lEponTp); err != nil {
logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
return nil
}
- logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
+ logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID})
return lEponTp
}
}