[VOL-5486] Upgrade library versions
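
Move to the split etcd Go modules: clientv3 now comes from
go.etcd.io/etcd/client/v3, rpctypes from go.etcd.io/etcd/api/v3, and the
embedded test server from go.etcd.io/etcd/server/v3. The etcd mock
switches to the zap logger, as the capnslog-based embed config fields
(Debug, LogPkgLevels, SetupLogging) no longer exist.

Replace the archived github.com/Shopify/sarama and
github.com/bsm/sarama-cluster packages with github.com/IBM/sarama,
porting group consumption to the sarama.ConsumerGroup and
ConsumerGroupHandler APIs.

Also lowercase error strings to satisfy static checks (ST1005), rewrite
if/else chains as switches, and pick a free port in the stats server
test to avoid conflicts with parallel runs.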
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/pkg/config/logcontroller.go b/pkg/config/logcontroller.go
index 8cc4e52..8d0fab8 100644
--- a/pkg/config/logcontroller.go
+++ b/pkg/config/logcontroller.go
@@ -56,7 +56,7 @@
logger.Debug(ctx, "creating-new-component-log-controller")
componentName := os.Getenv("COMPONENT_NAME")
if componentName == "" {
- return nil, errors.New("Unable to retrieve PoD Component Name from Runtime env")
+ return nil, errors.New("unable to retrieve PoD Component Name from Runtime env")
}
var defaultLogLevel string
diff --git a/pkg/config/logfeaturescontroller.go b/pkg/config/logfeaturescontroller.go
index 8f4131e..6b5ce53 100644
--- a/pkg/config/logfeaturescontroller.go
+++ b/pkg/config/logfeaturescontroller.go
@@ -44,7 +44,7 @@
logger.Debug(ctx, "creating-new-component-log-features-controller")
componentName := os.Getenv("COMPONENT_NAME")
if componentName == "" {
- return nil, errors.New("Unable to retrieve PoD Component Name from Runtime env")
+ return nil, errors.New("unable to retrieve PoD Component Name from Runtime env")
}
tracingStatus := log.GetGlobalLFM().GetTracePublishingStatus()
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index d15bb6e..6828700 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -25,8 +25,9 @@
"time"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
- v3Client "go.etcd.io/etcd/clientv3"
- v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ v3rpcTypes "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+
+ clientv3 "go.etcd.io/etcd/client/v3"
)
const (
@@ -43,7 +44,7 @@
type EtcdClient struct {
pool EtcdClientAllocator
watchedChannels sync.Map
- watchedClients map[string]*v3Client.Client
+ watchedClients map[string]*clientv3.Client
watchedClientsLock sync.RWMutex
}
@@ -82,7 +83,7 @@
logger.Infow(ctx, "etcd-pool-created", log.Fields{"capacity": capacity, "max-usage": maxUsage})
return &EtcdClient{pool: pool,
- watchedClients: make(map[string]*v3Client.Client),
+ watchedClients: make(map[string]*clientv3.Client),
}, nil
}
@@ -109,7 +110,7 @@
return false, err
}
defer c.pool.Put(client)
- resp, err := client.Get(ctx, key, v3Client.WithKeysOnly(), v3Client.WithCountOnly())
+ resp, err := client.Get(ctx, key, clientv3.WithKeysOnly(), clientv3.WithCountOnly())
if err != nil {
logger.Error(ctx, err)
return false, err
@@ -128,7 +129,7 @@
return nil, err
}
defer c.pool.Put(client)
- resp, err := client.Get(ctx, key, v3Client.WithPrefix())
+ resp, err := client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
logger.Error(ctx, err)
@@ -201,7 +202,7 @@
defer c.pool.Put(client)
// Fetch keys with the prefix
- resp, err := client.Get(ctx, prefixKey, v3Client.WithPrefix())
+ resp, err := client.Get(ctx, prefixKey, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to fetch entries for prefix %s: %w", prefixKey, err)
}
@@ -227,7 +228,7 @@
defer c.pool.Put(client)
// Fetch keys with the prefix
- resp, err := client.Get(ctx, prefixKey, v3Client.WithPrefix(), v3Client.WithKeysOnly())
+ resp, err := client.Get(ctx, prefixKey, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, fmt.Errorf("failed to fetch entries for prefix %s: %w", prefixKey, err)
}
@@ -346,7 +347,7 @@
defer c.pool.Put(client)
//delete the prefix
- if _, err := client.Delete(ctx, prefixKey, v3Client.WithPrefix()); err != nil {
+ if _, err := client.Delete(ctx, prefixKey, clientv3.WithPrefix()); err != nil {
logger.Errorw(ctx, "failed-to-delete-prefix-key", log.Fields{"key": prefixKey, "error": err})
return err
}
@@ -372,11 +373,11 @@
}
c.watchedClientsLock.Unlock()
- w := v3Client.NewWatcher(client)
+ w := clientv3.NewWatcher(client)
ctx, cancel := context.WithCancel(ctx)
- var channel v3Client.WatchChan
+ var channel clientv3.WatchChan
if withPrefix {
- channel = w.Watch(ctx, key, v3Client.WithPrefix())
+ channel = w.Watch(ctx, key, clientv3.WithPrefix())
} else {
channel = w.Watch(ctx, key)
}
@@ -385,7 +386,7 @@
ch := make(chan *Event, maxClientChannelBufferSize)
// Keep track of the created channels so they can be closed when required
- channelMap := make(map[chan *Event]v3Client.Watcher)
+ channelMap := make(map[chan *Event]clientv3.Watcher)
channelMap[ch] = w
channelMaps := c.addChannelMap(key, channelMap)
@@ -399,33 +400,33 @@
}
-func (c *EtcdClient) addChannelMap(key string, channelMap map[chan *Event]v3Client.Watcher) []map[chan *Event]v3Client.Watcher {
+func (c *EtcdClient) addChannelMap(key string, channelMap map[chan *Event]clientv3.Watcher) []map[chan *Event]clientv3.Watcher {
var channels interface{}
var exists bool
if channels, exists = c.watchedChannels.Load(key); exists {
- channels = append(channels.([]map[chan *Event]v3Client.Watcher), channelMap)
+ channels = append(channels.([]map[chan *Event]clientv3.Watcher), channelMap)
} else {
- channels = []map[chan *Event]v3Client.Watcher{channelMap}
+ channels = []map[chan *Event]clientv3.Watcher{channelMap}
}
c.watchedChannels.Store(key, channels)
- return channels.([]map[chan *Event]v3Client.Watcher)
+ return channels.([]map[chan *Event]clientv3.Watcher)
}
-func (c *EtcdClient) removeChannelMap(key string, pos int) []map[chan *Event]v3Client.Watcher {
+func (c *EtcdClient) removeChannelMap(key string, pos int) []map[chan *Event]clientv3.Watcher {
var channels interface{}
var exists bool
if channels, exists = c.watchedChannels.Load(key); exists {
- channels = append(channels.([]map[chan *Event]v3Client.Watcher)[:pos], channels.([]map[chan *Event]v3Client.Watcher)[pos+1:]...)
+ channels = append(channels.([]map[chan *Event]clientv3.Watcher)[:pos], channels.([]map[chan *Event]clientv3.Watcher)[pos+1:]...)
c.watchedChannels.Store(key, channels)
}
- return channels.([]map[chan *Event]v3Client.Watcher)
+ return channels.([]map[chan *Event]clientv3.Watcher)
}
-func (c *EtcdClient) getChannelMaps(key string) ([]map[chan *Event]v3Client.Watcher, bool) {
+func (c *EtcdClient) getChannelMaps(key string) ([]map[chan *Event]clientv3.Watcher, bool) {
var channels interface{}
var exists bool
@@ -435,14 +436,14 @@
return nil, exists
}
- return channels.([]map[chan *Event]v3Client.Watcher), exists
+ return channels.([]map[chan *Event]clientv3.Watcher), exists
}
// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
// may be multiple listeners on the same key. The previously created channel serves as a key
func (c *EtcdClient) CloseWatch(ctx context.Context, key string, ch chan *Event) {
// Get the array of channels mapping
- var watchedChannels []map[chan *Event]v3Client.Watcher
+ var watchedChannels []map[chan *Event]clientv3.Watcher
var ok bool
if watchedChannels, ok = c.getChannelMaps(key); !ok {
@@ -482,7 +483,7 @@
logger.Infow(ctx, "watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
}
-func (c *EtcdClient) listenForKeyChange(ctx context.Context, channel v3Client.WatchChan, ch chan<- *Event, cancel context.CancelFunc) {
+func (c *EtcdClient) listenForKeyChange(ctx context.Context, channel clientv3.WatchChan, ch chan<- *Event, cancel context.CancelFunc) {
logger.Debug(ctx, "start-listening-on-channel ...")
defer cancel()
defer close(ch)
@@ -494,11 +495,11 @@
logger.Debug(ctx, "stop-listening-on-channel ...")
}
-func getEventType(event *v3Client.Event) int {
+func getEventType(event *clientv3.Event) int {
switch event.Type {
- case v3Client.EventTypePut:
+ case clientv3.EventTypePut:
return PUT
- case v3Client.EventTypeDelete:
+ case clientv3.EventTypeDelete:
return DELETE
}
return UNKNOWN
diff --git a/pkg/db/kvstore/etcdpool.go b/pkg/db/kvstore/etcdpool.go
index 6c7c838..68095c4 100644
--- a/pkg/db/kvstore/etcdpool.go
+++ b/pkg/db/kvstore/etcdpool.go
@@ -23,7 +23,7 @@
"time"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
- "go.etcd.io/etcd/clientv3"
+ clientv3 "go.etcd.io/etcd/client/v3"
)
// EtcdClientAllocator represents a generic interface to allocate an Etcd Client
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index a265392..1eb2409 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -128,8 +128,8 @@
/* Send out device events with key*/
func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
if deviceEvent == nil {
- logger.Error(ctx, "Recieved empty device event")
- return errors.New("Device event nil")
+ logger.Error(ctx, "recieved empty device event")
+ return errors.New("device event nil")
}
var event voltha.Event
var de voltha.Event_DeviceEvent
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index d1a1a3a..75025f2 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -24,8 +24,7 @@
"sync"
"time"
- "github.com/Shopify/sarama"
- scc "github.com/bsm/sarama-cluster"
+ "github.com/IBM/sarama"
"github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
@@ -49,7 +48,7 @@
KafkaAddress string
producer sarama.AsyncProducer
consumer sarama.Consumer
- groupConsumers map[string]*scc.Consumer
+ groupConsumers map[string]sarama.ConsumerGroup
lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
@@ -218,7 +217,7 @@
option(client)
}
- client.groupConsumers = make(map[string]*scc.Consumer)
+ client.groupConsumers = make(map[string]sarama.ConsumerGroup)
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
@@ -776,7 +775,7 @@
err = errTemp
}
}
- } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+ } else if groupConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
if errTemp := groupConsumer.Close(); errTemp != nil {
if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
// This can occur on race condition
@@ -879,33 +878,28 @@
}
-// createGroupConsumer creates a consumers group
+// createGroupConsumer creates a consumer group
-func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
- config := scc.NewConfig()
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (sarama.ConsumerGroup, error) {
+ config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.ClientID = uuid.New().String()
- config.Group.Mode = scc.ConsumerModeMultiplex
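+ // Range assignment matches sarama-cluster's default partition strategy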
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
config.Consumer.Return.Errors = true
- //config.Group.Return.Notifications = false
- //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
- //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = initialOffset
- //config.Consumer.Offsets.Initial = sarama.OffsetOldest
+
brokers := []string{sc.KafkaAddress}
-
- topics := []string{topic.Name}
- var consumer *scc.Consumer
+ var consumerGroup sarama.ConsumerGroup
var err error
-
- if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
- logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumerGroup, err = sarama.NewConsumerGroup(brokers, groupId, config); err != nil {
+ logger.Errorw(ctx, "create-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
- //sc.groupConsumers[topic.Name] = consumer
- sc.addToGroupConsumers(topic.Name, consumer)
- return consumer, nil
+ logger.Debugw(ctx, "create-group-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ sc.addToGroupConsumers(topic.Name, consumerGroup)
+ return consumerGroup, nil
}
-// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
+// dispatchToConsumers sends the inter-container message received on a given topic to all subscribers for that
@@ -964,48 +958,59 @@
sc.setUnhealthy(ctx)
}
-func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumerGroup sarama.ConsumerGroup, consumerChnls *consumerChannels) {
logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
-startloop:
- for {
- select {
- case err, ok := <-consumer.Errors():
- if ok {
+ go func() {
+ for {
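+ // Consume blocks until the session ends (for example on a rebalance or
+ // an error) and must be re-invoked in a loop so the consumer rejoins
+ // the group and picks up its new partition claims.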
+ err := consumerGroup.Consume(ctx, []string{topic.Name}, &groupConsumerHandler{
+ consumerChnls: consumerChnls,
+ sc: sc,
+ topic: topic,
+ })
+ if err != nil {
+ logger.Warnw(ctx, "group-consumer-error", log.Fields{"topic": topic.Name, "error": err})
if sc.isLivenessError(ctx, err) {
sc.updateLiveness(ctx, false)
}
- logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
- } else {
- logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
- // channel is closed
- break startloop
}
- case msg, ok := <-consumer.Messages():
- if !ok {
- logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
- // Channel closed
- break startloop
+ select {
+ case <-sc.doneCh:
+ logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+ return
+ default:
}
- sc.updateLiveness(ctx, true)
- logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
- msgBody := msg.Value
- var protoMsg proto.Message
- if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
- logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
- continue
- }
- go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
- consumer.MarkOffset(msg, "")
- case ntf := <-consumer.Notifications():
- logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
- case <-sc.doneCh:
- logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
- break startloop
}
+ }()
+}
+
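+// groupConsumerHandler implements sarama.ConsumerGroupHandler: sarama calls
+// Setup when a session starts, ConsumeClaim once per claimed partition, and
+// Cleanup when the session ends.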
+type groupConsumerHandler struct {
+ consumerChnls *consumerChannels
+ sc *SaramaClient
+ topic *Topic
+}
+
+func (h *groupConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
+ return nil
+}
+
+func (h *groupConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
+ return nil
+}
+
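+// ConsumeClaim drains messages from a single claimed partition; it returns
+// once the claim's message channel is closed at the end of the session.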
+func (h *groupConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ // Use the session context so logging and liveness updates stop with the session
+ ctx := session.Context()
+ for msg := range claim.Messages() {
+ h.sc.updateLiveness(ctx, true)
+ logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msg.Value, protoMsg); err != nil {
+ logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
+ continue
+ }
+ go h.sc.dispatchToConsumers(h.consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
+ session.MarkMessage(msg, "")
}
- logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
- sc.setUnhealthy(ctx)
+ return nil
}
func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
@@ -1019,7 +1024,7 @@
for _, consumer := range consumerCh.consumers {
if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
- } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+ } else if gConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
} else {
logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
@@ -1071,18 +1076,16 @@
-// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
- // TODO: Replace this development partition consumers with a group consumers
- var pConsumer *scc.Consumer
+ var consumerGroup sarama.ConsumerGroup
var err error
- if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
- logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ if consumerGroup, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset); err != nil {
+ logger.Errorw(ctx, "creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
- // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
- // unbuffered to verify race conditions.
+
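+ // Unbuffered channel on which messages received for this topic are delivered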
consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
- consumers: []interface{}{pConsumer},
+ consumers: []interface{}{consumerGroup},
channels: []chan proto.Message{consumerListeningChannel},
}
@@ -1135,22 +1138,21 @@
return channels
}
-func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumerGroup sarama.ConsumerGroup) {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
if _, exist := sc.groupConsumers[topic]; !exist {
- sc.groupConsumers[topic] = consumer
+ sc.groupConsumers[topic] = consumerGroup
}
}
func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
- if _, exist := sc.groupConsumers[topic]; exist {
- consumer := sc.groupConsumers[topic]
+ if consumerGroup, exist := sc.groupConsumers[topic]; exist {
delete(sc.groupConsumers, topic)
- if err := consumer.Close(); err != nil {
- logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+ if err := consumerGroup.Close(); err != nil {
+ logger.Errorw(ctx, "failure-closing-consumer-group", log.Fields{"error": err})
return err
}
}
diff --git a/pkg/log/log.go b/pkg/log/log.go
index 8564602..4e42c26 100644
--- a/pkg/log/log.go
+++ b/pkg/log/log.go
@@ -202,7 +202,7 @@
case "FATAL":
return FatalLevel, nil
}
- return 0, errors.New("Given LogLevel is invalid : " + l)
+ return 0, errors.New("given LogLevel is invalid : " + l)
}
func LogLevelToString(l LogLevel) (string, error) {
@@ -218,7 +218,7 @@
case FatalLevel:
return "FATAL", nil
}
- return "", fmt.Errorf("Given LogLevel is invalid %d", l)
+ return "", fmt.Errorf("given LogLevel is invalid %d", l)
}
func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
@@ -674,7 +674,7 @@
return loggers[pkgName], nil
}
- return loggers[pkgName], errors.New("Package Not Found")
+ return loggers[pkgName], errors.New("package not found")
}
-// UpdateAllCallerSkipLevel create new loggers for all registered pacakges with the default updated caller skipltFields.
+// UpdateAllCallerSkipLevel creates new loggers for all registered packages with the updated default caller skip level.
diff --git a/pkg/log/utils.go b/pkg/log/utils.go
index e21ce0c..292a1cf 100644
--- a/pkg/log/utils.go
+++ b/pkg/log/utils.go
@@ -23,13 +23,14 @@
"context"
"errors"
"fmt"
- "github.com/opentracing/opentracing-go"
- jtracing "github.com/uber/jaeger-client-go"
- jcfg "github.com/uber/jaeger-client-go/config"
"io"
"os"
"strings"
"sync"
+
+ "github.com/opentracing/opentracing-go"
+ jtracing "github.com/uber/jaeger-client-go"
+ jcfg "github.com/uber/jaeger-client-go/config"
)
const (
@@ -103,7 +104,7 @@
func (lfm *LogFeaturesManager) InitTracingAndLogCorrelation(tracePublishEnabled bool, traceAgentAddress string, logCorrelationEnabled bool) (io.Closer, error) {
lfm.componentName = os.Getenv("COMPONENT_NAME")
if lfm.componentName == "" {
- return nil, errors.New("Unable to retrieve PoD Component Name from Runtime env")
+ return nil, errors.New("unable to retrieve PoD Component Name from Runtime env")
}
lfm.lock.Lock()
diff --git a/pkg/meters/meter_utils.go b/pkg/meters/meter_utils.go
index 6dfb0c8..4e84126 100644
--- a/pkg/meters/meter_utils.go
+++ b/pkg/meters/meter_utils.go
@@ -26,14 +26,15 @@
// GetTrafficShapingInfo returns CIR,PIR and GIR values
func GetTrafficShapingInfo(ctx context.Context, meterConfig *ofp.OfpMeterConfig) (*tp_pb.TrafficShapingInfo, error) {
- switch meterBandSize := len(meterConfig.Bands); {
- case meterBandSize == 1:
+ meterBandSize := len(meterConfig.Bands)
+ switch meterBandSize {
+ case 1:
band := meterConfig.Bands[0]
if band.BurstSize == 0 { // GIR = PIR, Burst Size = 0, tcont type 1
return &tp_pb.TrafficShapingInfo{Pir: band.Rate, Gir: band.Rate}, nil
}
return &tp_pb.TrafficShapingInfo{Pir: band.Rate, Pbs: band.BurstSize}, nil // PIR, tcont type 4
- case meterBandSize == 2:
+ case 2:
firstBand, secondBand := meterConfig.Bands[0], meterConfig.Bands[1]
if firstBand.BurstSize == 0 && secondBand.BurstSize == 0 &&
firstBand.Rate == secondBand.Rate { // PIR = GIR, tcont type 1
@@ -45,7 +46,7 @@
}
return &tp_pb.TrafficShapingInfo{Pir: secondBand.Rate, Pbs: secondBand.BurstSize, Cir: firstBand.Rate, Cbs: firstBand.BurstSize}, nil
}
- case meterBandSize == 3: // PIR,CIR,GIR, tcont type 5
+ case 3: // PIR,CIR,GIR, tcont type 5
var count, girIndex int
for i, band := range meterConfig.Bands {
if band.BurstSize == 0 { // find GIR
diff --git a/pkg/mocks/etcd/etcd_server.go b/pkg/mocks/etcd/etcd_server.go
index 59d5215..f2e221c 100644
--- a/pkg/mocks/etcd/etcd_server.go
+++ b/pkg/mocks/etcd/etcd_server.go
@@ -20,10 +20,9 @@
"fmt"
"net/url"
"os"
- "strings"
"time"
- "go.etcd.io/etcd/embed"
+ "go.etcd.io/etcd/server/v3/embed"
)
const (
@@ -58,28 +57,26 @@
cfg := embed.NewConfig()
cfg.Name = configName
cfg.Dir = localPersistentStorageDir
- // cfg.Logger = "zap"
+
if !islogLevelValid(logLevel) {
- logger.Fatalf(ctx, "Invalid log level -%s", logLevel)
+ logger.Fatalf(ctx, "Invalid log level - %s", logLevel)
}
- // cfg.LogLevel = logLevel
- cfg.Debug = strings.EqualFold(logLevel, "debug")
- cfg.LogPkgLevels = "*=C"
- cfg.SetupLogging()
+
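+ // The capnslog-based fields (Debug, LogPkgLevels, SetupLogging) are gone;
+ // zap is the supported structured logger for the embedded server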
+ cfg.Logger = "zap"
acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
if err != nil {
- logger.Fatalf(ctx, "Invalid client port -%d", clientPort)
+ logger.Fatalf(ctx, "Invalid client port - %d", clientPort)
}
- cfg.ACUrls = []url.URL{*acurl}
- cfg.LCUrls = []url.URL{*acurl}
+ cfg.ListenClientUrls = []url.URL{*acurl}
+ cfg.AdvertiseClientUrls = []url.URL{*acurl}
apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
if err != nil {
- logger.Fatalf(ctx, "Invalid peer port -%d", peerPort)
+ logger.Fatalf(ctx, "Invalid peer port - %d", peerPort)
}
- cfg.LPUrls = []url.URL{*apurl}
- cfg.APUrls = []url.URL{*apurl}
+ cfg.ListenPeerUrls = []url.URL{*apurl}
+ cfg.AdvertisePeerUrls = []url.URL{*apurl}
cfg.ClusterState = embed.ClusterStateFlagNew
cfg.InitialCluster = cfg.Name + "=" + apurl.String()
@@ -90,9 +87,7 @@
// getDefaultCfg specifies the default config
func getDefaultCfg() *embed.Config {
cfg := embed.NewConfig()
- cfg.Debug = false
- cfg.LogPkgLevels = "*=C"
- cfg.SetupLogging()
+ cfg.Logger = "zap"
cfg.Dir = defaultLocalPersistentStorage
return cfg
}
diff --git a/pkg/ponresourcemanager/ponresourcemanager.go b/pkg/ponresourcemanager/ponresourcemanager.go
index 26bef32..0eec452 100755
--- a/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/pkg/ponresourcemanager/ponresourcemanager.go
@@ -749,15 +749,16 @@
IntfID = SharedPoolID
}
var Path string
- if ResourceType == ONU_ID {
+ switch ResourceType {
+ case ONU_ID:
Path = fmt.Sprintf(ONU_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == ALLOC_ID {
+ case ALLOC_ID:
Path = fmt.Sprintf(ALLOC_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == GEMPORT_ID {
+ case GEMPORT_ID:
Path = fmt.Sprintf(GEMPORT_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == FLOW_ID {
+ case FLOW_ID:
Path = fmt.Sprintf(FLOW_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else {
+ default:
logger.Error(ctx, "Invalid resource pool identifier")
}
return Path
diff --git a/pkg/probe/probe.go b/pkg/probe/probe.go
index 4b4bb1f..dba7ffa 100644
--- a/pkg/probe/probe.go
+++ b/pkg/probe/probe.go
@@ -240,7 +240,7 @@
}
comma := ""
for c, s := range p.status {
- if _, err := w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String()))); err != nil {
+ if _, err := fmt.Fprintf(w, "%s\"%s\": \"%s\"", comma, c, s.String()); err != nil {
logger.Errorw(ctx, "write-response", log.Fields{"error": err})
w.WriteHeader(http.StatusInternalServerError)
return
diff --git a/pkg/stats/promserver_test.go b/pkg/stats/promserver_test.go
index 6db4dd0..cdb385d 100644
--- a/pkg/stats/promserver_test.go
+++ b/pkg/stats/promserver_test.go
@@ -23,6 +23,7 @@
"testing"
"time"
+ "github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -32,7 +33,9 @@
serverCtx, serverCancel := context.WithCancel(context.Background())
defer serverCancel()
- testPort := 34201
+ // Get a free port to avoid conflicts
+ testPort, err := freeport.GetFreePort()
+ require.NoError(t, err)
StatsServer.Start(serverCtx, testPort, VCore)