blob: 9ea598b14c077d3e9443a2e3ac59231b0f577510 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "bufio"
23 "encoding/binary"
William Kurkianea869482019-04-09 15:16:11 -040024 "io"
William Kurkianea869482019-04-09 15:16:11 -040025 "sync"
26 "time"
27
Abhay Kumara61c5222025-11-10 07:32:50 +000028 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
29 "google.golang.org/protobuf/proto"
William Kurkianea869482019-04-09 15:16:11 -040030)
31
32var (
Abhay Kumara61c5222025-11-10 07:32:50 +000033 // DefaultSink is the sink where the logs will be written to. It's exported
34 // for the binarylog package to update.
35 DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
William Kurkianea869482019-04-09 15:16:11 -040036)
37
William Kurkianea869482019-04-09 15:16:11 -040038// Sink writes log entry into the binary log sink.
Abhay Kumara61c5222025-11-10 07:32:50 +000039//
40// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
William Kurkianea869482019-04-09 15:16:11 -040041type Sink interface {
42 // Write will be called to write the log entry into the sink.
43 //
44 // It should be thread-safe so it can be called in parallel.
Abhay Kumara61c5222025-11-10 07:32:50 +000045 Write(*binlogpb.GrpcLogEntry) error
William Kurkianea869482019-04-09 15:16:11 -040046 // Close will be called when the Sink is replaced by a new Sink.
47 Close() error
48}
49
50type noopSink struct{}
51
Abhay Kumara61c5222025-11-10 07:32:50 +000052func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
53func (ns *noopSink) Close() error { return nil }
William Kurkianea869482019-04-09 15:16:11 -040054
55// newWriterSink creates a binary log sink with the given writer.
56//
Devmalya Pauldd23a992019-11-14 07:06:31 +000057// Write() marshals the proto message and writes it to the given writer. Each
William Kurkianea869482019-04-09 15:16:11 -040058// message is prefixed with a 4 byte big endian unsigned integer as the length.
59//
60// No buffer is done, Close() doesn't try to close the writer.
Abhay Kumara61c5222025-11-10 07:32:50 +000061func newWriterSink(w io.Writer) Sink {
William Kurkianea869482019-04-09 15:16:11 -040062 return &writerSink{out: w}
63}
64
// writerSink is a Sink that writes length-prefixed log entries to an
// io.Writer.
type writerSink struct {
	// out receives the encoded entries.
	out io.Writer
}
68
Abhay Kumara61c5222025-11-10 07:32:50 +000069func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
William Kurkianea869482019-04-09 15:16:11 -040070 b, err := proto.Marshal(e)
71 if err != nil {
Abhay Kumara61c5222025-11-10 07:32:50 +000072 grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
73 return err
William Kurkianea869482019-04-09 15:16:11 -040074 }
75 hdr := make([]byte, 4)
76 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
77 if _, err := ws.out.Write(hdr); err != nil {
78 return err
79 }
80 if _, err := ws.out.Write(b); err != nil {
81 return err
82 }
83 return nil
84}
85
86func (ws *writerSink) Close() error { return nil }
87
Abhay Kumara61c5222025-11-10 07:32:50 +000088type bufferedSink struct {
89 mu sync.Mutex
90 closer io.Closer
91 out Sink // out is built on buf.
92 buf *bufio.Writer // buf is kept for flush.
93 flusherStarted bool
William Kurkianea869482019-04-09 15:16:11 -040094
Abhay Kumara61c5222025-11-10 07:32:50 +000095 writeTicker *time.Ticker
96 done chan struct{}
William Kurkianea869482019-04-09 15:16:11 -040097}
98
Abhay Kumara61c5222025-11-10 07:32:50 +000099func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
William Kurkianea869482019-04-09 15:16:11 -0400100 fs.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000101 defer fs.mu.Unlock()
102 if !fs.flusherStarted {
103 // Start the write loop when Write is called.
104 fs.startFlushGoroutine()
105 fs.flusherStarted = true
106 }
William Kurkianea869482019-04-09 15:16:11 -0400107 if err := fs.out.Write(e); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400108 return err
109 }
William Kurkianea869482019-04-09 15:16:11 -0400110 return nil
111}
112
// bufFlushDuration is the interval between periodic flushes of a
// bufferedSink.
const bufFlushDuration = 60 * time.Second
116
Abhay Kumara61c5222025-11-10 07:32:50 +0000117func (fs *bufferedSink) startFlushGoroutine() {
William Kurkianea869482019-04-09 15:16:11 -0400118 fs.writeTicker = time.NewTicker(bufFlushDuration)
119 go func() {
Abhay Kumara61c5222025-11-10 07:32:50 +0000120 for {
121 select {
122 case <-fs.done:
123 return
124 case <-fs.writeTicker.C:
125 }
William Kurkianea869482019-04-09 15:16:11 -0400126 fs.mu.Lock()
Abhay Kumara61c5222025-11-10 07:32:50 +0000127 if err := fs.buf.Flush(); err != nil {
128 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
129 }
William Kurkianea869482019-04-09 15:16:11 -0400130 fs.mu.Unlock()
131 }
132 }()
133}
134
Abhay Kumara61c5222025-11-10 07:32:50 +0000135func (fs *bufferedSink) Close() error {
136 fs.mu.Lock()
137 defer fs.mu.Unlock()
William Kurkianea869482019-04-09 15:16:11 -0400138 if fs.writeTicker != nil {
139 fs.writeTicker.Stop()
140 }
Abhay Kumara61c5222025-11-10 07:32:50 +0000141 close(fs.done)
142 if err := fs.buf.Flush(); err != nil {
143 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
144 }
145 if err := fs.closer.Close(); err != nil {
146 grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
147 }
148 if err := fs.out.Close(); err != nil {
149 grpclogLogger.Warningf("failed to close the Sink: %v", err)
150 }
William Kurkianea869482019-04-09 15:16:11 -0400151 return nil
152}
153
Abhay Kumara61c5222025-11-10 07:32:50 +0000154// NewBufferedSink creates a binary log sink with the given WriteCloser.
155//
156// Write() marshals the proto message and writes it to the given writer. Each
157// message is prefixed with a 4 byte big endian unsigned integer as the length.
158//
159// Content is kept in a buffer, and is flushed every 60 seconds.
160//
161// Close closes the WriteCloser.
162func NewBufferedSink(o io.WriteCloser) Sink {
William Kurkianea869482019-04-09 15:16:11 -0400163 bufW := bufio.NewWriter(o)
Abhay Kumara61c5222025-11-10 07:32:50 +0000164 return &bufferedSink{
William Kurkianea869482019-04-09 15:16:11 -0400165 closer: o,
166 out: newWriterSink(bufW),
167 buf: bufW,
Abhay Kumara61c5222025-11-10 07:32:50 +0000168 done: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400169 }
170}