// Copyright 2019 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package tracker
16
// inflight describes an in-flight MsgApp message. One value is stored per
// message in the Inflights ring buffer.
type inflight struct {
	index uint64 // the index of the last entry inside the message
	bytes uint64 // the total byte size of the entries in the message
}
22
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
//
// Full() enforces two limits: a cap on the number of messages (size) and, when
// maxBytes is non-zero, a cap on their accumulated byte size.
type Inflights struct {
	// the starting index in the buffer, i.e. the position of the oldest
	// in-flight message. FreeLE() and reset() set it back to 0 whenever the
	// tracker becomes empty.
	start int

	count int    // number of inflight messages in the buffer
	bytes uint64 // number of inflight bytes

	size     int    // the max number of inflight messages
	maxBytes uint64 // the max total byte size of inflight messages; 0 disables the byte limit

	// buffer is a ring buffer containing info about all in-flight messages.
	// NewInflights leaves it empty; Add() enlarges it on demand through grow(),
	// never beyond size entries.
	buffer []inflight
}
41
42// NewInflights sets up an Inflights that allows up to size inflight messages,
43// with the total byte size up to maxBytes. If maxBytes is 0 then there is no
44// byte size limit. The maxBytes limit is soft, i.e. we accept a single message
45// that brings it from size < maxBytes to size >= maxBytes.
46func NewInflights(size int, maxBytes uint64) *Inflights {
47 return &Inflights{
48 size: size,
49 maxBytes: maxBytes,
50 }
51}
52
53// Clone returns an *Inflights that is identical to but shares no memory with
54// the receiver.
55func (in *Inflights) Clone() *Inflights {
56 ins := *in
57 ins.buffer = append([]inflight(nil), in.buffer...)
58 return &ins
59}
60
61// Add notifies the Inflights that a new message with the given index and byte
62// size is being dispatched. Full() must be called prior to Add() to verify that
63// there is room for one more message, and consecutive calls to Add() must
64// provide a monotonic sequence of indexes.
65func (in *Inflights) Add(index, bytes uint64) {
66 if in.Full() {
67 panic("cannot add into a Full inflights")
68 }
69 next := in.start + in.count
70 size := in.size
71 if next >= size {
72 next -= size
73 }
74 if next >= len(in.buffer) {
75 in.grow()
76 }
77 in.buffer[next] = inflight{index: index, bytes: bytes}
78 in.count++
79 in.bytes += bytes
80}
81
82// grow the inflight buffer by doubling up to inflights.size. We grow on demand
83// instead of preallocating to inflights.size to handle systems which have
84// thousands of Raft groups per process.
85func (in *Inflights) grow() {
86 newSize := len(in.buffer) * 2
87 if newSize == 0 {
88 newSize = 1
89 } else if newSize > in.size {
90 newSize = in.size
91 }
92 newBuffer := make([]inflight, newSize)
93 copy(newBuffer, in.buffer)
94 in.buffer = newBuffer
95}
96
97// FreeLE frees the inflights smaller or equal to the given `to` flight.
98func (in *Inflights) FreeLE(to uint64) {
99 if in.count == 0 || to < in.buffer[in.start].index {
100 // out of the left side of the window
101 return
102 }
103
104 idx := in.start
105 var i int
106 var bytes uint64
107 for i = 0; i < in.count; i++ {
108 if to < in.buffer[idx].index { // found the first large inflight
109 break
110 }
111 bytes += in.buffer[idx].bytes
112
113 // increase index and maybe rotate
114 size := in.size
115 if idx++; idx >= size {
116 idx -= size
117 }
118 }
119 // free i inflights and set new start index
120 in.count -= i
121 in.bytes -= bytes
122 in.start = idx
123 if in.count == 0 {
124 // inflights is empty, reset the start index so that we don't grow the
125 // buffer unnecessarily.
126 in.start = 0
127 }
128}
129
130// Full returns true if no more messages can be sent at the moment.
131func (in *Inflights) Full() bool {
132 return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
133}
134
135// Count returns the number of inflight messages.
136func (in *Inflights) Count() int { return in.count }
137
138// reset frees all inflights.
139func (in *Inflights) reset() {
140 in.start = 0
141 in.count = 0
142 in.bytes = 0
143}