/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| 18 | |
// Package idle contains a component for managing idleness (entering and exiting)
// based on RPC activity.
| 21 | package idle |
| 22 | |
| 23 | import ( |
| 24 | "fmt" |
| 25 | "math" |
| 26 | "sync" |
| 27 | "sync/atomic" |
| 28 | "time" |
| 29 | ) |
| 30 | |
// timeAfterFunc schedules f to run once after d has elapsed and returns the
// timer that controls it. It is a package-level variable, rather than a direct
// call to time.AfterFunc, so that unit tests can substitute their own
// implementation.
var timeAfterFunc = time.AfterFunc
| 35 | |
// Enforcer is the set of operations the Manager invokes to move a channel into
// and out of idle mode. The functionality is provided by grpc.ClientConn.
type Enforcer interface {
	// EnterIdleMode moves the channel into idle mode.
	EnterIdleMode()

	// ExitIdleMode moves the channel out of idle mode. A non-nil error
	// indicates that the channel could not leave idle mode.
	ExitIdleMode() error
}
| 42 | |
| 43 | // Manager implements idleness detection and calls the configured Enforcer to |
| 44 | // enter/exit idle mode when appropriate. Must be created by NewManager. |
| 45 | type Manager struct { |
| 46 | // State accessed atomically. |
| 47 | lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. |
| 48 | activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. |
| 49 | activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback. |
| 50 | closed int32 // Boolean; True when the manager is closed. |
| 51 | |
| 52 | // Can be accessed without atomics or mutex since these are set at creation |
| 53 | // time and read-only after that. |
| 54 | enforcer Enforcer // Functionality provided by grpc.ClientConn. |
| 55 | timeout time.Duration |
| 56 | |
| 57 | // idleMu is used to guarantee mutual exclusion in two scenarios: |
| 58 | // - Opposing intentions: |
| 59 | // - a: Idle timeout has fired and handleIdleTimeout() is trying to put |
| 60 | // the channel in idle mode because the channel has been inactive. |
| 61 | // - b: At the same time an RPC is made on the channel, and OnCallBegin() |
| 62 | // is trying to prevent the channel from going idle. |
| 63 | // - Competing intentions: |
| 64 | // - The channel is in idle mode and there are multiple RPCs starting at |
| 65 | // the same time, all trying to move the channel out of idle. Only one |
| 66 | // of them should succeed in doing so, while the other RPCs should |
| 67 | // piggyback on the first one and be successfully handled. |
| 68 | idleMu sync.RWMutex |
| 69 | actuallyIdle bool |
| 70 | timer *time.Timer |
| 71 | } |
| 72 | |
| 73 | // NewManager creates a new idleness manager implementation for the |
| 74 | // given idle timeout. It begins in idle mode. |
| 75 | func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { |
| 76 | return &Manager{ |
| 77 | enforcer: enforcer, |
| 78 | timeout: timeout, |
| 79 | actuallyIdle: true, |
| 80 | activeCallsCount: -math.MaxInt32, |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | // resetIdleTimerLocked resets the idle timer to the given duration. Called |
| 85 | // when exiting idle mode or when the timer fires and we need to reset it. |
| 86 | func (m *Manager) resetIdleTimerLocked(d time.Duration) { |
| 87 | if m.isClosed() || m.timeout == 0 || m.actuallyIdle { |
| 88 | return |
| 89 | } |
| 90 | |
| 91 | // It is safe to ignore the return value from Reset() because this method is |
| 92 | // only ever called from the timer callback or when exiting idle mode. |
| 93 | if m.timer != nil { |
| 94 | m.timer.Stop() |
| 95 | } |
| 96 | m.timer = timeAfterFunc(d, m.handleIdleTimeout) |
| 97 | } |
| 98 | |
| 99 | func (m *Manager) resetIdleTimer(d time.Duration) { |
| 100 | m.idleMu.Lock() |
| 101 | defer m.idleMu.Unlock() |
| 102 | m.resetIdleTimerLocked(d) |
| 103 | } |
| 104 | |
| 105 | // handleIdleTimeout is the timer callback that is invoked upon expiry of the |
| 106 | // configured idle timeout. The channel is considered inactive if there are no |
| 107 | // ongoing calls and no RPC activity since the last time the timer fired. |
| 108 | func (m *Manager) handleIdleTimeout() { |
| 109 | if m.isClosed() { |
| 110 | return |
| 111 | } |
| 112 | |
| 113 | if atomic.LoadInt32(&m.activeCallsCount) > 0 { |
| 114 | m.resetIdleTimer(m.timeout) |
| 115 | return |
| 116 | } |
| 117 | |
| 118 | // There has been activity on the channel since we last got here. Reset the |
| 119 | // timer and return. |
| 120 | if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { |
| 121 | // Set the timer to fire after a duration of idle timeout, calculated |
| 122 | // from the time the most recent RPC completed. |
| 123 | atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) |
| 124 | m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout) |
| 125 | return |
| 126 | } |
| 127 | |
| 128 | // Now that we've checked that there has been no activity, attempt to enter |
| 129 | // idle mode, which is very likely to succeed. |
| 130 | if m.tryEnterIdleMode() { |
| 131 | // Successfully entered idle mode. No timer needed until we exit idle. |
| 132 | return |
| 133 | } |
| 134 | |
| 135 | // Failed to enter idle mode due to a concurrent RPC that kept the channel |
| 136 | // active, or because of an error from the channel. Undo the attempt to |
| 137 | // enter idle, and reset the timer to try again later. |
| 138 | m.resetIdleTimer(m.timeout) |
| 139 | } |
| 140 | |
| 141 | // tryEnterIdleMode instructs the channel to enter idle mode. But before |
| 142 | // that, it performs a last minute check to ensure that no new RPC has come in, |
| 143 | // making the channel active. |
| 144 | // |
| 145 | // Return value indicates whether or not the channel moved to idle mode. |
| 146 | // |
| 147 | // Holds idleMu which ensures mutual exclusion with exitIdleMode. |
| 148 | func (m *Manager) tryEnterIdleMode() bool { |
| 149 | // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() |
| 150 | // that the channel is either in idle mode or is trying to get there. |
| 151 | if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { |
| 152 | // This CAS operation can fail if an RPC started after we checked for |
| 153 | // activity in the timer handler, or one was ongoing from before the |
| 154 | // last time the timer fired, or if a test is attempting to enter idle |
| 155 | // mode without checking. In all cases, abort going into idle mode. |
| 156 | return false |
| 157 | } |
| 158 | // N.B. if we fail to enter idle mode after this, we must re-add |
| 159 | // math.MaxInt32 to m.activeCallsCount. |
| 160 | |
| 161 | m.idleMu.Lock() |
| 162 | defer m.idleMu.Unlock() |
| 163 | |
| 164 | if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 { |
| 165 | // We raced and lost to a new RPC. Very rare, but stop entering idle. |
| 166 | atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) |
| 167 | return false |
| 168 | } |
| 169 | if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { |
| 170 | // A very short RPC could have come in (and also finished) after we |
| 171 | // checked for calls count and activity in handleIdleTimeout(), but |
| 172 | // before the CAS operation. So, we need to check for activity again. |
| 173 | atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) |
| 174 | return false |
| 175 | } |
| 176 | |
| 177 | // No new RPCs have come in since we set the active calls count value to |
| 178 | // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode |
| 179 | // unconditionally now. |
| 180 | m.enforcer.EnterIdleMode() |
| 181 | m.actuallyIdle = true |
| 182 | return true |
| 183 | } |
| 184 | |
| 185 | // EnterIdleModeForTesting instructs the channel to enter idle mode. |
| 186 | func (m *Manager) EnterIdleModeForTesting() { |
| 187 | m.tryEnterIdleMode() |
| 188 | } |
| 189 | |
| 190 | // OnCallBegin is invoked at the start of every RPC. |
| 191 | func (m *Manager) OnCallBegin() error { |
| 192 | if m.isClosed() { |
| 193 | return nil |
| 194 | } |
| 195 | |
| 196 | if atomic.AddInt32(&m.activeCallsCount, 1) > 0 { |
| 197 | // Channel is not idle now. Set the activity bit and allow the call. |
| 198 | atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1) |
| 199 | return nil |
| 200 | } |
| 201 | |
| 202 | // Channel is either in idle mode or is in the process of moving to idle |
| 203 | // mode. Attempt to exit idle mode to allow this RPC. |
| 204 | if err := m.ExitIdleMode(); err != nil { |
| 205 | // Undo the increment to calls count, and return an error causing the |
| 206 | // RPC to fail. |
| 207 | atomic.AddInt32(&m.activeCallsCount, -1) |
| 208 | return err |
| 209 | } |
| 210 | |
| 211 | atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1) |
| 212 | return nil |
| 213 | } |
| 214 | |
| 215 | // ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's |
| 216 | // internal state. |
| 217 | func (m *Manager) ExitIdleMode() error { |
| 218 | // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. |
| 219 | m.idleMu.Lock() |
| 220 | defer m.idleMu.Unlock() |
| 221 | |
| 222 | if m.isClosed() || !m.actuallyIdle { |
| 223 | // This can happen in three scenarios: |
| 224 | // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called |
| 225 | // tryEnterIdleMode(). But before the latter could grab the lock, an RPC |
| 226 | // came in and OnCallBegin() noticed that the calls count is negative. |
| 227 | // - Channel is in idle mode, and multiple new RPCs come in at the same |
| 228 | // time, all of them notice a negative calls count in OnCallBegin and get |
| 229 | // here. The first one to get the lock would get the channel to exit idle. |
| 230 | // - Channel is not in idle mode, and the user calls Connect which calls |
| 231 | // m.ExitIdleMode. |
| 232 | // |
| 233 | // In any case, there is nothing to do here. |
| 234 | return nil |
| 235 | } |
| 236 | |
| 237 | if err := m.enforcer.ExitIdleMode(); err != nil { |
| 238 | return fmt.Errorf("failed to exit idle mode: %w", err) |
| 239 | } |
| 240 | |
| 241 | // Undo the idle entry process. This also respects any new RPC attempts. |
| 242 | atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) |
| 243 | m.actuallyIdle = false |
| 244 | |
| 245 | // Start a new timer to fire after the configured idle timeout. |
| 246 | m.resetIdleTimerLocked(m.timeout) |
| 247 | return nil |
| 248 | } |
| 249 | |
| 250 | // OnCallEnd is invoked at the end of every RPC. |
| 251 | func (m *Manager) OnCallEnd() { |
| 252 | if m.isClosed() { |
| 253 | return |
| 254 | } |
| 255 | |
| 256 | // Record the time at which the most recent call finished. |
| 257 | atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano()) |
| 258 | |
| 259 | // Decrement the active calls count. This count can temporarily go negative |
| 260 | // when the timer callback is in the process of moving the channel to idle |
| 261 | // mode, but one or more RPCs come in and complete before the timer callback |
| 262 | // can get done with the process of moving to idle mode. |
| 263 | atomic.AddInt32(&m.activeCallsCount, -1) |
| 264 | } |
| 265 | |
| 266 | func (m *Manager) isClosed() bool { |
| 267 | return atomic.LoadInt32(&m.closed) == 1 |
| 268 | } |
| 269 | |
| 270 | // Close stops the timer associated with the Manager, if it exists. |
| 271 | func (m *Manager) Close() { |
| 272 | atomic.StoreInt32(&m.closed, 1) |
| 273 | |
| 274 | m.idleMu.Lock() |
| 275 | if m.timer != nil { |
| 276 | m.timer.Stop() |
| 277 | m.timer = nil |
| 278 | } |
| 279 | m.idleMu.Unlock() |
| 280 | } |