blob: 8126f39f112222adafe07fbd93e61ab125712de5 [file] [log] [blame]
Kent Hagermana05f4d42020-04-01 15:11:22 -04001/*
Joey Armstrong7a9af442024-01-03 19:26:36 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Kent Hagermana05f4d42020-04-01 15:11:22 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package utils
18
19import (
20 "context"
21 "sync"
22)
23
// request is one entry in the RequestQueue's doubly linked list of callers
// that are waiting for (or have just received) the green light.
type request struct {
	// prev is the request queued immediately before this one, or nil.
	prev *request
	// next is the request queued immediately after this one, or nil.
	next *request
	// notifyOnComplete is closed when this request is released; the request
	// queued behind it blocks on the receiving end of the same channel.
	notifyOnComplete chan<- struct{}
}
28
29// RequestQueue represents a request processing queue where each request is processed to completion before another
30// request is given the green light to proceed.
31type RequestQueue struct {
Kent Hagermana05f4d42020-04-01 15:11:22 -040032 last, current *request
33 lastCompleteCh <-chan struct{}
Akash Reddy Kankanala929cc002025-04-08 15:05:21 +053034 mutex sync.Mutex
Kent Hagermana05f4d42020-04-01 15:11:22 -040035}
36
37// NewRequestQueue creates a new request queue
38func NewRequestQueue() *RequestQueue {
39 ch := make(chan struct{})
40 close(ch) // assume the "current" request is already complete
41 return &RequestQueue{lastCompleteCh: ch}
42}
43
44// WaitForGreenLight is invoked by a function processing a request to receive the green light before
45// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
46// too long (previous requests taking too long)
47func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
bseeniva4a2a53e2025-09-19 09:05:02 +053048 // if ctx is already canceled, no need to create a request
49 if err := ctx.Err(); err != nil {
50 return err
51 }
Kent Hagermana05f4d42020-04-01 15:11:22 -040052 // add ourselves to the end of the queue
53 rq.mutex.Lock()
54 waitingOn := rq.lastCompleteCh
55
56 ch := make(chan struct{})
57 rq.lastCompleteCh = ch
58 r := &request{notifyOnComplete: ch}
59
60 if rq.last != nil {
61 rq.last.next, r.prev = r, rq.last
62 }
63 rq.last = r
64 rq.mutex.Unlock()
65
66 // wait for our turn
67 select {
68 case <-ctx.Done():
69 // canceled, so cleanup
70 rq.mutex.Lock()
71 defer rq.mutex.Unlock()
72
73 select {
74 case <-waitingOn:
75 // chan has been closed, so the lock has been acquired
76 // context is canceled, so just release the lock immediately
77 rq.current = r
78 rq.releaseWithoutLock()
79 default:
80 // on abort, skip our position in the queue
bseeniva4a2a53e2025-09-19 09:05:02 +053081 if r.prev != nil {
82 r.prev.notifyOnComplete = r.notifyOnComplete
83 } else if rq.current != nil {
84 // On abort, if the previous pointer is nil, transfer notifyOnComplete to the current processing request
85 rq.current.notifyOnComplete = r.notifyOnComplete
86 }
87 // Remove ourselves from the queue
88 if r.next != nil && r.prev != nil { // If we are somewhere in the middle of the queue
Kent Hagermana05f4d42020-04-01 15:11:22 -040089 r.prev.next = r.next
90 r.next.prev = r.prev
bseeniva4a2a53e2025-09-19 09:05:02 +053091 } else if r.prev != nil { // If we are at the end of the queue
Kent Hagermana05f4d42020-04-01 15:11:22 -040092 rq.last = r.prev
93 r.prev.next = nil
bseeniva4a2a53e2025-09-19 09:05:02 +053094 } else if r.next != nil { // If we are at the start of the queue
95 r.next.prev = nil
96 } else { // If we are the only request in the queue
97 rq.last = nil
Kent Hagermana05f4d42020-04-01 15:11:22 -040098 }
bseeniva4a2a53e2025-09-19 09:05:02 +053099
100 // Clear references to help garbage collection
101 r.prev = nil
102 r.next = nil
Kent Hagermana05f4d42020-04-01 15:11:22 -0400103 }
104 return ctx.Err()
105
106 case <-waitingOn:
David K. Bainbridge5809b5b2020-08-27 00:07:41 +0000107 // Previous request has signaled that it is complete.
108 // This request now can proceed as the active
109 // request
David K. Bainbridge5809b5b2020-08-27 00:07:41 +0000110 rq.mutex.Lock()
111 defer rq.mutex.Unlock()
Kent Hagermana05f4d42020-04-01 15:11:22 -0400112 rq.current = r
bseeniva4a2a53e2025-09-19 09:05:02 +0530113
114 // Remove the processed request from the queue
115 if r.prev != nil {
116 r.prev.next = r.next
117 }
118 if r.next != nil {
119 r.next.prev = r.prev
120 }
121 if rq.last == r {
122 rq.last = r.prev
123 }
124
125 // Clear references to help garbage collection
126 r.prev = nil
127 r.next = nil
Kent Hagermana05f4d42020-04-01 15:11:22 -0400128 return nil
129 }
130}
131
132// RequestComplete must be invoked by a process when it completes processing the request. That process must have
133// invoked WaitForGreenLight() before.
134func (rq *RequestQueue) RequestComplete() {
135 rq.mutex.Lock()
136 defer rq.mutex.Unlock()
137
138 rq.releaseWithoutLock()
139}
140
141func (rq *RequestQueue) releaseWithoutLock() {
142 // Notify the next waiting request. This will panic if the lock is released more than once.
bseeniva4a2a53e2025-09-19 09:05:02 +0530143 if rq.current.notifyOnComplete != nil {
144 close(rq.current.notifyOnComplete)
145 rq.current.notifyOnComplete = nil
146 }
Kent Hagermana05f4d42020-04-01 15:11:22 -0400147
148 if rq.current.next != nil {
149 rq.current.next.prev = nil
150 }
bseeniva4a2a53e2025-09-19 09:05:02 +0530151
152 // Clear the current request reference to help garbage collection
153 rq.current = nil
Kent Hagermana05f4d42020-04-01 15:11:22 -0400154}