SEBA-902 single-olt tests;
Pin protoc-gen-go to 1.3.2 to resolve compatibility issue;
Run go mod tidy / go mod vendor on importer;
Add Go Module support to demotest
Change-Id: Ifde824fc9a6317b0adc1e12bea54ee1f9b788906
diff --git a/vendor/github.com/Shopify/sarama/.golangci.yml b/vendor/github.com/Shopify/sarama/.golangci.yml
new file mode 100644
index 0000000..561c2c2
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/.golangci.yml
@@ -0,0 +1,74 @@
+run:
+ timeout: 5m
+ deadline: 10m
+
+linters-settings:
+ govet:
+ check-shadowing: false
+ golint:
+ min-confidence: 0
+ gocyclo:
+ min-complexity: 99
+ maligned:
+ suggest-new: true
+ dupl:
+ threshold: 100
+ goconst:
+ min-len: 2
+ min-occurrences: 3
+ misspell:
+ locale: US
+ goimports:
+ local-prefixes: github.com/Shopify/sarama
+ gocritic:
+ enabled-tags:
+ - diagnostic
+ - experimental
+ - opinionated
+ - performance
+ - style
+ disabled-checks:
+ - wrapperFunc
+ - ifElseChain
+ funlen:
+ lines: 300
+ statements: 300
+
+linters:
+ disable-all: true
+ enable:
+ - bodyclose
+ # - deadcode
+ - depguard
+ - dogsled
+ # - dupl
+ # - errcheck
+ - funlen
+ # - gocritic
+ - gocyclo
+ - gofmt
+ # - goimports
+ # - golint
+ # - gosec
+ # - gosimple
+ - govet
+ # - ineffassign
+ # - interfacer
+ # - misspell
+ # - nakedret
+ # - scopelint
+ # - staticcheck
+ # - structcheck
+ # - stylecheck
+ - typecheck
+ # - unconvert
+ # - unused
+ # - varcheck
+ - whitespace
+ # - goconst
+ # - gochecknoinits
+
+issues:
+ exclude:
+ - consider giving a name to these results
+ - include an explanation for nolint directive
diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml
deleted file mode 100644
index cace313..0000000
--- a/vendor/github.com/Shopify/sarama/.travis.yml
+++ /dev/null
@@ -1,37 +0,0 @@
-dist: xenial
-language: go
-go:
-- 1.12.x
-- 1.13.x
-
-env:
- global:
- - KAFKA_PEERS=localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
- - TOXIPROXY_ADDR=http://localhost:8474
- - KAFKA_INSTALL_ROOT=/home/travis/kafka
- - KAFKA_HOSTNAME=localhost
- - DEBUG=true
- matrix:
- - KAFKA_VERSION=2.2.1 KAFKA_SCALA_VERSION=2.12
- - KAFKA_VERSION=2.3.0 KAFKA_SCALA_VERSION=2.12
-
-before_install:
-- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
-- vagrant/install_cluster.sh
-- vagrant/boot_cluster.sh
-- vagrant/create_topics.sh
-- vagrant/run_java_producer.sh
-
-install: make install_dependencies
-
-script:
-- make test
-- make vet
-- make errcheck
-- if [[ "$TRAVIS_GO_VERSION" == 1.13* ]]; then make fmt; fi
-
-after_success:
-- go tool cover -func coverage.txt
-- bash <(curl -s https://codecov.io/bash)
-
-after_script: vagrant/halt_cluster.sh
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index 844f481..2bebeb1 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,50 @@
# Changelog
+#### Unreleased
+
+#### Version 1.26.1 (2020-02-04)
+
+Improvements:
+- Add requests-in-flight metric ([1539](https://github.com/Shopify/sarama/pull/1539))
+- Fix misleading example for cluster admin ([1595](https://github.com/Shopify/sarama/pull/1595))
+- Replace Travis with GitHub Actions, linters housekeeping ([1573](https://github.com/Shopify/sarama/pull/1573))
+- Allow BalanceStrategy to provide custom assignment data ([1592](https://github.com/Shopify/sarama/pull/1592))
+
+Bug Fixes:
+- Adds back Consumer.Offsets.CommitInterval to fix API ([1590](https://github.com/Shopify/sarama/pull/1590))
+- Fix error message s/CommitInterval/AutoCommit.Interval ([1589](https://github.com/Shopify/sarama/pull/1589))
+
+#### Version 1.26.0 (2020-01-24)
+
+New Features:
+- Enable zstd compression
+ ([1574](https://github.com/Shopify/sarama/pull/1574),
+ [1582](https://github.com/Shopify/sarama/pull/1582))
+- Support headers in tools kafka-console-producer
+ ([1549](https://github.com/Shopify/sarama/pull/1549))
+
+Improvements:
+- Add SASL AuthIdentity to SASL frames (authzid)
+ ([1585](https://github.com/Shopify/sarama/pull/1585)).
+
+Bug Fixes:
+- Sending messages with ZStd compression enabled fails in multiple ways
+ ([1252](https://github.com/Shopify/sarama/issues/1252)).
+- Use the broker for any admin on BrokerConfig
+ ([1571](https://github.com/Shopify/sarama/pull/1571)).
+- Set DescribeConfigRequest Version field
+ ([1576](https://github.com/Shopify/sarama/pull/1576)).
+- ConsumerGroup flooding logs with client/metadata update req
+ ([1578](https://github.com/Shopify/sarama/pull/1578)).
+- MetadataRequest version in DescribeCluster
+ ([1580](https://github.com/Shopify/sarama/pull/1580)).
+- Fix deadlock in consumer group handleError
+ ([1581](https://github.com/Shopify/sarama/pull/1581))
+- Fill in the Fetch{Request,Response} protocol
+ ([1582](https://github.com/Shopify/sarama/pull/1582)).
+- Retry topic request on ControllerNotAvailable
+ ([1586](https://github.com/Shopify/sarama/pull/1586)).
+
#### Version 1.25.0 (2020-01-13)
New Features:
diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile
index 9c8329e..c3b431a 100644
--- a/vendor/github.com/Shopify/sarama/Makefile
+++ b/vendor/github.com/Shopify/sarama/Makefile
@@ -1,56 +1,27 @@
-export GO111MODULE=on
+default: fmt get update test lint
-default: fmt vet errcheck test lint
+GO := GO111MODULE=on GOPRIVATE=github.com/linkedin GOSUMDB=off go
+GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
+GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic
-# Taken from https://github.com/codecov/example-go#caveat-multiple-files
-.PHONY: test
-test:
- echo "mode: atomic" > coverage.txt
- for d in `go list ./...`; do \
- go test -p 1 -v -timeout 6m -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
- if [ -f profile.out ]; then \
- tail +2 profile.out >> coverage.txt; \
- rm profile.out; \
- fi \
- done
+FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
+TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
-GOLINT := $(shell command -v golint)
-
-.PHONY: lint
-lint:
-ifndef GOLINT
- go get golang.org/x/lint/golint
-endif
- go list ./... | xargs golint
-
-.PHONY: vet
-vet:
- go vet ./...
-
-ERRCHECK := $(shell command -v errcheck)
-# See https://github.com/kisielk/errcheck/pull/141 for details on ignorepkg
-.PHONY: errcheck
-errcheck:
-ifndef ERRCHECK
- go get github.com/kisielk/errcheck
-endif
- errcheck -ignorepkg fmt github.com/Shopify/sarama/...
-
-.PHONY: fmt
-fmt:
- @if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi
-
-.PHONY : install_dependencies
-install_dependencies: get
-
-.PHONY: get
get:
- go get -v ./...
+ $(GO) get ./...
+ $(GO) mod verify
+ $(GO) mod tidy
-.PHONY: clean
-clean:
- go clean ./...
+update:
+ $(GO) get -u -v all
+ $(GO) mod verify
+ $(GO) mod tidy
-.PHONY: tidy
-tidy:
- go mod tidy -v
+fmt:
+ gofmt -s -l -w $(FILES) $(TESTS)
+
+lint:
+ golangci-lint run
+
+test:
+ $(GOTEST) ./...
diff --git a/vendor/github.com/Shopify/sarama/Vagrantfile b/vendor/github.com/Shopify/sarama/Vagrantfile
index f4b848a..07d7ffb 100644
--- a/vendor/github.com/Shopify/sarama/Vagrantfile
+++ b/vendor/github.com/Shopify/sarama/Vagrantfile
@@ -1,14 +1,8 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
-VAGRANTFILE_API_VERSION = "2"
-
# We have 5 * 192MB ZK processes and 5 * 320MB Kafka processes => 2560MB
MEMORY = 3072
-Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
- config.vm.box = "ubuntu/trusty64"
+Vagrant.configure("2") do |config|
+ config.vm.box = "ubuntu/bionic64"
config.vm.provision :shell, path: "vagrant/provision.sh"
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 6c9b1e9..dd63484 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -2,8 +2,11 @@
import (
"errors"
+ "fmt"
"math/rand"
+ "strconv"
"sync"
+ "time"
)
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -132,8 +135,45 @@
return ca.client.Controller()
}
-func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+// refreshController delegates to the underlying client's RefreshController
+// and returns its broker and error unchanged.
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+ return ca.client.RefreshController()
+}
+// isErrNoController returns `true` if the given error type unwraps to an
+// `ErrNotController` response from Kafka
+func isErrNoController(err error) bool {
+ switch e := err.(type) {
+ case *TopicError:
+ return e.Err == ErrNotController
+ case *TopicPartitionError:
+ return e.Err == ErrNotController
+ case KError:
+ return e == ErrNotController
+ }
+ return false
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retriable (as determined by the
+// provided retriable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
+ var err error
+ for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
+ err = fn()
+ if err == nil || !retriable(err) {
+ return err
+ }
+ Logger.Printf(
+ "admin/request retrying after %dms... (%d attempts remaining)\n",
+ ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
+ time.Sleep(ca.conf.Admin.Retry.Backoff)
+ continue
+ }
+ return err
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}
@@ -158,26 +198,31 @@
request.Version = 2
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.CreateTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.CreateTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr.Err != ErrNoError {
- return topicErr
- }
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
- return nil
+ return nil
+ })
}
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -214,7 +259,7 @@
Topics: []string{},
}
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ if ca.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 1
}
@@ -226,6 +271,16 @@
return response.Brokers, response.ControllerID, nil
}
+// findBroker looks up a broker by ID among the brokers the client currently
+// reports, returning an error when none of them has the requested ID.
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+	for _, candidate := range ca.client.Brokers() {
+		if candidate.ID() != id {
+			continue
+		}
+		return candidate, nil
+	}
+	return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
@@ -308,7 +363,6 @@
}
func (ca *clusterAdmin) DeleteTopic(topic string) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -322,25 +376,31 @@
request.Version = 1
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.DeleteTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.DeleteTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrorCodes[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrorCodes[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr != ErrNoError {
- return topicErr
- }
- return nil
+ if topicErr != ErrNoError {
+ if topicErr == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
}
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -356,30 +416,34 @@
Timeout: ca.conf.Admin.Timeout,
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.CreatePartitions(request)
- if err != nil {
- return err
- }
+ rsp, err := b.CreatePartitions(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicPartitionErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicPartitionErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr.Err != ErrNoError {
- return topicErr
- }
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
- return nil
+ return nil
+ })
}
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -432,8 +496,14 @@
return nil
}
-func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+// dependsOnSpecificNode reports whether a config request for resource has to
+// be sent to one particular broker: always for broker-logger resources, and
+// for broker resources only when a (non-empty) broker name is given.
+func dependsOnSpecificNode(resource ConfigResource) bool {
+	switch resource.Type {
+	case BrokerLoggerResource:
+		return true
+	case BrokerResource:
+		return resource.Name != ""
+	default:
+		return false
+	}
+}
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
var entries []ConfigEntry
var resources []*ConfigResource
resources = append(resources, &resource)
@@ -442,11 +512,31 @@
Resources: resources,
}
- b, err := ca.Controller()
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ request.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 2
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // DescribeConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(resource) {
+ id, _ := strconv.Atoi(resource.Name)
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return nil, err
}
+ _ = b.Open(ca.client.Config())
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
@@ -466,7 +556,6 @@
}
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
-
var resources []*AlterConfigsResource
resources = append(resources, &AlterConfigsResource{
Type: resourceType,
@@ -479,11 +568,23 @@
ValidateOnly: validateOnly,
}
- b, err := ca.Controller()
+ var (
+ b *Broker
+ err error
+ )
+
+ // AlterConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+ id, _ := strconv.Atoi(name)
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return err
}
+ _ = b.Open(ca.client.Config())
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
@@ -518,7 +619,6 @@
}
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
-
request := &DescribeAclsRequest{AclFilter: filter}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
@@ -566,7 +666,6 @@
for _, mACL := range fr.MatchingAcls {
mAcls = append(mAcls, *mACL)
}
-
}
return mAcls, nil
}
@@ -580,7 +679,6 @@
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
-
}
for broker, brokerGroups := range groupsPerBroker {
@@ -623,7 +721,6 @@
}
groupMaps <- groups
-
}(b, ca.conf)
}
diff --git a/vendor/github.com/Shopify/sarama/balance_strategy.go b/vendor/github.com/Shopify/sarama/balance_strategy.go
index 67c4d96..56da276 100644
--- a/vendor/github.com/Shopify/sarama/balance_strategy.go
+++ b/vendor/github.com/Shopify/sarama/balance_strategy.go
@@ -47,6 +47,10 @@
// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
// and returns a distribution plan.
Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
+
+ // AssignmentData returns the serialized assignment data for the specified
+ // memberID
+ AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
// --------------------------------------------------------------------
@@ -132,6 +136,11 @@
return plan, nil
}
+// AssignmentData always returns nil user data and a nil error: simple
+// strategies do not require any shared assignment data, so memberID, topics
+// and generationID are ignored.
+func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return nil, nil
+}
+
type balanceStrategySortable struct {
topic string
memberIDs []string
@@ -268,6 +277,15 @@
return plan, nil
}
+// AssignmentData encodes the supplied topic/partition assignment together
+// with the generation ID as StickyAssignorUserDataV1 and returns the encoded
+// bytes. memberID is not used by this implementation.
+func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+	userData := &StickyAssignorUserDataV1{
+		Generation: generationID,
+		Topics:     topics,
+	}
+	return encode(userData, nil)
+}
+
func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 8146749..d27ebd2 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -40,6 +40,7 @@
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
+ requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
@@ -47,6 +48,7 @@
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
+ brokerRequestsInFlight metrics.Counter
kerberosAuthenticator GSSAPIKerberosAuth
}
@@ -182,6 +184,7 @@
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+ b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
@@ -189,7 +192,6 @@
}
if conf.Net.SASL.Enable {
-
b.connErr = b.authenticateViaSASL()
if b.connErr != nil {
@@ -713,16 +715,19 @@
}
requestTime := time.Now()
+ // Will be decremented in responseReceiver (except error or request with NoResponse)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return nil, err
}
b.correlationID++
if !promiseResponse {
// Record request latency without the response
- b.updateRequestLatencyMetrics(time.Since(requestTime))
+ b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
@@ -817,6 +822,9 @@
for response := range b.responses {
if dead != nil {
+ // This was previously incremented in send() and
+ // we are not calling updateIncomingCommunicationMetrics()
+ b.addRequestInFlightMetrics(-1)
response.errors <- dead
continue
}
@@ -892,9 +900,12 @@
}
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
@@ -903,6 +914,7 @@
header := make([]byte, 8) // response header
_, err = b.readFull(header)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
@@ -911,6 +923,7 @@
payload := make([]byte, length-4)
n, err := b.readFull(payload)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
@@ -961,7 +974,6 @@
// default to V0 to allow for backward compatability when SASL is enabled
// but not the handshake
if b.conf.Net.SASL.Handshake {
-
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -977,16 +989,18 @@
// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
- length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+ length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
- copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
+ copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1011,11 +1025,13 @@
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1068,16 +1084,18 @@
// if the broker responds with a challenge, in which case the token is
// rejected.
func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return false, err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
res := &SaslAuthenticateResponse{}
@@ -1108,22 +1126,25 @@
msg, err := scramClient.Step("")
if err != nil {
return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
}
for !scramClient.Done() {
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1216,7 +1237,7 @@
}
func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
- authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+ authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
rb := &SaslAuthenticateRequest{authBytes}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
@@ -1228,7 +1249,6 @@
}
func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
rb := &SaslAuthenticateRequest{initialResp}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1277,7 +1297,7 @@
}
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
- b.updateRequestLatencyMetrics(requestLatency)
+ b.updateRequestLatencyAndInFlightMetrics(requestLatency)
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
@@ -1296,7 +1316,7 @@
}
}
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
@@ -1304,6 +1324,14 @@
b.brokerRequestLatency.Update(requestLatencyInMs)
}
+ b.addRequestInFlightMetrics(-1)
+}
+
+// addRequestInFlightMetrics adds i to the requests-in-flight counter and,
+// when a per-broker counter has been registered, to that counter as well.
+// Callers pass 1 just before writing a request and -1 once it has completed
+// or failed.
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+ b.requestsInFlight.Inc(i)
+ // brokerRequestsInFlight is only set by registerMetrics, so it may be nil.
+ if b.brokerRequestsInFlight != nil {
+ b.brokerRequestsInFlight.Inc(i)
+ }
}
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1322,7 +1350,6 @@
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
-
}
func (b *Broker) registerMetrics() {
@@ -1333,6 +1360,7 @@
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
+ b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
}
func (b *Broker) unregisterMetrics() {
@@ -1352,3 +1380,9 @@
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
+
+// registerCounter gets or registers a counter in the configured metric
+// registry under the broker-specific variant of name, and records that
+// metric name in b.registeredMetrics.
+func (b *Broker) registerCounter(name string) metrics.Counter {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index e5b3557..057a57a 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -17,9 +17,15 @@
// altered after it has been created.
Config() *Config
- // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
+ // Controller returns the cluster controller broker. It will return a
+ // locally cached value if it's available. You can call RefreshController
+ // to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)
+ // RefreshController retrieves the cluster controller from fresh metadata
+ // and stores it in the local cache. Requires Kafka 0.10 or higher.
+ RefreshController() (*Broker, error)
+
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker
@@ -193,7 +199,6 @@
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
-
req := &InitProducerIDRequest{}
response, err := broker.InitProducerID(req)
@@ -487,6 +492,35 @@
return controller, nil
}
+// deregisterController removes the broker stored under the cached
+// controllerID from client.brokers while holding client.lock. The
+// controllerID value itself is left unchanged.
+// NOTE(review): the removed *Broker is not closed here — confirm that its
+// connection is released elsewhere.
+func (client *client) deregisterController() {
+ client.lock.Lock()
+ defer client.lock.Unlock()
+ delete(client.brokers, client.controllerID)
+}
+
+// RefreshController retrieves the cluster controller from fresh metadata
+// and stores it in the local cache. Requires Kafka 0.10 or higher.
+//
+// It returns ErrClosedClient when the client is closed, the error from the
+// metadata refresh when that fails, and ErrControllerNotAvailable when no
+// controller is cached after the refresh.
+func (client *client) RefreshController() (*Broker, error) {
+ if client.Closed() {
+ return nil, ErrClosedClient
+ }
+
+ // Drop the broker entry stored under the cached controller ID before refreshing.
+ client.deregisterController()
+
+ if err := client.refreshMetadata(); err != nil {
+ return nil, err
+ }
+
+ controller := client.cachedController()
+ if controller == nil {
+ return nil, ErrControllerNotAvailable
+ }
+
+ // The result of Open is discarded. NOTE(review): presumably an
+ // already-open broker is not treated as a failure here — confirm.
+ _ = controller.Open(client.conf)
+ return controller, nil
+}
+
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index 69c7161..7495200 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -21,6 +21,13 @@
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
+ Retry struct {
+ // The total number of times to retry sending (retriable) admin requests (default 5).
+ // Similar to the `retries` setting of the JVM AdminClientConfig.
+ Max int
+ // Backoff time between retries of a failed request (default 100ms)
+ Backoff time.Duration
+ }
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
@@ -65,8 +72,15 @@
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Handshake bool
- //username and password for SASL/PLAIN or SASL/SCRAM authentication
- User string
+ // AuthIdentity is an (optional) authorization identity (authzid) to
+ // use for SASL/PLAIN authentication (if different from User) when
+ // an authenticated user is permitted to act as the presented
+ // alternative user. See RFC4616 for details.
+ AuthIdentity string
+ // User is the authentication identity (authcid) to present for
+ // SASL/PLAIN or SASL/SCRAM authentication
+ User string
+ // Password for SASL/PLAIN authentication
Password string
// authz id used for SASL/SCRAM authentication
SCRAMAuthzID string
@@ -338,6 +352,11 @@
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
+ // Deprecated: CommitInterval exists for historical compatibility
+ // and should not be used. Please use Consumer.Offsets.AutoCommit
+ CommitInterval time.Duration
+
+ // AutoCommit specifies configuration for committing offsets automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
@@ -401,6 +420,8 @@
func NewConfig() *Config {
c := &Config{}
+ c.Admin.Retry.Max = 5
+ c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second
c.Net.MaxOpenRequests = 5
@@ -629,6 +650,10 @@
}
}
+ if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
+ return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
+ }
+
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
@@ -659,7 +684,7 @@
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
- return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+ return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
case c.Consumer.Offsets.Retry.Max < 0:
@@ -668,6 +693,11 @@
return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
}
+ if c.Consumer.Offsets.CommitInterval != 0 {
+ Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
+ " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
+ }
+
// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
diff --git a/vendor/github.com/Shopify/sarama/config_resource_type.go b/vendor/github.com/Shopify/sarama/config_resource_type.go
index 5399d75..bef1053 100644
--- a/vendor/github.com/Shopify/sarama/config_resource_type.go
+++ b/vendor/github.com/Shopify/sarama/config_resource_type.go
@@ -1,22 +1,18 @@
package sarama
-//ConfigResourceType is a type for config resource
+// ConfigResourceType is a type for resources that have configs.
type ConfigResourceType int8
-// Taken from :
-// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
+// Taken from:
+// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55
const (
- //UnknownResource constant type
- UnknownResource ConfigResourceType = iota
- //AnyResource constant type
- AnyResource
- //TopicResource constant type
- TopicResource
- //GroupResource constant type
- GroupResource
- //ClusterResource constant type
- ClusterResource
- //BrokerResource constant type
- BrokerResource
+ // UnknownResource constant type
+ UnknownResource ConfigResourceType = 0
+ // TopicResource constant type
+ TopicResource ConfigResourceType = 2
+ // BrokerResource constant type
+ BrokerResource ConfigResourceType = 4
+ // BrokerLoggerResource constant type
+ BrokerLoggerResource ConfigResourceType = 8
)
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index 72c4d7c..ff44ade 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -888,6 +888,10 @@
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
+ if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
+ request.Version = 10
+ }
+
for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index b974dd9..951f64b 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -120,9 +120,6 @@
c.closeOnce.Do(func() {
close(c.closed)
- c.lock.Lock()
- defer c.lock.Unlock()
-
// leave group
if e := c.leave(); e != nil {
err = e
@@ -175,6 +172,7 @@
// loop check topic partition numbers changed
// will trigger rebalance when any topic partitions number had changed
+ // the goroutine exits once the session context is done, so repeated Consume calls do not accumulate loopCheckPartitionNumbers goroutines
go c.loopCheckPartitionNumbers(topics, sess)
// Wait for session exit signal
@@ -333,20 +331,14 @@
MemberId: c.memberID,
GenerationId: generationID,
}
+ strategy := c.config.Consumer.Group.Rebalance.Strategy
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
-
- // Include topic assignments in group-assignment userdata for each consumer-group member
- if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
- userDataBytes, err := encode(&StickyAssignorUserDataV1{
- Topics: topics,
- Generation: generationID,
- }, nil)
- if err != nil {
- return nil, err
- }
- assignment.UserData = userDataBytes
+ userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
+ if err != nil {
+ return nil, err
}
+ assignment.UserData = userDataBytes
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
return nil, err
}
@@ -384,8 +376,10 @@
return strategy.Plan(members, topics)
}
-// Leaves the cluster, called by Close, protected by lock.
+// leave leaves the cluster. It is called by Close and acquires c.lock itself.
func (c *consumerGroup) leave() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
@@ -430,9 +424,6 @@
return
}
- c.lock.Lock()
- defer c.lock.Unlock()
-
select {
case <-c.closed:
//consumer is closed
@@ -448,7 +439,7 @@
}
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
- pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
+ pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
defer session.cancel()
defer pause.Stop()
var oldTopicToPartitionNum map[string]int
@@ -468,6 +459,10 @@
}
select {
case <-pause.C:
+ case <-session.ctx.Done():
+ Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+ // the session was closed elsewhere, so this goroutine must exit as well
+ return
case <-c.closed:
return
}
@@ -475,10 +470,6 @@
}
func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
- if err := c.client.RefreshMetadata(topics...); err != nil {
- Logger.Printf("Consumer Group refresh metadata failed %v", err)
- return nil, err
- }
topicToPartitionNum := make(map[string]int, len(topics))
for _, topic := range topics {
if partitionNum, err := c.client.Partitions(topic); err != nil {
diff --git a/vendor/github.com/Shopify/sarama/describe_configs_response.go b/vendor/github.com/Shopify/sarama/describe_configs_response.go
index 5737232..a18eeba 100644
--- a/vendor/github.com/Shopify/sarama/describe_configs_response.go
+++ b/vendor/github.com/Shopify/sarama/describe_configs_response.go
@@ -277,7 +277,6 @@
}
r.Synonyms[i] = s
}
-
}
return nil
}
diff --git a/vendor/github.com/Shopify/sarama/dev.yml b/vendor/github.com/Shopify/sarama/dev.yml
index 4c030de..6f6807e 100644
--- a/vendor/github.com/Shopify/sarama/dev.yml
+++ b/vendor/github.com/Shopify/sarama/dev.yml
@@ -2,7 +2,7 @@
up:
- go:
- version: '1.13.4'
+ version: '1.13.7'
commands:
test:
diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go
index 4db9ddd..836e6de 100644
--- a/vendor/github.com/Shopify/sarama/fetch_request.go
+++ b/vendor/github.com/Shopify/sarama/fetch_request.go
@@ -1,20 +1,41 @@
package sarama
type fetchRequestBlock struct {
- fetchOffset int64
- maxBytes int32
+ Version int16
+ currentLeaderEpoch int32
+ fetchOffset int64
+ logStartOffset int64
+ maxBytes int32
}
-func (b *fetchRequestBlock) encode(pe packetEncoder) error {
+func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
+ b.Version = version
+ if b.Version >= 9 {
+ pe.putInt32(b.currentLeaderEpoch)
+ }
pe.putInt64(b.fetchOffset)
+ if b.Version >= 5 {
+ pe.putInt64(b.logStartOffset)
+ }
pe.putInt32(b.maxBytes)
return nil
}
-func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
+func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+ b.Version = version
+ if b.Version >= 9 {
+ if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+ return err
+ }
+ }
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
+ if b.Version >= 5 {
+ if b.logStartOffset, err = pd.getInt64(); err != nil {
+ return err
+ }
+ }
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
@@ -25,12 +46,15 @@
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
- MaxWaitTime int32
- MinBytes int32
- MaxBytes int32
- Version int16
- Isolation IsolationLevel
- blocks map[string]map[int32]*fetchRequestBlock
+ MaxWaitTime int32
+ MinBytes int32
+ MaxBytes int32
+ Version int16
+ Isolation IsolationLevel
+ SessionID int32
+ SessionEpoch int32
+ blocks map[string]map[int32]*fetchRequestBlock
+ forgotten map[string][]int32
}
type IsolationLevel int8
@@ -50,6 +74,10 @@
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
+ if r.Version >= 7 {
+ pe.putInt32(r.SessionID)
+ pe.putInt32(r.SessionEpoch)
+ }
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
@@ -65,17 +93,38 @@
}
for partition, block := range blocks {
pe.putInt32(partition)
- err = block.encode(pe)
+ err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
+ if r.Version >= 7 {
+ err = pe.putArrayLength(len(r.forgotten))
+ if err != nil {
+ return err
+ }
+ for topic, partitions := range r.forgotten {
+ err = pe.putString(topic)
+ if err != nil {
+ return err
+ }
+ err = pe.putArrayLength(len(partitions))
+ if err != nil {
+ return err
+ }
+ for _, partition := range partitions {
+ pe.putInt32(partition)
+ }
+ }
+ }
+
return nil
}
func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
+
if _, err = pd.getInt32(); err != nil {
return err
}
@@ -97,6 +146,16 @@
}
r.Isolation = IsolationLevel(isolation)
}
+ if r.Version >= 7 {
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.SessionEpoch, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
topicCount, err := pd.getArrayLength()
if err != nil {
return err
@@ -121,12 +180,43 @@
return err
}
fetchBlock := &fetchRequestBlock{}
- if err = fetchBlock.decode(pd); err != nil {
+ if err = fetchBlock.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}
+
+ if r.Version >= 7 {
+ forgottenCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ if forgottenCount == 0 {
+ return nil
+ }
+ r.forgotten = make(map[string][]int32)
+ for i := 0; i < forgottenCount; i++ {
+ topic, err := pd.getString()
+ if err != nil {
+ return err
+ }
+ partitionCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic] = make([]int32, partitionCount)
+
+ for j := 0; j < partitionCount; j++ {
+ partition, err := pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic][j] = partition
+ }
+ }
+ }
+
return nil
}
@@ -140,16 +230,28 @@
func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
+ case 0:
+ return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
- case 4:
+ case 4, 5:
return V0_11_0_0
+ case 6:
+ return V1_0_0_0
+ case 7:
+ return V1_1_0_0
+ case 8:
+ return V2_0_0_0
+ case 9, 10:
+ return V2_1_0_0
+ case 11:
+ return V2_3_0_0
default:
- return MinVersion
+ return MaxVersion
}
}
@@ -158,13 +260,21 @@
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}
+ if r.Version >= 7 && r.forgotten == nil {
+ r.forgotten = make(map[string][]int32)
+ }
+
if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}
tmp := new(fetchRequestBlock)
+ tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
+ if r.Version >= 9 {
+ tmp.currentLeaderEpoch = int32(-1)
+ }
r.blocks[topic][partitionID] = tmp
}
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 3afc187..26936d9 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -33,6 +33,7 @@
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
+ LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
@@ -57,6 +58,13 @@
return err
}
+ if version >= 5 {
+ b.LogStartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
numTransact, err := pd.getArrayLength()
if err != nil {
return err
@@ -166,6 +174,10 @@
if version >= 4 {
pe.putInt64(b.LastStableOffset)
+ if version >= 5 {
+ pe.putInt64(b.LogStartOffset)
+ }
+
if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
@@ -200,7 +212,9 @@
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
- Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ ErrorCode int16
+ SessionID int32
+ Version int16
LogAppendTime bool
Timestamp time.Time
}
@@ -216,6 +230,17 @@
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}
+ if r.Version >= 7 {
+ r.ErrorCode, err = pd.getInt16()
+ if err != nil {
+ return err
+ }
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
+
numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -258,6 +283,11 @@
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
+ if r.Version >= 7 {
+ pe.putInt16(r.ErrorCode)
+ pe.putInt32(r.SessionID)
+ }
+
err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
@@ -281,7 +311,6 @@
return err
}
}
-
}
return nil
}
@@ -296,16 +325,28 @@
func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
+ case 0:
+ return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
- case 4:
+ case 4, 5:
return V0_11_0_0
+ case 6:
+ return V1_0_0_0
+ case 7:
+ return V1_1_0_0
+ case 8:
+ return V2_0_0_0
+ case 9, 10:
+ return V2_1_0_0
+ case 11:
+ return V2_3_0_0
default:
- return MinVersion
+ return MaxVersion
}
}
diff --git a/vendor/github.com/Shopify/sarama/go.mod b/vendor/github.com/Shopify/sarama/go.mod
index 8ba2c91..1dca1cc 100644
--- a/vendor/github.com/Shopify/sarama/go.mod
+++ b/vendor/github.com/Shopify/sarama/go.mod
@@ -5,25 +5,30 @@
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/davecgh/go-spew v1.1.1
- github.com/eapache/go-resiliency v1.1.0
+ github.com/eapache/go-resiliency v1.2.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/fortytw2/leaktest v1.3.0
- github.com/frankban/quicktest v1.4.1 // indirect
+ github.com/frankban/quicktest v1.7.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
- github.com/hashicorp/go-uuid v1.0.1 // indirect
- github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect
- github.com/klauspost/compress v1.9.7
- github.com/pierrec/lz4 v2.2.6+incompatible
- github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
- github.com/stretchr/testify v1.3.0
+ github.com/google/go-cmp v0.4.0 // indirect
+ github.com/hashicorp/go-uuid v1.0.2 // indirect
+ github.com/jcmturner/gofork v1.0.0 // indirect
+ github.com/klauspost/compress v1.9.8
+ github.com/kr/pretty v0.2.0 // indirect
+ github.com/pierrec/lz4 v2.4.1+incompatible
+ github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563
+ github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
- golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
- golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
+ golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 // indirect
+ golang.org/x/net v0.0.0-20200202094626-16171245cfb2
+ golang.org/x/text v0.3.2 // indirect
+ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
- gopkg.in/jcmturner/gokrb5.v7 v7.2.3
+ gopkg.in/jcmturner/gokrb5.v7 v7.5.0
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
+ gopkg.in/yaml.v2 v2.2.8 // indirect
)
diff --git a/vendor/github.com/Shopify/sarama/go.sum b/vendor/github.com/Shopify/sarama/go.sum
index 7f61258..06ec328 100644
--- a/vendor/github.com/Shopify/sarama/go.sum
+++ b/vendor/github.com/Shopify/sarama/go.sum
@@ -1,69 +1,81 @@
-github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
-github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
-github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
+github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
-github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg=
-github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ=
+github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
+github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
-github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
-github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
-github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
-github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
-github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
-github.com/klauspost/compress v1.8.1 h1:oygt2ychZFHOB6M9gUgajzgKrwRgHbGC77NwA4COVgI=
-github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
-github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
-github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
-github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
-github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
+github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
+github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
+github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
+github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
+github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
-github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw=
-github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
+github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
-github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
+github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE=
-golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
+golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w=
+golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
-gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
-gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/vendor/github.com/Shopify/sarama/gssapi_kerberos.go b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
index 57f3ecb..32ca93d 100644
--- a/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
+++ b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
@@ -200,7 +200,6 @@
/* This does the handshake for authorization */
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
-
kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
if err != nil {
Logger.Printf("Kerberos client error: %s", err)
diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go
index 7c54748..e48566b 100644
--- a/vendor/github.com/Shopify/sarama/message.go
+++ b/vendor/github.com/Shopify/sarama/message.go
@@ -85,7 +85,6 @@
payload = m.compressedCache
m.compressedCache = nil
} else if m.Value != nil {
-
payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
if err != nil {
return err
diff --git a/vendor/github.com/Shopify/sarama/metadata_response.go b/vendor/github.com/Shopify/sarama/metadata_response.go
index b2d532e..916992d 100644
--- a/vendor/github.com/Shopify/sarama/metadata_response.go
+++ b/vendor/github.com/Shopify/sarama/metadata_response.go
@@ -318,5 +318,4 @@
pmatch.Isr = isr
pmatch.OfflineReplicas = offline
pmatch.Err = err
-
}
diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go
index 4ed46a6..cd1a850 100644
--- a/vendor/github.com/Shopify/sarama/mockbroker.go
+++ b/vendor/github.com/Shopify/sarama/mockbroker.go
@@ -235,7 +235,6 @@
var bytesWritten int
var bytesRead int
for {
-
buffer, err := b.readToBytes(conn)
if err != nil {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
@@ -245,7 +244,6 @@
bytesWritten = 0
if !b.isGSSAPI(buffer) {
-
req, br, err := decodeRequest(bytes.NewReader(buffer))
bytesRead = br
if err != nil {
@@ -294,7 +292,6 @@
break
}
bytesWritten = len(resHeader) + len(encodedRes)
-
} else {
// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
@@ -317,7 +314,6 @@
b.notifier(bytesRead, bytesWritten)
}
b.lock.Unlock()
-
}
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index 7dcc93e..984e5aa 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -731,29 +731,78 @@
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*DescribeConfigsRequest)
- res := &DescribeConfigsResponse{}
+ res := &DescribeConfigsResponse{
+ Version: req.Version,
+ }
+
+ includeSynonyms := (req.Version > 0)
for _, r := range req.Resources {
var configEntries []*ConfigEntry
switch r.Type {
- case TopicResource:
+ case BrokerResource:
configEntries = append(configEntries,
- &ConfigEntry{Name: "max.message.bytes",
- Value: "1000000",
- ReadOnly: false,
- Default: true,
- Sensitive: false,
- }, &ConfigEntry{Name: "retention.ms",
- Value: "5000",
- ReadOnly: false,
- Default: false,
- Sensitive: false,
- }, &ConfigEntry{Name: "password",
- Value: "12345",
- ReadOnly: false,
- Default: false,
- Sensitive: true,
- })
+ &ConfigEntry{
+ Name: "min.insync.replicas",
+ Value: "2",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case BrokerLoggerResource:
+ configEntries = append(configEntries,
+ &ConfigEntry{
+ Name: "kafka.controller.KafkaController",
+ Value: "DEBUG",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case TopicResource:
+ maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
+ Value: "1000000",
+ ReadOnly: false,
+ Default: true,
+ Sensitive: false,
+ }
+ if includeSynonyms {
+ maxMessageBytes.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "max.message.bytes",
+ ConfigValue: "500000",
+ },
+ }
+ }
+ retentionMs := &ConfigEntry{Name: "retention.ms",
+ Value: "5000",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: false,
+ }
+ if includeSynonyms {
+ retentionMs.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "log.retention.ms",
+ ConfigValue: "2500",
+ },
+ }
+ }
+ password := &ConfigEntry{Name: "password",
+ Value: "12345",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: true,
+ }
+ configEntries = append(
+ configEntries, maxMessageBytes, retentionMs, password)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
@@ -777,7 +826,7 @@
for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
- Type: TopicResource,
+ Type: r.Type,
ErrorMsg: "",
})
}
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index e40f429..1940872 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -280,7 +280,6 @@
ConsumerID: om.memberID,
ConsumerGroupGeneration: om.generation,
}
-
}
om.pomsLock.RLock()
diff --git a/vendor/github.com/Shopify/sarama/produce_request.go b/vendor/github.com/Shopify/sarama/produce_request.go
index 0c755d0..178972a 100644
--- a/vendor/github.com/Shopify/sarama/produce_request.go
+++ b/vendor/github.com/Shopify/sarama/produce_request.go
@@ -214,6 +214,8 @@
return V0_10_0_0
case 3:
return V0_11_0_0
+ case 7:
+ return V2_1_0_0
default:
return MinVersion
}
diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go
index 4c5cd35..e4f19a7 100644
--- a/vendor/github.com/Shopify/sarama/produce_response.go
+++ b/vendor/github.com/Shopify/sarama/produce_response.go
@@ -5,11 +5,27 @@
"time"
)
+// Protocol, http://kafka.apache.org/protocol.html
+// v1
+// v2 = v3 = v4
+// v5 = v6 = v7
+// Produce Response (Version: 7) => [responses] throttle_time_ms
+// responses => topic [partition_responses]
+// topic => STRING
+// partition_responses => partition error_code base_offset log_append_time log_start_offset
+// partition => INT32
+// error_code => INT16
+// base_offset => INT64
+// log_append_time => INT64
+// log_start_offset => INT64
+// throttle_time_ms => INT32
+
+// ProduceResponseBlock is the partition_responses element of the protocol.
type ProduceResponseBlock struct {
- Err KError
- Offset int64
- // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
- Timestamp time.Time
+ Err KError // v0, error_code
+ Offset int64 // v0, base_offset
+ Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
+ StartOffset int64 // v5, log_start_offset
}
func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@
}
}
+ if version >= 5 {
+ b.StartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
@@ -49,13 +72,17 @@
pe.putInt64(timestamp)
}
+ if version >= 5 {
+ pe.putInt64(b.StartOffset)
+ }
+
return nil
}
type ProduceResponse struct {
- Blocks map[string]map[int32]*ProduceResponseBlock
+ Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
Version int16
- ThrottleTime time.Duration // only provided if Version >= 1
+ ThrottleTime time.Duration // v1, throttle_time_ms
}
func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@
}
}
}
+
if r.Version >= 1 {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
@@ -143,19 +171,6 @@
return r.Version
}
-func (r *ProduceResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 1:
- return V0_9_0_0
- case 2:
- return V0_10_0_0
- case 3:
- return V0_11_0_0
- default:
- return MinVersion
- }
-}
-
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
if r.Blocks == nil {
return nil
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index b684aa4..36c43c6 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -129,6 +129,10 @@
req.Version = 3
}
+ if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
+ req.Version = 7
+ }
+
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
if req.Version >= 3 {
diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go
index 97437d6..6e4ad87 100644
--- a/vendor/github.com/Shopify/sarama/request.go
+++ b/vendor/github.com/Shopify/sarama/request.go
@@ -105,7 +105,7 @@
case 0:
return &ProduceRequest{}
case 1:
- return &FetchRequest{}
+ return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
diff --git a/vendor/github.com/Shopify/sarama/sarama.go b/vendor/github.com/Shopify/sarama/sarama.go
index 1e0277a..48f362d 100644
--- a/vendor/github.com/Shopify/sarama/sarama.go
+++ b/vendor/github.com/Shopify/sarama/sarama.go
@@ -39,6 +39,10 @@
| response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
| response-size | histogram | Distribution of the response size in bytes for all brokers |
| response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
+ | requests-in-flight | counter | The current number of in-flight requests awaiting a response |
+ | | | for all brokers |
+ | requests-in-flight-for-broker-<broker-id> | counter | The current number of in-flight requests awaiting a response |
+ | | | for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
diff --git a/vendor/github.com/golang/protobuf/proto/lib.go b/vendor/github.com/golang/protobuf/proto/lib.go
index fdd328b..70fbda5 100644
--- a/vendor/github.com/golang/protobuf/proto/lib.go
+++ b/vendor/github.com/golang/protobuf/proto/lib.go
@@ -393,7 +393,7 @@
// than relying on this API.
//
// If deterministic serialization is requested, map entries will be sorted
-// by keys in lexographical order. This is an implementation detail and
+// by keys in lexicographical order. This is an implementation detail and
// subject to change.
func (p *Buffer) SetDeterministic(deterministic bool) {
p.deterministic = deterministic
diff --git a/vendor/github.com/golang/protobuf/proto/text.go b/vendor/github.com/golang/protobuf/proto/text.go
index 1aaee72..d97f9b3 100644
--- a/vendor/github.com/golang/protobuf/proto/text.go
+++ b/vendor/github.com/golang/protobuf/proto/text.go
@@ -456,6 +456,8 @@
return nil
}
+var textMarshalerType = reflect.TypeOf((*encoding.TextMarshaler)(nil)).Elem()
+
// writeAny writes an arbitrary field.
func (tm *TextMarshaler) writeAny(w *textWriter, v reflect.Value, props *Properties) error {
v = reflect.Indirect(v)
@@ -519,8 +521,8 @@
// mutating this value.
v = v.Addr()
}
- if etm, ok := v.Interface().(encoding.TextMarshaler); ok {
- text, err := etm.MarshalText()
+ if v.Type().Implements(textMarshalerType) {
+ text, err := v.Interface().(encoding.TextMarshaler).MarshalText()
if err != nil {
return err
}
diff --git a/vendor/github.com/golang/protobuf/ptypes/any/any.pb.go b/vendor/github.com/golang/protobuf/ptypes/any/any.pb.go
index 78ee523..7b0ad1a 100644
--- a/vendor/github.com/golang/protobuf/ptypes/any/any.pb.go
+++ b/vendor/github.com/golang/protobuf/ptypes/any/any.pb.go
@@ -102,7 +102,8 @@
//
type Any struct {
// A URL/resource name that uniquely identifies the type of the serialized
- // protocol buffer message. The last segment of the URL's path must represent
+ // protocol buffer message. This string must contain at least
+ // one "/" character. The last segment of the URL's path must represent
// the fully qualified name of the type (as in
// `path/google.protobuf.Duration`). The name should be in a canonical form
// (e.g., leading "." is not accepted).
@@ -181,7 +182,9 @@
proto.RegisterType((*Any)(nil), "google.protobuf.Any")
}
-func init() { proto.RegisterFile("google/protobuf/any.proto", fileDescriptor_b53526c13ae22eb4) }
+func init() {
+ proto.RegisterFile("google/protobuf/any.proto", fileDescriptor_b53526c13ae22eb4)
+}
var fileDescriptor_b53526c13ae22eb4 = []byte{
// 185 bytes of a gzipped FileDescriptorProto
diff --git a/vendor/github.com/golang/protobuf/ptypes/any/any.proto b/vendor/github.com/golang/protobuf/ptypes/any/any.proto
index 4932942..c9be854 100644
--- a/vendor/github.com/golang/protobuf/ptypes/any/any.proto
+++ b/vendor/github.com/golang/protobuf/ptypes/any/any.proto
@@ -121,7 +121,8 @@
//
message Any {
// A URL/resource name that uniquely identifies the type of the serialized
- // protocol buffer message. The last segment of the URL's path must represent
+ // protocol buffer message. This string must contain at least
+ // one "/" character. The last segment of the URL's path must represent
// the fully qualified name of the type (as in
// `path/google.protobuf.Duration`). The name should be in a canonical form
// (e.g., leading "." is not accepted).
diff --git a/vendor/github.com/golang/protobuf/ptypes/duration/duration.pb.go b/vendor/github.com/golang/protobuf/ptypes/duration/duration.pb.go
index 0d681ee..58b0786 100644
--- a/vendor/github.com/golang/protobuf/ptypes/duration/duration.pb.go
+++ b/vendor/github.com/golang/protobuf/ptypes/duration/duration.pb.go
@@ -41,7 +41,7 @@
// if (duration.seconds < 0 && duration.nanos > 0) {
// duration.seconds += 1;
// duration.nanos -= 1000000000;
-// } else if (durations.seconds > 0 && duration.nanos < 0) {
+// } else if (duration.seconds > 0 && duration.nanos < 0) {
// duration.seconds -= 1;
// duration.nanos += 1000000000;
// }
@@ -142,7 +142,9 @@
proto.RegisterType((*Duration)(nil), "google.protobuf.Duration")
}
-func init() { proto.RegisterFile("google/protobuf/duration.proto", fileDescriptor_23597b2ebd7ac6c5) }
+func init() {
+ proto.RegisterFile("google/protobuf/duration.proto", fileDescriptor_23597b2ebd7ac6c5)
+}
var fileDescriptor_23597b2ebd7ac6c5 = []byte{
// 190 bytes of a gzipped FileDescriptorProto
diff --git a/vendor/github.com/golang/protobuf/ptypes/duration/duration.proto b/vendor/github.com/golang/protobuf/ptypes/duration/duration.proto
index 975fce4..99cb102 100644
--- a/vendor/github.com/golang/protobuf/ptypes/duration/duration.proto
+++ b/vendor/github.com/golang/protobuf/ptypes/duration/duration.proto
@@ -61,7 +61,7 @@
// if (duration.seconds < 0 && duration.nanos > 0) {
// duration.seconds += 1;
// duration.nanos -= 1000000000;
-// } else if (durations.seconds > 0 && duration.nanos < 0) {
+// } else if (duration.seconds > 0 && duration.nanos < 0) {
// duration.seconds -= 1;
// duration.nanos += 1000000000;
// }
@@ -101,7 +101,6 @@
//
//
message Duration {
-
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive. Note: these bounds are computed from:
// 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years
diff --git a/vendor/github.com/golang/protobuf/ptypes/empty/empty.pb.go b/vendor/github.com/golang/protobuf/ptypes/empty/empty.pb.go
index b4eb03e..6bd9f67 100644
--- a/vendor/github.com/golang/protobuf/ptypes/empty/empty.pb.go
+++ b/vendor/github.com/golang/protobuf/ptypes/empty/empty.pb.go
@@ -66,7 +66,9 @@
proto.RegisterType((*Empty)(nil), "google.protobuf.Empty")
}
-func init() { proto.RegisterFile("google/protobuf/empty.proto", fileDescriptor_900544acb223d5b8) }
+func init() {
+ proto.RegisterFile("google/protobuf/empty.proto", fileDescriptor_900544acb223d5b8)
+}
var fileDescriptor_900544acb223d5b8 = []byte{
// 148 bytes of a gzipped FileDescriptorProto
diff --git a/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.pb.go b/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.pb.go
index 31cd846..7a3b1e4 100644
--- a/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.pb.go
+++ b/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.pb.go
@@ -20,17 +20,19 @@
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
-// A Timestamp represents a point in time independent of any time zone
-// or calendar, represented as seconds and fractions of seconds at
-// nanosecond resolution in UTC Epoch time. It is encoded using the
-// Proleptic Gregorian Calendar which extends the Gregorian calendar
-// backwards to year one. It is encoded assuming all minutes are 60
-// seconds long, i.e. leap seconds are "smeared" so that no leap second
-// table is needed for interpretation. Range is from
-// 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z.
-// By restricting to that range, we ensure that we can convert to
-// and from RFC 3339 date strings.
-// See [https://www.ietf.org/rfc/rfc3339.txt](https://www.ietf.org/rfc/rfc3339.txt).
+// A Timestamp represents a point in time independent of any time zone or local
+// calendar, encoded as a count of seconds and fractions of seconds at
+// nanosecond resolution. The count is relative to an epoch at UTC midnight on
+// January 1, 1970, in the proleptic Gregorian calendar which extends the
+// Gregorian calendar backwards to year one.
+//
+// All minutes are 60 seconds long. Leap seconds are "smeared" so that no leap
+// second table is needed for interpretation, using a [24-hour linear
+// smear](https://developers.google.com/time/smear).
+//
+// The range is from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z. By
+// restricting to that range, we ensure that we can convert to and from [RFC
+// 3339](https://www.ietf.org/rfc/rfc3339.txt) date strings.
//
// # Examples
//
@@ -91,12 +93,14 @@
// 01:30 UTC on January 15, 2017.
//
// In JavaScript, one can convert a Date object to this format using the
-// standard [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString]
+// standard
+// [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString)
// method. In Python, a standard `datetime.datetime` object can be converted
-// to this format using [`strftime`](https://docs.python.org/2/library/time.html#time.strftime)
-// with the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one
-// can use the Joda Time's [`ISODateTimeFormat.dateTime()`](
-// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime--
+// to this format using
+// [`strftime`](https://docs.python.org/2/library/time.html#time.strftime) with
+// the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one can use
+// the Joda Time's [`ISODateTimeFormat.dateTime()`](
+// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime%2D%2D
// ) to obtain a formatter capable of generating timestamps in this format.
//
//
@@ -160,7 +164,9 @@
proto.RegisterType((*Timestamp)(nil), "google.protobuf.Timestamp")
}
-func init() { proto.RegisterFile("google/protobuf/timestamp.proto", fileDescriptor_292007bbfe81227e) }
+func init() {
+ proto.RegisterFile("google/protobuf/timestamp.proto", fileDescriptor_292007bbfe81227e)
+}
var fileDescriptor_292007bbfe81227e = []byte{
// 191 bytes of a gzipped FileDescriptorProto
diff --git a/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.proto b/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.proto
index eafb3fa..cd35786 100644
--- a/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.proto
+++ b/vendor/github.com/golang/protobuf/ptypes/timestamp/timestamp.proto
@@ -40,17 +40,19 @@
option java_multiple_files = true;
option objc_class_prefix = "GPB";
-// A Timestamp represents a point in time independent of any time zone
-// or calendar, represented as seconds and fractions of seconds at
-// nanosecond resolution in UTC Epoch time. It is encoded using the
-// Proleptic Gregorian Calendar which extends the Gregorian calendar
-// backwards to year one. It is encoded assuming all minutes are 60
-// seconds long, i.e. leap seconds are "smeared" so that no leap second
-// table is needed for interpretation. Range is from
-// 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z.
-// By restricting to that range, we ensure that we can convert to
-// and from RFC 3339 date strings.
-// See [https://www.ietf.org/rfc/rfc3339.txt](https://www.ietf.org/rfc/rfc3339.txt).
+// A Timestamp represents a point in time independent of any time zone or local
+// calendar, encoded as a count of seconds and fractions of seconds at
+// nanosecond resolution. The count is relative to an epoch at UTC midnight on
+// January 1, 1970, in the proleptic Gregorian calendar which extends the
+// Gregorian calendar backwards to year one.
+//
+// All minutes are 60 seconds long. Leap seconds are "smeared" so that no leap
+// second table is needed for interpretation, using a [24-hour linear
+// smear](https://developers.google.com/time/smear).
+//
+// The range is from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z. By
+// restricting to that range, we ensure that we can convert to and from [RFC
+// 3339](https://www.ietf.org/rfc/rfc3339.txt) date strings.
//
// # Examples
//
@@ -111,17 +113,18 @@
// 01:30 UTC on January 15, 2017.
//
// In JavaScript, one can convert a Date object to this format using the
-// standard [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString]
+// standard
+// [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString)
// method. In Python, a standard `datetime.datetime` object can be converted
-// to this format using [`strftime`](https://docs.python.org/2/library/time.html#time.strftime)
-// with the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one
-// can use the Joda Time's [`ISODateTimeFormat.dateTime()`](
-// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime--
+// to this format using
+// [`strftime`](https://docs.python.org/2/library/time.html#time.strftime) with
+// the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one can use
+// the Joda Time's [`ISODateTimeFormat.dateTime()`](
+// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime%2D%2D
// ) to obtain a formatter capable of generating timestamps in this format.
//
//
message Timestamp {
-
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
diff --git a/vendor/github.com/hashicorp/go-uuid/uuid.go b/vendor/github.com/hashicorp/go-uuid/uuid.go
index 911227f..0c10c4e 100644
--- a/vendor/github.com/hashicorp/go-uuid/uuid.go
+++ b/vendor/github.com/hashicorp/go-uuid/uuid.go
@@ -4,22 +4,40 @@
"crypto/rand"
"encoding/hex"
"fmt"
+ "io"
)
// GenerateRandomBytes is used to generate random bytes of given size.
func GenerateRandomBytes(size int) ([]byte, error) {
+ return GenerateRandomBytesWithReader(size, rand.Reader)
+}
+
+// GenerateRandomBytesWithReader is used to generate random bytes of given size read from a given reader.
+func GenerateRandomBytesWithReader(size int, reader io.Reader) ([]byte, error) {
+ if reader == nil {
+ return nil, fmt.Errorf("provided reader is nil")
+ }
buf := make([]byte, size)
- if _, err := rand.Read(buf); err != nil {
+ if _, err := io.ReadFull(reader, buf); err != nil {
return nil, fmt.Errorf("failed to read random bytes: %v", err)
}
return buf, nil
}
+
const uuidLen = 16
// GenerateUUID is used to generate a random UUID
func GenerateUUID() (string, error) {
- buf, err := GenerateRandomBytes(uuidLen)
+ return GenerateUUIDWithReader(rand.Reader)
+}
+
+// GenerateUUIDWithReader is used to generate a random UUID with a given Reader
+func GenerateUUIDWithReader(reader io.Reader) (string, error) {
+ if reader == nil {
+ return "", fmt.Errorf("provided reader is nil")
+ }
+ buf, err := GenerateRandomBytesWithReader(uuidLen, reader)
if err != nil {
return "", err
}
diff --git a/vendor/github.com/pierrec/lz4/README.md b/vendor/github.com/pierrec/lz4/README.md
index 00899fb..4ee388e 100644
--- a/vendor/github.com/pierrec/lz4/README.md
+++ b/vendor/github.com/pierrec/lz4/README.md
@@ -83,23 +83,8 @@
## Contributors
-Thanks to all contributors so far:
+Thanks to all [contributors](https://github.com/pierrec/lz4/graphs/contributors) so far!
-- [@klauspost](https://github.com/klauspost)
-- [@heidawei](https://github.com/heidawei)
-- [@x4m](https://github.com/x4m)
-- [@Zariel](https://github.com/Zariel)
-- [@edwingeng](https://github.com/edwingeng)
-- [@danielmoy-google](https://github.com/danielmoy-google)
-- [@honda-tatsuya](https://github.com/honda-tatsuya)
-- [@h8liu](https://github.com/h8liu)
-- [@sbinet](https://github.com/sbinet)
-- [@fingon](https://github.com/fingon)
-- [@emfree](https://github.com/emfree)
-- [@lhemala](https://github.com/lhemala)
-- [@connor4312](https://github.com/connor4312)
-- [@oov](https://github.com/oov)
-- [@arya](https://github.com/arya)
-- [@ikkeps](https://github.com/ikkeps)
+Special thanks to [@Zariel](https://github.com/Zariel) for his asm implementation of the decoder.
-Special thanks to [@Zariel](https://github.com/Zariel) for his asm implementation of the decoder
+Special thanks to [@klauspost](https://github.com/klauspost) for his work on optimizing the code.
diff --git a/vendor/github.com/pierrec/lz4/block.go b/vendor/github.com/pierrec/lz4/block.go
index 5755cda..b589af4 100644
--- a/vendor/github.com/pierrec/lz4/block.go
+++ b/vendor/github.com/pierrec/lz4/block.go
@@ -40,7 +40,10 @@
// The size of the compressed data is returned. If it is 0 and no error, then the data is incompressible.
//
// An error is returned if the destination buffer is too small.
-func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
+func CompressBlock(src, dst []byte, hashTable []int) (_ int, err error) {
+ if len(hashTable) < htSize {
+ return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
+ }
defer recoverBlock(&err)
// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
@@ -51,16 +54,13 @@
if sn <= 0 || dn == 0 {
return 0, nil
}
- if len(hashTable) < htSize {
- return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
- }
// Prove to the compiler the table has at least htSize elements.
// The compiler can see that "uint32() >> hashShift" cannot be out of bounds.
hashTable = hashTable[:htSize]
// si: Current position of the search.
// anchor: Position of the current literals.
- var si, anchor int
+ var si, di, anchor int
// Fast scan strategy: the hash table only stores the last 4 bytes sequences.
for si < sn {
@@ -124,7 +124,7 @@
si, mLen = si+mLen, si+minMatch
// Find the longest match by looking by batches of 8 bytes.
- for si < sn {
+ for si+8 < sn {
x := binary.LittleEndian.Uint64(src[si:]) ^ binary.LittleEndian.Uint64(src[si-offset:])
if x == 0 {
si += 8
@@ -227,7 +227,7 @@
// The size of the compressed data is returned. If it is 0 and no error, then the data is not compressible.
//
// An error is returned if the destination buffer is too small.
-func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
+func CompressBlockHC(src, dst []byte, depth int) (_ int, err error) {
defer recoverBlock(&err)
// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
@@ -239,7 +239,7 @@
if sn <= 0 || dn == 0 {
return 0, nil
}
- var si int
+ var si, di int
// hashTable: stores the last position found for a given hash
// chainTable: stores previous positions for a given hash
diff --git a/vendor/github.com/pierrec/lz4/decode_other.go b/vendor/github.com/pierrec/lz4/decode_other.go
index 002519f..919888e 100644
--- a/vendor/github.com/pierrec/lz4/decode_other.go
+++ b/vendor/github.com/pierrec/lz4/decode_other.go
@@ -19,7 +19,7 @@
// Literals.
if lLen := b >> 4; lLen > 0 {
switch {
- case lLen < 0xF && di+18 < len(dst) && si+16 < len(src):
+ case lLen < 0xF && si+16 < len(src):
// Shortcut 1
// if we have enough room in src and dst, and the literals length
// is small enough (0..14) then copy all 16 bytes, even if not all
@@ -34,7 +34,13 @@
mLen += 4
if offset := int(src[si]) | int(src[si+1])<<8; mLen <= offset {
i := di - offset
- copy(dst[di:], dst[i:i+18])
+ end := i + 18
+ if end > len(dst) {
+ // The remaining buffer may not hold 18 bytes.
+ // See https://github.com/pierrec/lz4/issues/51.
+ end = len(dst)
+ }
+ copy(dst[di:], dst[i:end])
si += 2
di += mLen
continue
diff --git a/vendor/github.com/pierrec/lz4/errors.go b/vendor/github.com/pierrec/lz4/errors.go
index 3e27894..1c45d18 100644
--- a/vendor/github.com/pierrec/lz4/errors.go
+++ b/vendor/github.com/pierrec/lz4/errors.go
@@ -15,6 +15,8 @@
ErrInvalid = errors.New("lz4: bad magic number")
// ErrBlockDependency is returned when attempting to decompress an archive created with block dependency.
ErrBlockDependency = errors.New("lz4: block dependency not supported")
+ // ErrUnsupportedSeek is returned when attempting to Seek any way but forward from the current position.
+ ErrUnsupportedSeek = errors.New("lz4: can only seek forward from io.SeekCurrent")
)
func recoverBlock(e *error) {
diff --git a/vendor/github.com/pierrec/lz4/lz4.go b/vendor/github.com/pierrec/lz4/lz4.go
index cdbf961..29864d8 100644
--- a/vendor/github.com/pierrec/lz4/lz4.go
+++ b/vendor/github.com/pierrec/lz4/lz4.go
@@ -10,6 +10,10 @@
//
package lz4
+import "math/bits"
+
+import "sync"
+
const (
// Extension is the LZ4 frame file name extension
Extension = ".lz4"
@@ -34,22 +38,61 @@
hashLog = 16
htSize = 1 << hashLog
- mfLimit = 8 + minMatch // The last match cannot start within the last 12 bytes.
+ mfLimit = 10 + minMatch // The last match cannot start within the last 14 bytes.
)
// map the block max size id with its value in bytes: 64Kb, 256Kb, 1Mb and 4Mb.
const (
- blockSize64K = 64 << 10
- blockSize256K = 256 << 10
- blockSize1M = 1 << 20
- blockSize4M = 4 << 20
+ blockSize64K = 1 << (16 + 2*iota)
+ blockSize256K
+ blockSize1M
+ blockSize4M
)
var (
- bsMapID = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
- bsMapValue = map[int]byte{blockSize64K: 4, blockSize256K: 5, blockSize1M: 6, blockSize4M: 7}
+ // Keep a pool of buffers for each valid block size.
+ bsMapValue = [...]*sync.Pool{
+ newBufferPool(2 * blockSize64K),
+ newBufferPool(2 * blockSize256K),
+ newBufferPool(2 * blockSize1M),
+ newBufferPool(2 * blockSize4M),
+ }
)
+// newBufferPool returns a pool for buffers of the given size.
+func newBufferPool(size int) *sync.Pool {
+ return &sync.Pool{
+ New: func() interface{} {
+ return make([]byte, size)
+ },
+ }
+}
+
+// getBuffer takes a buffer from the pool matching the given block size.
+func getBuffer(size int) []byte {
+ idx := blockSizeValueToIndex(size) - 4
+ return bsMapValue[idx].Get().([]byte)
+}
+
+// putBuffer returns a buffer to its pool.
+func putBuffer(size int, buf []byte) {
+ if cap(buf) > 0 {
+ idx := blockSizeValueToIndex(size) - 4
+ bsMapValue[idx].Put(buf[:cap(buf)])
+ }
+}
+func blockSizeIndexToValue(i byte) int {
+ return 1 << (16 + 2*uint(i))
+}
+func isValidBlockSize(size int) bool {
+ const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M
+
+ return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
+}
+func blockSizeValueToIndex(size int) byte {
+ return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
+}
+
// Header describes the various flags that can be set on a Writer or obtained from a Reader.
// The default values match those of the LZ4 frame format definition
// (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
@@ -64,3 +107,7 @@
CompressionLevel int // Compression level (higher is better, use 0 for fastest compression).
done bool // Header processed flag (Read or Write and checked).
}
+
+func (h *Header) Reset() {
+ h.done = false
+}
diff --git a/vendor/github.com/pierrec/lz4/reader.go b/vendor/github.com/pierrec/lz4/reader.go
index 90e8efe..87dd72b 100644
--- a/vendor/github.com/pierrec/lz4/reader.go
+++ b/vendor/github.com/pierrec/lz4/reader.go
@@ -25,6 +25,8 @@
data []byte // Uncompressed data.
idx int // Index of unread bytes into data.
checksum xxh32.XXHZero // Frame hash.
+ skip int64 // Bytes to skip before next read.
+ dpos int64 // Position in dest
}
// NewReader returns a new LZ4 frame decoder.
@@ -86,10 +88,10 @@
z.NoChecksum = b>>2&1 == 0
bmsID := buf[1] >> 4 & 0x7
- bSize, ok := bsMapID[bmsID]
- if !ok {
+ if bmsID < 4 || bmsID > 7 {
return fmt.Errorf("lz4: invalid block max size ID: %d", bmsID)
}
+ bSize := blockSizeIndexToValue(bmsID - 4)
z.BlockMaxSize = bSize
// Allocate the compressed/uncompressed buffers.
@@ -275,8 +277,20 @@
z.idx = 0
}
+ if z.skip > int64(len(z.data[z.idx:])) {
+ z.skip -= int64(len(z.data[z.idx:]))
+ z.dpos += int64(len(z.data[z.idx:]))
+ z.idx = len(z.data)
+ return 0, nil
+ }
+
+ z.idx += int(z.skip)
+ z.dpos += z.skip
+ z.skip = 0
+
n := copy(buf, z.data[z.idx:])
z.idx += n
+ z.dpos += int64(n)
if debugFlag {
debug("copied %d bytes to input", n)
}
@@ -284,6 +298,20 @@
return n, nil
}
+// Seek implements io.Seeker, but supports seeking forward from the current
+// position only. Any other seek will return an error. Allows skipping output
+// bytes which aren't needed, which in some scenarios is faster than reading
+// and discarding them.
+// Note this may cause future calls to Read() to read 0 bytes if all of the
+// data they would have returned is skipped.
+func (z *Reader) Seek(offset int64, whence int) (int64, error) {
+ if offset < 0 || whence != io.SeekCurrent {
+ return z.dpos + z.skip, ErrUnsupportedSeek
+ }
+ z.skip += offset
+ return z.dpos + z.skip, nil
+}
+
// Reset discards the Reader's state and makes it equivalent to the
// result of its original state from NewReader, but reading from r instead.
// This permits reusing a Reader rather than allocating a new one.
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 804a68c..324f138 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -3,11 +3,18 @@
import (
"encoding/binary"
"fmt"
- "io"
-
"github.com/pierrec/lz4/internal/xxh32"
+ "io"
+ "runtime"
)
+// zResult contains the results of compressing a block.
+type zResult struct {
+ size uint32 // Block header
+ data []byte // Compressed data
+ checksum uint32 // Data checksum
+}
+
// Writer implements the LZ4 frame encoder.
type Writer struct {
Header
@@ -18,10 +25,13 @@
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
dst io.Writer // Destination.
checksum xxh32.XXHZero // Frame checksum.
- zdata []byte // Compressed data.
- data []byte // Data to be compressed.
+ data []byte // Data to be compressed + buffer for compressed data.
idx int // Index into data.
hashtable [winSize]int // Hash table used in CompressBlock().
+
+ // For concurrency.
+ c chan chan zResult // Channel for block compression goroutines and writer goroutine.
+ err error // Any error encountered while writing to the underlying destination.
}
// NewWriter returns a new LZ4 frame encoder.
@@ -29,28 +39,92 @@
// The supplied Header is checked at the first Write.
// It is ok to change it before the first Write but then not until a Reset() is performed.
func NewWriter(dst io.Writer) *Writer {
- return &Writer{dst: dst}
+ z := new(Writer)
+ z.Reset(dst)
+ return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+ switch {
+ case n == 0 || n == 1:
+ z.c = nil
+ return z
+ case n < 0:
+ n = runtime.GOMAXPROCS(0)
+ }
+ z.c = make(chan chan zResult, n)
+ // Writer goroutine managing concurrent block compression goroutines.
+ go func() {
+ // Process next block compression item.
+ for c := range z.c {
+ // Read the next compressed block result.
+ // Waiting here ensures that the blocks are output in the order they were sent.
+ // The incoming channel is always closed as it indicates to the caller that
+ // the block has been processed.
+ res := <-c
+ n := len(res.data)
+ if n == 0 {
+ // Notify the block compression routine that we are done with its result.
+ // This is used when a sentinel block is sent to terminate the compression.
+ close(c)
+ return
+ }
+ // Write the block.
+ if err := z.writeUint32(res.size); err != nil && z.err == nil {
+ z.err = err
+ }
+ if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+ z.err = err
+ }
+ if z.BlockChecksum {
+ if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+ z.err = err
+ }
+ }
+ if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
+ // It is now safe to release the buffer as it is no longer in use by any goroutine.
+ putBuffer(cap(res.data), res.data)
+ }
+ if h := z.OnBlockDone; h != nil {
+ h(n)
+ }
+ close(c)
+ }
+ }()
+ return z
+}
+
+// newBuffers takes a pooled buffer sized for the Header's BlockMaxSize.
+// Its first half holds the uncompressed data; the rest is left for compressed output.
+func (z *Writer) newBuffers() {
+ bSize := z.Header.BlockMaxSize
+ buf := getBuffer(bSize)
+ z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+ // Put the buffer back into the pool, if any.
+ putBuffer(z.Header.BlockMaxSize, z.data)
+ z.data = nil
}
// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
func (z *Writer) writeHeader() error {
// Default to 4Mb if BlockMaxSize is not set.
if z.Header.BlockMaxSize == 0 {
- z.Header.BlockMaxSize = bsMapID[7]
+ z.Header.BlockMaxSize = blockSize4M
}
// The only option that needs to be validated.
bSize := z.Header.BlockMaxSize
- bSizeID, ok := bsMapValue[bSize]
- if !ok {
+ if !isValidBlockSize(z.Header.BlockMaxSize) {
return fmt.Errorf("lz4: invalid block max size: %d", bSize)
}
// Allocate the compressed/uncompressed buffers.
// The compressed buffer cannot exceed the uncompressed one.
- if n := 2 * bSize; cap(z.zdata) < n {
- z.zdata = make([]byte, n, n)
- }
- z.data = z.zdata[:bSize]
- z.zdata = z.zdata[:cap(z.zdata)][bSize:]
+ z.newBuffers()
z.idx = 0
// Size is optional.
@@ -70,7 +144,7 @@
flg |= 1 << 2
}
buf[4] = flg
- buf[5] = bSizeID << 4
+ buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
// Current buffer size: magic(4) + flags(1) + block max size (1).
n := 6
@@ -150,28 +224,34 @@
// compressBlock compresses a block.
func (z *Writer) compressBlock(data []byte) error {
if !z.NoChecksum {
- z.checksum.Write(data)
+ _, _ = z.checksum.Write(data)
}
+ if z.c != nil {
+ c := make(chan zResult)
+ z.c <- c // Send now to guarantee order
+ go writerCompressBlock(c, z.Header, data)
+ return nil
+ }
+
+ zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
// The compressed block size cannot exceed the input's.
var zn int
- var err error
if level := z.Header.CompressionLevel; level != 0 {
- zn, err = CompressBlockHC(data, z.zdata, level)
+ zn, _ = CompressBlockHC(data, zdata, level)
} else {
- zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+ zn, _ = CompressBlock(data, zdata, z.hashtable[:])
}
- var zdata []byte
var bLen uint32
if debugFlag {
debug("block compression %d => %d", len(data), zn)
}
- if err == nil && zn > 0 && zn < len(data) {
+ if zn > 0 && zn < len(data) {
// Compressible and compressed size smaller than uncompressed: ok!
bLen = uint32(zn)
- zdata = z.zdata[:zn]
+ zdata = zdata[:zn]
} else {
// Uncompressed block.
bLen = uint32(len(data)) | compressedBlockFlag
@@ -218,13 +298,35 @@
return nil
}
- if err := z.compressBlock(z.data[:z.idx]); err != nil {
- return err
- }
+ data := z.data[:z.idx]
z.idx = 0
+ if z.c == nil {
+ return z.compressBlock(data)
+ }
+ if !z.NoChecksum {
+ _, _ = z.checksum.Write(data)
+ }
+ c := make(chan zResult)
+ z.c <- c
+ writerCompressBlock(c, z.Header, data)
return nil
}
+func (z *Writer) close() error {
+ if z.c == nil {
+ return nil
+ }
+ // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+ c := make(chan zResult)
+ z.c <- c
+ c <- zResult{}
+ // Wait for the main goroutine to complete.
+ <-c
+ // At this point the main goroutine has shut down or is about to return.
+ z.c = nil
+ return z.err
+}
+
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
func (z *Writer) Close() error {
if !z.Header.done {
@@ -235,6 +337,10 @@
if err := z.Flush(); err != nil {
return err
}
+ if err := z.close(); err != nil {
+ return err
+ }
+ z.freeBuffers()
if debugFlag {
debug("writing last empty block")
@@ -256,12 +362,15 @@
// initial state from NewWriter, but instead writing to w.
// No access to the underlying io.Writer is performed.
func (z *Writer) Reset(w io.Writer) {
- z.Header = Header{}
+ n := cap(z.c)
+ _ = z.close()
+ z.freeBuffers()
+ z.Header.Reset()
z.dst = w
z.checksum.Reset()
- z.zdata = z.zdata[:0]
- z.data = z.data[:0]
z.idx = 0
+ z.err = nil
+ z.WithConcurrency(n)
}
// writeUint32 writes a uint32 to the underlying writer.
@@ -271,3 +380,29 @@
_, err := z.dst.Write(buf)
return err
}
+
+// writerCompressBlock compresses data into a pooled buffer and writes its result
+// out to the input channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+ zdata := getBuffer(header.BlockMaxSize)
+ // The compressed block size cannot exceed the input's.
+ var zn int
+ if level := header.CompressionLevel; level != 0 {
+ zn, _ = CompressBlockHC(data, zdata, level)
+ } else {
+ var hashTable [winSize]int
+ zn, _ = CompressBlock(data, zdata, hashTable[:])
+ }
+ var res zResult
+ if zn > 0 && zn < len(data) {
+ res.size = uint32(zn)
+ res.data = zdata[:zn]
+ } else {
+ res.size = uint32(len(data)) | compressedBlockFlag
+ res.data = data
+ }
+ if header.BlockChecksum {
+ res.checksum = xxh32.ChecksumZero(res.data)
+ }
+ c <- res
+}
diff --git a/vendor/github.com/rcrowley/go-metrics/.travis.yml b/vendor/github.com/rcrowley/go-metrics/.travis.yml
index 117763e..aead076 100644
--- a/vendor/github.com/rcrowley/go-metrics/.travis.yml
+++ b/vendor/github.com/rcrowley/go-metrics/.travis.yml
@@ -1,13 +1,16 @@
language: go
go:
- - 1.3
- - 1.4
- - 1.5
- - 1.6
- - 1.7
- - 1.8
- - 1.9
+ - "1.3"
+ - "1.4"
+ - "1.5"
+ - "1.6"
+ - "1.7"
+ - "1.8"
+ - "1.9"
+ - "1.10"
+ - "1.11"
+ - "1.12"
script:
- ./validate.sh
diff --git a/vendor/github.com/rcrowley/go-metrics/README.md b/vendor/github.com/rcrowley/go-metrics/README.md
index b7356b5..27ddfee 100644
--- a/vendor/github.com/rcrowley/go-metrics/README.md
+++ b/vendor/github.com/rcrowley/go-metrics/README.md
@@ -157,6 +157,7 @@
Clients are available for the following destinations:
+* AppOptics - https://github.com/ysamlan/go-metrics-appoptics
* Librato - https://github.com/mihasya/go-metrics-librato
* Graphite - https://github.com/cyberdelia/go-metrics-graphite
* InfluxDB - https://github.com/vrischmann/go-metrics-influxdb
@@ -166,3 +167,5 @@
* SignalFX - https://github.com/pascallouisperez/go-metrics-signalfx
* Honeycomb - https://github.com/getspine/go-metrics-honeycomb
* Wavefront - https://github.com/wavefrontHQ/go-metrics-wavefront
+* Open-Falcon - https://github.com/g4zhuj/go-metrics-falcon
+* AWS CloudWatch - [https://github.com/savaki/cloudmetrics](https://github.com/savaki/cloudmetrics)
diff --git a/vendor/github.com/rcrowley/go-metrics/debug.go b/vendor/github.com/rcrowley/go-metrics/debug.go
index 043ccef..179e5aa 100644
--- a/vendor/github.com/rcrowley/go-metrics/debug.go
+++ b/vendor/github.com/rcrowley/go-metrics/debug.go
@@ -2,6 +2,7 @@
import (
"runtime/debug"
+ "sync"
"time"
)
@@ -16,7 +17,8 @@
}
ReadGCStats Timer
}
- gcStats debug.GCStats
+ gcStats debug.GCStats
+ registerDebugMetricsOnce = sync.Once{}
)
// Capture new values for the Go garbage collector statistics exported in
@@ -54,19 +56,21 @@
// debug.GCStats. The metrics are named by their fully-qualified Go symbols,
// i.e. debug.GCStats.PauseTotal.
func RegisterDebugGCStats(r Registry) {
- debugMetrics.GCStats.LastGC = NewGauge()
- debugMetrics.GCStats.NumGC = NewGauge()
- debugMetrics.GCStats.Pause = NewHistogram(NewExpDecaySample(1028, 0.015))
- //debugMetrics.GCStats.PauseQuantiles = NewHistogram(NewExpDecaySample(1028, 0.015))
- debugMetrics.GCStats.PauseTotal = NewGauge()
- debugMetrics.ReadGCStats = NewTimer()
+ registerDebugMetricsOnce.Do(func() {
+ debugMetrics.GCStats.LastGC = NewGauge()
+ debugMetrics.GCStats.NumGC = NewGauge()
+ debugMetrics.GCStats.Pause = NewHistogram(NewExpDecaySample(1028, 0.015))
+ //debugMetrics.GCStats.PauseQuantiles = NewHistogram(NewExpDecaySample(1028, 0.015))
+ debugMetrics.GCStats.PauseTotal = NewGauge()
+ debugMetrics.ReadGCStats = NewTimer()
- r.Register("debug.GCStats.LastGC", debugMetrics.GCStats.LastGC)
- r.Register("debug.GCStats.NumGC", debugMetrics.GCStats.NumGC)
- r.Register("debug.GCStats.Pause", debugMetrics.GCStats.Pause)
- //r.Register("debug.GCStats.PauseQuantiles", debugMetrics.GCStats.PauseQuantiles)
- r.Register("debug.GCStats.PauseTotal", debugMetrics.GCStats.PauseTotal)
- r.Register("debug.ReadGCStats", debugMetrics.ReadGCStats)
+ r.Register("debug.GCStats.LastGC", debugMetrics.GCStats.LastGC)
+ r.Register("debug.GCStats.NumGC", debugMetrics.GCStats.NumGC)
+ r.Register("debug.GCStats.Pause", debugMetrics.GCStats.Pause)
+ //r.Register("debug.GCStats.PauseQuantiles", debugMetrics.GCStats.PauseQuantiles)
+ r.Register("debug.GCStats.PauseTotal", debugMetrics.GCStats.PauseTotal)
+ r.Register("debug.ReadGCStats", debugMetrics.ReadGCStats)
+ })
}
// Allocate an initial slice for gcStats.Pause to avoid allocations during
diff --git a/vendor/github.com/rcrowley/go-metrics/log.go b/vendor/github.com/rcrowley/go-metrics/log.go
index f8074c0..2614a0a 100644
--- a/vendor/github.com/rcrowley/go-metrics/log.go
+++ b/vendor/github.com/rcrowley/go-metrics/log.go
@@ -8,17 +8,37 @@
Printf(format string, v ...interface{})
}
+// Log outputs each metric in the given registry periodically using the given logger.
func Log(r Registry, freq time.Duration, l Logger) {
LogScaled(r, freq, time.Nanosecond, l)
}
-// Output each metric in the given registry periodically using the given
+// LogOnCue outputs each metric in the given registry on demand through the channel
+// using the given logger
+func LogOnCue(r Registry, ch chan interface{}, l Logger) {
+ LogScaledOnCue(r, ch, time.Nanosecond, l)
+}
+
+// LogScaled outputs each metric in the given registry periodically using the given
// logger. Print timings in `scale` units (eg time.Millisecond) rather than nanos.
func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
+ ch := make(chan interface{})
+ go func(channel chan interface{}) {
+ for _ = range time.Tick(freq) {
+ channel <- struct{}{}
+ }
+ }(ch)
+ LogScaledOnCue(r, ch, scale, l)
+}
+
+// LogScaledOnCue outputs each metric in the given registry on demand through the channel
+// using the given logger. Print timings in `scale` units (eg time.Millisecond) rather
+// than nanos.
+func LogScaledOnCue(r Registry, ch chan interface{}, scale time.Duration, l Logger) {
du := float64(scale)
duSuffix := scale.String()[1:]
- for _ = range time.Tick(freq) {
+ for _ = range ch {
r.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
diff --git a/vendor/github.com/rcrowley/go-metrics/registry.go b/vendor/github.com/rcrowley/go-metrics/registry.go
index b3bab64..a8e6722 100644
--- a/vendor/github.com/rcrowley/go-metrics/registry.go
+++ b/vendor/github.com/rcrowley/go-metrics/registry.go
@@ -64,8 +64,10 @@
// Call the given function for each registered metric.
func (r *StandardRegistry) Each(f func(string, interface{})) {
- for name, i := range r.registered() {
- f(name, i)
+ metrics := r.registered()
+ for i := range metrics {
+ kv := &metrics[i]
+ f(kv.name, kv.value)
}
}
@@ -211,12 +213,20 @@
return nil
}
-func (r *StandardRegistry) registered() map[string]interface{} {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- metrics := make(map[string]interface{}, len(r.metrics))
+type metricKV struct {
+ name string
+ value interface{}
+}
+
+func (r *StandardRegistry) registered() []metricKV {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+ metrics := make([]metricKV, 0, len(r.metrics))
for name, i := range r.metrics {
- metrics[name] = i
+ metrics = append(metrics, metricKV{
+ name: name,
+ value: i,
+ })
}
return metrics
}
diff --git a/vendor/github.com/rcrowley/go-metrics/runtime.go b/vendor/github.com/rcrowley/go-metrics/runtime.go
index 11c6b78..4047ab3 100644
--- a/vendor/github.com/rcrowley/go-metrics/runtime.go
+++ b/vendor/github.com/rcrowley/go-metrics/runtime.go
@@ -3,6 +3,7 @@
import (
"runtime"
"runtime/pprof"
+ "sync"
"time"
)
@@ -49,7 +50,8 @@
numGC uint32
numCgoCalls int64
- threadCreateProfile = pprof.Lookup("threadcreate")
+ threadCreateProfile = pprof.Lookup("threadcreate")
+ registerRuntimeMetricsOnce = sync.Once{}
)
// Capture new values for the Go runtime statistics exported in
@@ -146,67 +148,69 @@
// specifically runtime.MemStats. The runtimeMetrics are named by their
// fully-qualified Go symbols, i.e. runtime.MemStats.Alloc.
func RegisterRuntimeMemStats(r Registry) {
- runtimeMetrics.MemStats.Alloc = NewGauge()
- runtimeMetrics.MemStats.BuckHashSys = NewGauge()
- runtimeMetrics.MemStats.DebugGC = NewGauge()
- runtimeMetrics.MemStats.EnableGC = NewGauge()
- runtimeMetrics.MemStats.Frees = NewGauge()
- runtimeMetrics.MemStats.HeapAlloc = NewGauge()
- runtimeMetrics.MemStats.HeapIdle = NewGauge()
- runtimeMetrics.MemStats.HeapInuse = NewGauge()
- runtimeMetrics.MemStats.HeapObjects = NewGauge()
- runtimeMetrics.MemStats.HeapReleased = NewGauge()
- runtimeMetrics.MemStats.HeapSys = NewGauge()
- runtimeMetrics.MemStats.LastGC = NewGauge()
- runtimeMetrics.MemStats.Lookups = NewGauge()
- runtimeMetrics.MemStats.Mallocs = NewGauge()
- runtimeMetrics.MemStats.MCacheInuse = NewGauge()
- runtimeMetrics.MemStats.MCacheSys = NewGauge()
- runtimeMetrics.MemStats.MSpanInuse = NewGauge()
- runtimeMetrics.MemStats.MSpanSys = NewGauge()
- runtimeMetrics.MemStats.NextGC = NewGauge()
- runtimeMetrics.MemStats.NumGC = NewGauge()
- runtimeMetrics.MemStats.GCCPUFraction = NewGaugeFloat64()
- runtimeMetrics.MemStats.PauseNs = NewHistogram(NewExpDecaySample(1028, 0.015))
- runtimeMetrics.MemStats.PauseTotalNs = NewGauge()
- runtimeMetrics.MemStats.StackInuse = NewGauge()
- runtimeMetrics.MemStats.StackSys = NewGauge()
- runtimeMetrics.MemStats.Sys = NewGauge()
- runtimeMetrics.MemStats.TotalAlloc = NewGauge()
- runtimeMetrics.NumCgoCall = NewGauge()
- runtimeMetrics.NumGoroutine = NewGauge()
- runtimeMetrics.NumThread = NewGauge()
- runtimeMetrics.ReadMemStats = NewTimer()
+ registerRuntimeMetricsOnce.Do(func() {
+ runtimeMetrics.MemStats.Alloc = NewGauge()
+ runtimeMetrics.MemStats.BuckHashSys = NewGauge()
+ runtimeMetrics.MemStats.DebugGC = NewGauge()
+ runtimeMetrics.MemStats.EnableGC = NewGauge()
+ runtimeMetrics.MemStats.Frees = NewGauge()
+ runtimeMetrics.MemStats.HeapAlloc = NewGauge()
+ runtimeMetrics.MemStats.HeapIdle = NewGauge()
+ runtimeMetrics.MemStats.HeapInuse = NewGauge()
+ runtimeMetrics.MemStats.HeapObjects = NewGauge()
+ runtimeMetrics.MemStats.HeapReleased = NewGauge()
+ runtimeMetrics.MemStats.HeapSys = NewGauge()
+ runtimeMetrics.MemStats.LastGC = NewGauge()
+ runtimeMetrics.MemStats.Lookups = NewGauge()
+ runtimeMetrics.MemStats.Mallocs = NewGauge()
+ runtimeMetrics.MemStats.MCacheInuse = NewGauge()
+ runtimeMetrics.MemStats.MCacheSys = NewGauge()
+ runtimeMetrics.MemStats.MSpanInuse = NewGauge()
+ runtimeMetrics.MemStats.MSpanSys = NewGauge()
+ runtimeMetrics.MemStats.NextGC = NewGauge()
+ runtimeMetrics.MemStats.NumGC = NewGauge()
+ runtimeMetrics.MemStats.GCCPUFraction = NewGaugeFloat64()
+ runtimeMetrics.MemStats.PauseNs = NewHistogram(NewExpDecaySample(1028, 0.015))
+ runtimeMetrics.MemStats.PauseTotalNs = NewGauge()
+ runtimeMetrics.MemStats.StackInuse = NewGauge()
+ runtimeMetrics.MemStats.StackSys = NewGauge()
+ runtimeMetrics.MemStats.Sys = NewGauge()
+ runtimeMetrics.MemStats.TotalAlloc = NewGauge()
+ runtimeMetrics.NumCgoCall = NewGauge()
+ runtimeMetrics.NumGoroutine = NewGauge()
+ runtimeMetrics.NumThread = NewGauge()
+ runtimeMetrics.ReadMemStats = NewTimer()
- r.Register("runtime.MemStats.Alloc", runtimeMetrics.MemStats.Alloc)
- r.Register("runtime.MemStats.BuckHashSys", runtimeMetrics.MemStats.BuckHashSys)
- r.Register("runtime.MemStats.DebugGC", runtimeMetrics.MemStats.DebugGC)
- r.Register("runtime.MemStats.EnableGC", runtimeMetrics.MemStats.EnableGC)
- r.Register("runtime.MemStats.Frees", runtimeMetrics.MemStats.Frees)
- r.Register("runtime.MemStats.HeapAlloc", runtimeMetrics.MemStats.HeapAlloc)
- r.Register("runtime.MemStats.HeapIdle", runtimeMetrics.MemStats.HeapIdle)
- r.Register("runtime.MemStats.HeapInuse", runtimeMetrics.MemStats.HeapInuse)
- r.Register("runtime.MemStats.HeapObjects", runtimeMetrics.MemStats.HeapObjects)
- r.Register("runtime.MemStats.HeapReleased", runtimeMetrics.MemStats.HeapReleased)
- r.Register("runtime.MemStats.HeapSys", runtimeMetrics.MemStats.HeapSys)
- r.Register("runtime.MemStats.LastGC", runtimeMetrics.MemStats.LastGC)
- r.Register("runtime.MemStats.Lookups", runtimeMetrics.MemStats.Lookups)
- r.Register("runtime.MemStats.Mallocs", runtimeMetrics.MemStats.Mallocs)
- r.Register("runtime.MemStats.MCacheInuse", runtimeMetrics.MemStats.MCacheInuse)
- r.Register("runtime.MemStats.MCacheSys", runtimeMetrics.MemStats.MCacheSys)
- r.Register("runtime.MemStats.MSpanInuse", runtimeMetrics.MemStats.MSpanInuse)
- r.Register("runtime.MemStats.MSpanSys", runtimeMetrics.MemStats.MSpanSys)
- r.Register("runtime.MemStats.NextGC", runtimeMetrics.MemStats.NextGC)
- r.Register("runtime.MemStats.NumGC", runtimeMetrics.MemStats.NumGC)
- r.Register("runtime.MemStats.GCCPUFraction", runtimeMetrics.MemStats.GCCPUFraction)
- r.Register("runtime.MemStats.PauseNs", runtimeMetrics.MemStats.PauseNs)
- r.Register("runtime.MemStats.PauseTotalNs", runtimeMetrics.MemStats.PauseTotalNs)
- r.Register("runtime.MemStats.StackInuse", runtimeMetrics.MemStats.StackInuse)
- r.Register("runtime.MemStats.StackSys", runtimeMetrics.MemStats.StackSys)
- r.Register("runtime.MemStats.Sys", runtimeMetrics.MemStats.Sys)
- r.Register("runtime.MemStats.TotalAlloc", runtimeMetrics.MemStats.TotalAlloc)
- r.Register("runtime.NumCgoCall", runtimeMetrics.NumCgoCall)
- r.Register("runtime.NumGoroutine", runtimeMetrics.NumGoroutine)
- r.Register("runtime.NumThread", runtimeMetrics.NumThread)
- r.Register("runtime.ReadMemStats", runtimeMetrics.ReadMemStats)
+ r.Register("runtime.MemStats.Alloc", runtimeMetrics.MemStats.Alloc)
+ r.Register("runtime.MemStats.BuckHashSys", runtimeMetrics.MemStats.BuckHashSys)
+ r.Register("runtime.MemStats.DebugGC", runtimeMetrics.MemStats.DebugGC)
+ r.Register("runtime.MemStats.EnableGC", runtimeMetrics.MemStats.EnableGC)
+ r.Register("runtime.MemStats.Frees", runtimeMetrics.MemStats.Frees)
+ r.Register("runtime.MemStats.HeapAlloc", runtimeMetrics.MemStats.HeapAlloc)
+ r.Register("runtime.MemStats.HeapIdle", runtimeMetrics.MemStats.HeapIdle)
+ r.Register("runtime.MemStats.HeapInuse", runtimeMetrics.MemStats.HeapInuse)
+ r.Register("runtime.MemStats.HeapObjects", runtimeMetrics.MemStats.HeapObjects)
+ r.Register("runtime.MemStats.HeapReleased", runtimeMetrics.MemStats.HeapReleased)
+ r.Register("runtime.MemStats.HeapSys", runtimeMetrics.MemStats.HeapSys)
+ r.Register("runtime.MemStats.LastGC", runtimeMetrics.MemStats.LastGC)
+ r.Register("runtime.MemStats.Lookups", runtimeMetrics.MemStats.Lookups)
+ r.Register("runtime.MemStats.Mallocs", runtimeMetrics.MemStats.Mallocs)
+ r.Register("runtime.MemStats.MCacheInuse", runtimeMetrics.MemStats.MCacheInuse)
+ r.Register("runtime.MemStats.MCacheSys", runtimeMetrics.MemStats.MCacheSys)
+ r.Register("runtime.MemStats.MSpanInuse", runtimeMetrics.MemStats.MSpanInuse)
+ r.Register("runtime.MemStats.MSpanSys", runtimeMetrics.MemStats.MSpanSys)
+ r.Register("runtime.MemStats.NextGC", runtimeMetrics.MemStats.NextGC)
+ r.Register("runtime.MemStats.NumGC", runtimeMetrics.MemStats.NumGC)
+ r.Register("runtime.MemStats.GCCPUFraction", runtimeMetrics.MemStats.GCCPUFraction)
+ r.Register("runtime.MemStats.PauseNs", runtimeMetrics.MemStats.PauseNs)
+ r.Register("runtime.MemStats.PauseTotalNs", runtimeMetrics.MemStats.PauseTotalNs)
+ r.Register("runtime.MemStats.StackInuse", runtimeMetrics.MemStats.StackInuse)
+ r.Register("runtime.MemStats.StackSys", runtimeMetrics.MemStats.StackSys)
+ r.Register("runtime.MemStats.Sys", runtimeMetrics.MemStats.Sys)
+ r.Register("runtime.MemStats.TotalAlloc", runtimeMetrics.MemStats.TotalAlloc)
+ r.Register("runtime.NumCgoCall", runtimeMetrics.NumCgoCall)
+ r.Register("runtime.NumGoroutine", runtimeMetrics.NumGoroutine)
+ r.Register("runtime.NumThread", runtimeMetrics.NumThread)
+ r.Register("runtime.ReadMemStats", runtimeMetrics.ReadMemStats)
+ })
}
diff --git a/vendor/github.com/sirupsen/logrus/.golangci.yml b/vendor/github.com/sirupsen/logrus/.golangci.yml
new file mode 100644
index 0000000..65dc285
--- /dev/null
+++ b/vendor/github.com/sirupsen/logrus/.golangci.yml
@@ -0,0 +1,40 @@
+run:
+ # do not run on test files yet
+ tests: false
+
+# all available settings of specific linters
+linters-settings:
+ errcheck:
+ # report about not checking of errors in type assertions: `a := b.(MyStruct)`;
+ # default is false: such cases aren't reported by default.
+ check-type-assertions: false
+
+ # report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`;
+ # default is false: such cases aren't reported by default.
+ check-blank: false
+
+ lll:
+ line-length: 100
+ tab-width: 4
+
+ prealloc:
+ simple: false
+ range-loops: false
+ for-loops: false
+
+ whitespace:
+ multi-if: false # Enforces newlines (or comments) after every multi-line if statement
+ multi-func: false # Enforces newlines (or comments) after every multi-line function signature
+
+linters:
+ enable:
+ - megacheck
+ - govet
+ disable:
+ - maligned
+ - prealloc
+ disable-all: false
+ presets:
+ - bugs
+ - unused
+ fast: false
diff --git a/vendor/github.com/sirupsen/logrus/.travis.yml b/vendor/github.com/sirupsen/logrus/.travis.yml
index 848938a..5e20aa4 100644
--- a/vendor/github.com/sirupsen/logrus/.travis.yml
+++ b/vendor/github.com/sirupsen/logrus/.travis.yml
@@ -4,21 +4,13 @@
depth: 1
env:
- GO111MODULE=on
- - GO111MODULE=off
-go: [ 1.11.x, 1.12.x ]
-os: [ linux, osx ]
-matrix:
- exclude:
- - go: 1.12.x
- env: GO111MODULE=off
- - go: 1.11.x
- os: osx
+go: [1.13.x, 1.14.x]
+os: [linux, osx]
install:
- ./travis/install.sh
- - if [[ "$GO111MODULE" == "on" ]]; then go mod download; fi
- - if [[ "$GO111MODULE" == "off" ]]; then go get github.com/stretchr/testify/assert golang.org/x/sys/unix github.com/konsorten/go-windows-terminal-sequences; fi
script:
- ./travis/cross_build.sh
+ - ./travis/lint.sh
- export GOMAXPROCS=4
- export GORACE=halt_on_error=1
- go test -race -v ./...
diff --git a/vendor/github.com/sirupsen/logrus/README.md b/vendor/github.com/sirupsen/logrus/README.md
index a4796eb..5796706 100644
--- a/vendor/github.com/sirupsen/logrus/README.md
+++ b/vendor/github.com/sirupsen/logrus/README.md
@@ -1,8 +1,28 @@
-# Logrus <img src="http://i.imgur.com/hTeVwmJ.png" width="40" height="40" alt=":walrus:" class="emoji" title=":walrus:"/> [](https://travis-ci.org/sirupsen/logrus) [](https://godoc.org/github.com/sirupsen/logrus)
+# Logrus <img src="http://i.imgur.com/hTeVwmJ.png" width="40" height="40" alt=":walrus:" class="emoji" title=":walrus:"/> [](https://travis-ci.org/sirupsen/logrus) [](https://godoc.org/github.com/sirupsen/logrus)
Logrus is a structured logger for Go (golang), completely API compatible with
the standard library logger.
+**Logrus is in maintenance-mode.** We will not be introducing new features. It's
+simply too hard to do in a way that won't break many people's projects, which is
+the last thing you want from your Logging library (again...).
+
+This does not mean Logrus is dead. Logrus will continue to be maintained for
+security, (backwards compatible) bug fixes, and performance (where we are
+limited by the interface).
+
+I believe Logrus' biggest contribution is to have played a part in today's
+widespread use of structured logging in Golang. There doesn't seem to be a
+reason to do a major, breaking iteration into Logrus V2, since the fantastic Go
+community has built those independently. Many fantastic alternatives have sprung
+up. Logrus would look like those, had it been re-designed with what we know
+about structured logging in Go today. Check out, for example,
+[Zerolog][zerolog], [Zap][zap], and [Apex][apex].
+
+[zerolog]: https://github.com/rs/zerolog
+[zap]: https://github.com/uber-go/zap
+[apex]: https://github.com/apex/log
+
**Seeing weird case-sensitive problems?** It's in the past been possible to
import Logrus as both upper- and lower-case. Due to the Go package environment,
this caused issues in the community and we needed a standard. Some environments
@@ -15,11 +35,6 @@
For an in-depth explanation of the casing issue, see [this
comment](https://github.com/sirupsen/logrus/issues/570#issuecomment-313933276).
-**Are you interested in assisting in maintaining Logrus?** Currently I have a
-lot of obligations, and I am unable to provide Logrus with the maintainership it
-needs. If you'd like to help, please reach out to me at `simon at author's
-username dot com`.
-
Nicely color-coded in development (when a TTY is attached, otherwise just
plain text):
@@ -187,7 +202,7 @@
log.Out = os.Stdout
// You could set this to any `io.Writer` such as a file
- // file, err := os.OpenFile("logrus.log", os.O_CREATE|os.O_WRONLY, 0666)
+ // file, err := os.OpenFile("logrus.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
// if err == nil {
// log.Out = file
// } else {
@@ -272,7 +287,7 @@
```
Note: Syslog hook also support connecting to local syslog (Ex. "/dev/log" or "/var/run/syslog" or "/var/run/log"). For the detail, please check the [syslog hook README](hooks/syslog/README.md).
-A list of currently known of service hook can be found in this wiki [page](https://github.com/sirupsen/logrus/wiki/Hooks)
+A list of currently known service hooks can be found in this wiki [page](https://github.com/sirupsen/logrus/wiki/Hooks)
#### Level logging
@@ -354,6 +369,7 @@
[github.com/mattn/go-colorable](https://github.com/mattn/go-colorable).
* When colors are enabled, levels are truncated to 4 characters by default. To disable
truncation set the `DisableLevelTruncation` field to `true`.
+ * When outputting to a TTY, it's often helpful to visually scan down a column where all the levels are the same width. Setting the `PadLevelText` field to `true` enables this behavior, by adding padding to the level text.
* All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#TextFormatter).
* `logrus.JSONFormatter`. Logs fields as JSON.
* All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#JSONFormatter).
@@ -364,8 +380,10 @@
* [`GELF`](https://github.com/fabienm/go-logrus-formatters). Formats entries so they comply to Graylog's [GELF 1.1 specification](http://docs.graylog.org/en/2.4/pages/gelf.html).
* [`logstash`](https://github.com/bshuster-repo/logrus-logstash-hook). Logs fields as [Logstash](http://logstash.net) Events.
* [`prefixed`](https://github.com/x-cray/logrus-prefixed-formatter). Displays log entry source along with alternative layout.
-* [`zalgo`](https://github.com/aybabtme/logzalgo). Invoking the P͉̫o̳̼̊w̖͈̰͎e̬͔̭͂r͚̼̹̲ ̫͓͉̳͈ō̠͕͖̚f̝͍̠ ͕̲̞͖͑Z̖̫̤̫ͪa͉̬͈̗l͖͎g̳̥o̰̥̅!̣͔̲̻͊̄ ̙̘̦̹̦.
+* [`zalgo`](https://github.com/aybabtme/logzalgo). Invoking the Power of Zalgo.
* [`nested-logrus-formatter`](https://github.com/antonfisher/nested-logrus-formatter). Converts logrus fields to a nested structure.
+* [`powerful-logrus-formatter`](https://github.com/zput/zxcTool). Gets the file name, the log's line number and the latest function's name when printing a log; saves logs to files.
+* [`caption-json-formatter`](https://github.com/nolleh/caption_json_formatter). logrus's message json formatter with human-readable caption added.
You can define your formatter by implementing the `Formatter` interface,
requiring a `Format` method. `Format` takes an `*Entry`. `entry.Data` is a
@@ -430,14 +448,14 @@
| Tool | Description |
| ---- | ----------- |
-|[Logrus Mate](https://github.com/gogap/logrus_mate)|Logrus mate is a tool for Logrus to manage loggers, you can initial logger's level, hook and formatter by config file, the logger will generated with different config at different environment.|
+|[Logrus Mate](https://github.com/gogap/logrus_mate)|Logrus mate is a tool for Logrus to manage loggers, you can initial logger's level, hook and formatter by config file, the logger will be generated with different configs in different environments.|
|[Logrus Viper Helper](https://github.com/heirko/go-contrib/tree/master/logrusHelper)|An Helper around Logrus to wrap with spf13/Viper to load configuration with fangs! And to simplify Logrus configuration use some behavior of [Logrus Mate](https://github.com/gogap/logrus_mate). [sample](https://github.com/heirko/iris-contrib/blob/master/middleware/logrus-logger/example) |
#### Testing
Logrus has a built in facility for asserting the presence of log messages. This is implemented through the `test` hook and provides:
-* decorators for existing logger (`test.NewLocal` and `test.NewGlobal`) which basically just add the `test` hook
+* decorators for existing logger (`test.NewLocal` and `test.NewGlobal`) which basically just adds the `test` hook
* a test logger (`test.NewNullLogger`) that just records log messages (and does not output any):
```go
@@ -465,7 +483,7 @@
Logrus can register one or more functions that will be called when any `fatal`
level message is logged. The registered handlers will be executed before
-logrus performs a `os.Exit(1)`. This behavior may be helpful if callers need
+logrus performs an `os.Exit(1)`. This behavior may be helpful if callers need
to gracefully shutdown. Unlike a `panic("Something went wrong...")` call which can be intercepted with a deferred `recover` a call to `os.Exit(1)` can not be intercepted.
```
@@ -490,6 +508,6 @@
1) logger.Out is protected by locks.
- 2) logger.Out is a os.File handler opened with `O_APPEND` flag, and every write is smaller than 4k. (This allow multi-thread/multi-process writing)
+ 2) logger.Out is an os.File handler opened with `O_APPEND` flag, and every write is smaller than 4k. (This allows multi-thread/multi-process writing)
(Refer to http://www.notthewizard.com/2014/06/17/are-files-appends-really-atomic/)
diff --git a/vendor/github.com/sirupsen/logrus/entry.go b/vendor/github.com/sirupsen/logrus/entry.go
index 63e2558..27b14bf 100644
--- a/vendor/github.com/sirupsen/logrus/entry.go
+++ b/vendor/github.com/sirupsen/logrus/entry.go
@@ -85,10 +85,15 @@
}
}
+// Bytes returns the byte representation of this entry as produced by the logger's formatter.
+func (entry *Entry) Bytes() ([]byte, error) {
+ return entry.Logger.Formatter.Format(entry)
+}
+
// Returns the string representation from the reader and ultimately the
// formatter.
func (entry *Entry) String() (string, error) {
- serialized, err := entry.Logger.Formatter.Format(entry)
+ serialized, err := entry.Bytes()
if err != nil {
return "", err
}
@@ -103,7 +108,11 @@
// Add a context to the Entry.
func (entry *Entry) WithContext(ctx context.Context) *Entry {
- return &Entry{Logger: entry.Logger, Data: entry.Data, Time: entry.Time, err: entry.err, Context: ctx}
+ dataCopy := make(Fields, len(entry.Data))
+ for k, v := range entry.Data {
+ dataCopy[k] = v
+ }
+ return &Entry{Logger: entry.Logger, Data: dataCopy, Time: entry.Time, err: entry.err, Context: ctx}
}
// Add a single field to the Entry.
@@ -113,6 +122,8 @@
// Add a map of fields to the Entry.
func (entry *Entry) WithFields(fields Fields) *Entry {
+ entry.Logger.mu.Lock()
+ defer entry.Logger.mu.Unlock()
data := make(Fields, len(entry.Data)+len(fields))
for k, v := range entry.Data {
data[k] = v
@@ -144,7 +155,11 @@
// Overrides the time of the Entry.
func (entry *Entry) WithTime(t time.Time) *Entry {
- return &Entry{Logger: entry.Logger, Data: entry.Data, Time: t, err: entry.err, Context: entry.Context}
+ dataCopy := make(Fields, len(entry.Data))
+ for k, v := range entry.Data {
+ dataCopy[k] = v
+ }
+ return &Entry{Logger: entry.Logger, Data: dataCopy, Time: t, err: entry.err, Context: entry.Context}
}
// getPackageName reduces a fully qualified function name to the package name
@@ -165,15 +180,20 @@
// getCaller retrieves the name of the first non-logrus calling function
func getCaller() *runtime.Frame {
-
// cache this package's fully-qualified name
callerInitOnce.Do(func() {
- pcs := make([]uintptr, 2)
+ pcs := make([]uintptr, maximumCallerDepth)
_ = runtime.Callers(0, pcs)
- logrusPackage = getPackageName(runtime.FuncForPC(pcs[1]).Name())
- // now that we have the cache, we can skip a minimum count of known-logrus functions
- // XXX this is dubious, the number of frames may vary
+ // dynamically get the package name and the minimum caller depth
+ for i := 0; i < maximumCallerDepth; i++ {
+ funcName := runtime.FuncForPC(pcs[i]).Name()
+ if strings.Contains(funcName, "getCaller") {
+ logrusPackage = getPackageName(funcName)
+ break
+ }
+ }
+
minimumCallerDepth = knownLogrusFrames
})
@@ -187,7 +207,7 @@
// If the caller isn't part of this package, we're done
if pkg != logrusPackage {
- return &f
+ return &f //nolint:scopelint
}
}
@@ -217,9 +237,11 @@
entry.Level = level
entry.Message = msg
+ entry.Logger.mu.Lock()
if entry.Logger.ReportCaller {
entry.Caller = getCaller()
}
+ entry.Logger.mu.Unlock()
entry.fireHooks()
@@ -255,11 +277,10 @@
serialized, err := entry.Logger.Formatter.Format(entry)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to obtain reader, %v\n", err)
- } else {
- _, err = entry.Logger.Out.Write(serialized)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Failed to write to log, %v\n", err)
- }
+ return
+ }
+ if _, err = entry.Logger.Out.Write(serialized); err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to write to log, %v\n", err)
}
}
diff --git a/vendor/github.com/sirupsen/logrus/exported.go b/vendor/github.com/sirupsen/logrus/exported.go
index 62fc2f2..42b04f6 100644
--- a/vendor/github.com/sirupsen/logrus/exported.go
+++ b/vendor/github.com/sirupsen/logrus/exported.go
@@ -80,7 +80,7 @@
return std.WithFields(fields)
}
-// WithTime creats an entry from the standard logger and overrides the time of
+// WithTime creates an entry from the standard logger and overrides the time of
// logs generated with it.
//
// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
diff --git a/vendor/github.com/sirupsen/logrus/go.mod b/vendor/github.com/sirupsen/logrus/go.mod
index 12fdf98..9ea6e84 100644
--- a/vendor/github.com/sirupsen/logrus/go.mod
+++ b/vendor/github.com/sirupsen/logrus/go.mod
@@ -4,7 +4,8 @@
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1
github.com/pmezard/go-difflib v1.0.0 // indirect
- github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.2.2
golang.org/x/sys v0.0.0-20190422165155-953cdadca894
)
+
+go 1.13
diff --git a/vendor/github.com/sirupsen/logrus/go.sum b/vendor/github.com/sirupsen/logrus/go.sum
index 596c318..95a3f07 100644
--- a/vendor/github.com/sirupsen/logrus/go.sum
+++ b/vendor/github.com/sirupsen/logrus/go.sum
@@ -1,16 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
-github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
-github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/vendor/github.com/sirupsen/logrus/json_formatter.go b/vendor/github.com/sirupsen/logrus/json_formatter.go
index 098a21a..ba7f237 100644
--- a/vendor/github.com/sirupsen/logrus/json_formatter.go
+++ b/vendor/github.com/sirupsen/logrus/json_formatter.go
@@ -28,6 +28,9 @@
// DisableTimestamp allows disabling automatic timestamps in output
DisableTimestamp bool
+ // DisableHTMLEscape allows disabling HTML escaping in output
+ DisableHTMLEscape bool
+
// DataKey allows users to put all the log entry parameters into a nested dictionary at a given key.
DataKey string
@@ -110,6 +113,7 @@
}
encoder := json.NewEncoder(b)
+ encoder.SetEscapeHTML(!f.DisableHTMLEscape)
if f.PrettyPrint {
encoder.SetIndent("", " ")
}
diff --git a/vendor/github.com/sirupsen/logrus/logger.go b/vendor/github.com/sirupsen/logrus/logger.go
index c0c0b1e..6fdda74 100644
--- a/vendor/github.com/sirupsen/logrus/logger.go
+++ b/vendor/github.com/sirupsen/logrus/logger.go
@@ -68,10 +68,10 @@
// `Out` and `Hooks` directly on the default logger instance. You can also just
// instantiate your own:
//
-// var log = &Logger{
+// var log = &logrus.Logger{
// Out: os.Stderr,
-// Formatter: new(JSONFormatter),
-// Hooks: make(LevelHooks),
+// Formatter: new(logrus.JSONFormatter),
+// Hooks: make(logrus.LevelHooks),
// Level: logrus.DebugLevel,
// }
//
@@ -100,8 +100,9 @@
logger.entryPool.Put(entry)
}
-// Adds a field to the log entry, note that it doesn't log until you call
-// Debug, Print, Info, Warn, Error, Fatal or Panic. It only creates a log entry.
+// WithField allocates a new entry and adds a field to it.
+// Debug, Print, Info, Warn, Error, Fatal or Panic must then be applied to
+// this new returned entry.
// If you want multiple fields, use `WithFields`.
func (logger *Logger) WithField(key string, value interface{}) *Entry {
entry := logger.newEntry()
diff --git a/vendor/github.com/sirupsen/logrus/logrus.go b/vendor/github.com/sirupsen/logrus/logrus.go
index 8644761..2f16224 100644
--- a/vendor/github.com/sirupsen/logrus/logrus.go
+++ b/vendor/github.com/sirupsen/logrus/logrus.go
@@ -51,7 +51,7 @@
return err
}
- *level = Level(l)
+ *level = l
return nil
}
diff --git a/vendor/github.com/sirupsen/logrus/terminal_check_bsd.go b/vendor/github.com/sirupsen/logrus/terminal_check_bsd.go
index 3c4f43f..4997899 100644
--- a/vendor/github.com/sirupsen/logrus/terminal_check_bsd.go
+++ b/vendor/github.com/sirupsen/logrus/terminal_check_bsd.go
@@ -1,4 +1,5 @@
// +build darwin dragonfly freebsd netbsd openbsd
+// +build !js
package logrus
@@ -10,4 +11,3 @@
_, err := unix.IoctlGetTermios(fd, ioctlReadTermios)
return err == nil
}
-
diff --git a/vendor/github.com/sirupsen/logrus/terminal_check_js.go b/vendor/github.com/sirupsen/logrus/terminal_check_js.go
new file mode 100644
index 0000000..ebdae3e
--- /dev/null
+++ b/vendor/github.com/sirupsen/logrus/terminal_check_js.go
@@ -0,0 +1,7 @@
+// +build js
+
+package logrus
+
+func isTerminal(fd int) bool {
+ return false
+}
diff --git a/vendor/github.com/sirupsen/logrus/terminal_check_unix.go b/vendor/github.com/sirupsen/logrus/terminal_check_unix.go
index 355dc96..cc4fe6e 100644
--- a/vendor/github.com/sirupsen/logrus/terminal_check_unix.go
+++ b/vendor/github.com/sirupsen/logrus/terminal_check_unix.go
@@ -1,4 +1,5 @@
// +build linux aix
+// +build !js
package logrus
@@ -10,4 +11,3 @@
_, err := unix.IoctlGetTermios(fd, ioctlReadTermios)
return err == nil
}
-
diff --git a/vendor/github.com/sirupsen/logrus/text_formatter.go b/vendor/github.com/sirupsen/logrus/text_formatter.go
index e01587c..2d15a23 100644
--- a/vendor/github.com/sirupsen/logrus/text_formatter.go
+++ b/vendor/github.com/sirupsen/logrus/text_formatter.go
@@ -6,9 +6,11 @@
"os"
"runtime"
"sort"
+ "strconv"
"strings"
"sync"
"time"
+ "unicode/utf8"
)
const (
@@ -32,6 +34,9 @@
// Force disabling colors.
DisableColors bool
+ // Force quoting of all values
+ ForceQuote bool
+
// Override coloring based on CLICOLOR and CLICOLOR_FORCE. - https://bixense.com/clicolors/
EnvironmentOverrideColors bool
@@ -57,6 +62,10 @@
// Disables the truncation of the level text to 4 characters.
DisableLevelTruncation bool
+ // PadLevelText adds padding to the level text so that all the levels are output at the same length.
+ // PadLevelText is a superset of the DisableLevelTruncation option
+ PadLevelText bool
+
// QuoteEmptyFields will wrap empty fields in quotes if true
QuoteEmptyFields bool
@@ -79,23 +88,32 @@
CallerPrettyfier func(*runtime.Frame) (function string, file string)
terminalInitOnce sync.Once
+
+ // The max length of the level text, generated dynamically on init
+ levelTextMaxLength int
}
func (f *TextFormatter) init(entry *Entry) {
if entry.Logger != nil {
f.isTerminal = checkIfTerminal(entry.Logger.Out)
}
+ // Get the max length of the level text
+ for _, level := range AllLevels {
+ levelTextLength := utf8.RuneCount([]byte(level.String()))
+ if levelTextLength > f.levelTextMaxLength {
+ f.levelTextMaxLength = levelTextLength
+ }
+ }
}
func (f *TextFormatter) isColored() bool {
isColored := f.ForceColors || (f.isTerminal && (runtime.GOOS != "windows"))
if f.EnvironmentOverrideColors {
- if force, ok := os.LookupEnv("CLICOLOR_FORCE"); ok && force != "0" {
+ switch force, ok := os.LookupEnv("CLICOLOR_FORCE"); {
+ case ok && force != "0":
isColored = true
- } else if ok && force == "0" {
- isColored = false
- } else if os.Getenv("CLICOLOR") == "0" {
+ case ok && force == "0", os.Getenv("CLICOLOR") == "0":
isColored = false
}
}
@@ -217,9 +235,18 @@
}
levelText := strings.ToUpper(entry.Level.String())
- if !f.DisableLevelTruncation {
+ if !f.DisableLevelTruncation && !f.PadLevelText {
levelText = levelText[0:4]
}
+ if f.PadLevelText {
+ // Generates the format string used in the next line, for example "%-6s" or "%-7s".
+ // Based on the max level text length.
+ formatString := "%-" + strconv.Itoa(f.levelTextMaxLength) + "s"
+ // Formats the level text by appending spaces up to the max length, for example:
+ // - "INFO "
+ // - "WARNING"
+ levelText = fmt.Sprintf(formatString, levelText)
+ }
// Remove a single newline if it already exists in the message to keep
// the behavior of logrus text_formatter the same as the stdlib log package
@@ -243,11 +270,12 @@
}
}
- if f.DisableTimestamp {
+ switch {
+ case f.DisableTimestamp:
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m%s %-44s ", levelColor, levelText, caller, entry.Message)
- } else if !f.FullTimestamp {
+ case !f.FullTimestamp:
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m[%04d]%s %-44s ", levelColor, levelText, int(entry.Time.Sub(baseTimestamp)/time.Second), caller, entry.Message)
- } else {
+ default:
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m[%s]%s %-44s ", levelColor, levelText, entry.Time.Format(timestampFormat), caller, entry.Message)
}
for _, k := range keys {
@@ -258,6 +286,9 @@
}
func (f *TextFormatter) needsQuoting(text string) bool {
+ if f.ForceQuote {
+ return true
+ }
if f.QuoteEmptyFields && len(text) == 0 {
return true
}
diff --git a/vendor/github.com/sirupsen/logrus/writer.go b/vendor/github.com/sirupsen/logrus/writer.go
index 9e1f751..72e8e3a 100644
--- a/vendor/github.com/sirupsen/logrus/writer.go
+++ b/vendor/github.com/sirupsen/logrus/writer.go
@@ -6,10 +6,16 @@
"runtime"
)
+// Writer returns a pipe writer that logs at the INFO level. See WriterLevel for details.
func (logger *Logger) Writer() *io.PipeWriter {
return logger.WriterLevel(InfoLevel)
}
+// WriterLevel returns an io.Writer that can be used to write arbitrary text to
+// the logger at the given log level. Each line written to the writer will be
+// printed in the usual way using formatters and hooks. The writer is part of an
+// io.Pipe and it is the caller's responsibility to close the writer when done.
+// This can be used to override the standard library logger easily.
func (logger *Logger) WriterLevel(level Level) *io.PipeWriter {
return NewEntry(logger).WriterLevel(level)
}