// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package tracker |
| 16 | |
// inflight is the bookkeeping record kept for a single in-flight MsgApp
// message.
type inflight struct {
	// index is the index of the last entry inside the message.
	index uint64
	// bytes is the total byte size of the entries in the message.
	bytes uint64
}
| 22 | |
| 23 | // Inflights limits the number of MsgApp (represented by the largest index |
| 24 | // contained within) sent to followers but not yet acknowledged by them. Callers |
| 25 | // use Full() to check whether more messages can be sent, call Add() whenever |
| 26 | // they are sending a new append, and release "quota" via FreeLE() whenever an |
| 27 | // ack is received. |
| 28 | type Inflights struct { |
| 29 | // the starting index in the buffer |
| 30 | start int |
| 31 | |
| 32 | count int // number of inflight messages in the buffer |
| 33 | bytes uint64 // number of inflight bytes |
| 34 | |
| 35 | size int // the max number of inflight messages |
| 36 | maxBytes uint64 // the max total byte size of inflight messages |
| 37 | |
| 38 | // buffer is a ring buffer containing info about all in-flight messages. |
| 39 | buffer []inflight |
| 40 | } |
| 41 | |
| 42 | // NewInflights sets up an Inflights that allows up to size inflight messages, |
| 43 | // with the total byte size up to maxBytes. If maxBytes is 0 then there is no |
| 44 | // byte size limit. The maxBytes limit is soft, i.e. we accept a single message |
| 45 | // that brings it from size < maxBytes to size >= maxBytes. |
| 46 | func NewInflights(size int, maxBytes uint64) *Inflights { |
| 47 | return &Inflights{ |
| 48 | size: size, |
| 49 | maxBytes: maxBytes, |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | // Clone returns an *Inflights that is identical to but shares no memory with |
| 54 | // the receiver. |
| 55 | func (in *Inflights) Clone() *Inflights { |
| 56 | ins := *in |
| 57 | ins.buffer = append([]inflight(nil), in.buffer...) |
| 58 | return &ins |
| 59 | } |
| 60 | |
| 61 | // Add notifies the Inflights that a new message with the given index and byte |
| 62 | // size is being dispatched. Full() must be called prior to Add() to verify that |
| 63 | // there is room for one more message, and consecutive calls to Add() must |
| 64 | // provide a monotonic sequence of indexes. |
| 65 | func (in *Inflights) Add(index, bytes uint64) { |
| 66 | if in.Full() { |
| 67 | panic("cannot add into a Full inflights") |
| 68 | } |
| 69 | next := in.start + in.count |
| 70 | size := in.size |
| 71 | if next >= size { |
| 72 | next -= size |
| 73 | } |
| 74 | if next >= len(in.buffer) { |
| 75 | in.grow() |
| 76 | } |
| 77 | in.buffer[next] = inflight{index: index, bytes: bytes} |
| 78 | in.count++ |
| 79 | in.bytes += bytes |
| 80 | } |
| 81 | |
| 82 | // grow the inflight buffer by doubling up to inflights.size. We grow on demand |
| 83 | // instead of preallocating to inflights.size to handle systems which have |
| 84 | // thousands of Raft groups per process. |
| 85 | func (in *Inflights) grow() { |
| 86 | newSize := len(in.buffer) * 2 |
| 87 | if newSize == 0 { |
| 88 | newSize = 1 |
| 89 | } else if newSize > in.size { |
| 90 | newSize = in.size |
| 91 | } |
| 92 | newBuffer := make([]inflight, newSize) |
| 93 | copy(newBuffer, in.buffer) |
| 94 | in.buffer = newBuffer |
| 95 | } |
| 96 | |
| 97 | // FreeLE frees the inflights smaller or equal to the given `to` flight. |
| 98 | func (in *Inflights) FreeLE(to uint64) { |
| 99 | if in.count == 0 || to < in.buffer[in.start].index { |
| 100 | // out of the left side of the window |
| 101 | return |
| 102 | } |
| 103 | |
| 104 | idx := in.start |
| 105 | var i int |
| 106 | var bytes uint64 |
| 107 | for i = 0; i < in.count; i++ { |
| 108 | if to < in.buffer[idx].index { // found the first large inflight |
| 109 | break |
| 110 | } |
| 111 | bytes += in.buffer[idx].bytes |
| 112 | |
| 113 | // increase index and maybe rotate |
| 114 | size := in.size |
| 115 | if idx++; idx >= size { |
| 116 | idx -= size |
| 117 | } |
| 118 | } |
| 119 | // free i inflights and set new start index |
| 120 | in.count -= i |
| 121 | in.bytes -= bytes |
| 122 | in.start = idx |
| 123 | if in.count == 0 { |
| 124 | // inflights is empty, reset the start index so that we don't grow the |
| 125 | // buffer unnecessarily. |
| 126 | in.start = 0 |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | // Full returns true if no more messages can be sent at the moment. |
| 131 | func (in *Inflights) Full() bool { |
| 132 | return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) |
| 133 | } |
| 134 | |
| 135 | // Count returns the number of inflight messages. |
| 136 | func (in *Inflights) Count() int { return in.count } |
| 137 | |
| 138 | // reset frees all inflights. |
| 139 | func (in *Inflights) reset() { |
| 140 | in.start = 0 |
| 141 | in.count = 0 |
| 142 | in.bytes = 0 |
| 143 | } |