blob: cb091e542a3ec4d3294056ada66065000fd0d837 [file] [log] [blame]
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
// inflight describes an in-flight MsgApp message.
type inflight struct {
index uint64 // the index of the last entry inside the message
bytes uint64 // the total byte size of the entries in the message
}
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int
count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes
size int // the max number of inflight messages
maxBytes uint64 // the max total byte size of inflight messages
// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
}
// NewInflights sets up an Inflights that allows up to size inflight messages,
// with the total byte size up to maxBytes. If maxBytes is 0 then there is no
// byte size limit. The maxBytes limit is soft, i.e. we accept a single message
// that brings it from size < maxBytes to size >= maxBytes.
func NewInflights(size int, maxBytes uint64) *Inflights {
return &Inflights{
size: size,
maxBytes: maxBytes,
}
}
// Clone returns an *Inflights that is identical to but shares no memory with
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]inflight(nil), in.buffer...)
return &ins
}
// Add notifies the Inflights that a new message with the given index and byte
// size is being dispatched. Full() must be called prior to Add() to verify that
// there is room for one more message, and consecutive calls to Add() must
// provide a monotonic sequence of indexes.
func (in *Inflights) Add(index, bytes uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
}
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.bytes += bytes
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start].index {
// out of the left side of the window
return
}
idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx].index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes
// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
in.count -= i
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}
// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}
// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
// reset frees all inflights.
func (in *Inflights) reset() {
in.start = 0
in.count = 0
in.bytes = 0
}