[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index a2e7c34..9ea598b 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -21,45 +21,36 @@
import (
"bufio"
"encoding/binary"
- "fmt"
"io"
- "io/ioutil"
"sync"
"time"
- "github.com/golang/protobuf/proto"
- pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
- "google.golang.org/grpc/grpclog"
+ binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
+ "google.golang.org/protobuf/proto"
)
var (
- defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
+ // DefaultSink is the sink where the logs will be written to. It's exported
+ // for the binarylog package to update.
+ DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
)
-// SetDefaultSink sets the sink where binary logs will be written to.
-//
-// Not thread safe. Only set during initialization.
-func SetDefaultSink(s Sink) {
- if defaultSink != nil {
- defaultSink.Close()
- }
- defaultSink = s
-}
-
// Sink writes log entry into the binary log sink.
+//
+// Sink is a copy of the exported binarylog.Sink, to avoid a circular dependency.
type Sink interface {
// Write will be called to write the log entry into the sink.
//
// It should be thread-safe so it can be called in parallel.
- Write(*pb.GrpcLogEntry) error
+ Write(*binlogpb.GrpcLogEntry) error
// Close will be called when the Sink is replaced by a new Sink.
Close() error
}
type noopSink struct{}
-func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
-func (ns *noopSink) Close() error { return nil }
+func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
+func (ns *noopSink) Close() error { return nil }
// newWriterSink creates a binary log sink with the given writer.
//
@@ -67,7 +58,7 @@
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
-func newWriterSink(w io.Writer) *writerSink {
+func newWriterSink(w io.Writer) Sink {
return &writerSink{out: w}
}
@@ -75,10 +66,11 @@
out io.Writer
}
-func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
+func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
- grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
+ grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
+ return err
}
hdr := make([]byte, 4)
binary.BigEndian.PutUint32(hdr, uint32(len(b)))
@@ -93,25 +85,28 @@
func (ws *writerSink) Close() error { return nil }
-type bufWriteCloserSink struct {
- mu sync.Mutex
- closer io.Closer
- out *writerSink // out is built on buf.
- buf *bufio.Writer // buf is kept for flush.
+type bufferedSink struct {
+ mu sync.Mutex
+ closer io.Closer
+ out Sink // out is built on buf.
+ buf *bufio.Writer // buf is kept for flush.
+ flusherStarted bool
- writeStartOnce sync.Once
- writeTicker *time.Ticker
+ writeTicker *time.Ticker
+ done chan struct{}
}
-func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
- // Start the write loop when Write is called.
- fs.writeStartOnce.Do(fs.startFlushGoroutine)
+func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if !fs.flusherStarted {
+ // Start the write loop when Write is called.
+ fs.startFlushGoroutine()
+ fs.flusherStarted = true
+ }
if err := fs.out.Write(e); err != nil {
- fs.mu.Unlock()
return err
}
- fs.mu.Unlock()
return nil
}
@@ -119,44 +114,57 @@
bufFlushDuration = 60 * time.Second
)
-func (fs *bufWriteCloserSink) startFlushGoroutine() {
+func (fs *bufferedSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
- for range fs.writeTicker.C {
+ for {
+ select {
+ case <-fs.done:
+ return
+ case <-fs.writeTicker.C:
+ }
fs.mu.Lock()
- fs.buf.Flush()
+ if err := fs.buf.Flush(); err != nil {
+ grpclogLogger.Warningf("failed to flush to Sink: %v", err)
+ }
fs.mu.Unlock()
}
}()
}
-func (fs *bufWriteCloserSink) Close() error {
+func (fs *bufferedSink) Close() error {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
- fs.mu.Lock()
- fs.buf.Flush()
- fs.closer.Close()
- fs.out.Close()
- fs.mu.Unlock()
+ close(fs.done)
+ if err := fs.buf.Flush(); err != nil {
+ grpclogLogger.Warningf("failed to flush to Sink: %v", err)
+ }
+ if err := fs.closer.Close(); err != nil {
+ grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
+ }
+ if err := fs.out.Close(); err != nil {
+ grpclogLogger.Warningf("failed to close the Sink: %v", err)
+ }
return nil
}
-func newBufWriteCloserSink(o io.WriteCloser) Sink {
+// NewBufferedSink creates a binary log sink with the given WriteCloser.
+//
+// Write() marshals the proto message and writes it to the given writer. Each
+// message is prefixed with a 4 byte big endian unsigned integer as the length.
+//
+// Content is kept in a buffer, and is flushed every 60 seconds.
+//
+// Close closes the WriteCloser.
+func NewBufferedSink(o io.WriteCloser) Sink {
bufW := bufio.NewWriter(o)
- return &bufWriteCloserSink{
+ return &bufferedSink{
closer: o,
out: newWriterSink(bufW),
buf: bufW,
+ done: make(chan struct{}),
}
}
-
-// NewTempFileSink creates a temp file and returns a Sink that writes to this
-// file.
-func NewTempFileSink() (Sink, error) {
- tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
- if err != nil {
- return nil, fmt.Errorf("failed to create temp file: %v", err)
- }
- return newBufWriteCloserSink(tempFile), nil
-}