blob: 06a243df91407bf0e9ba55a59d6207db281e9f0d [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package schedule
16
17import (
18 "context"
19 "sync"
20
21 "go.uber.org/zap"
22
23 "go.etcd.io/etcd/client/pkg/v3/verify"
24)
25
26type Job interface {
27 Name() string
28 Do(context.Context)
29}
30
31type job struct {
32 name string
33 do func(context.Context)
34}
35
36func (j job) Name() string {
37 return j.name
38}
39
40func (j job) Do(ctx context.Context) {
41 j.do(ctx)
42}
43
44func NewJob(name string, do func(ctx context.Context)) Job {
45 return job{
46 name: name,
47 do: do,
48 }
49}
50
51// Scheduler can schedule jobs.
52type Scheduler interface {
53 // Schedule asks the scheduler to schedule a job defined by the given func.
54 // Schedule to a stopped scheduler might panic.
55 Schedule(j Job)
56
57 // Pending returns number of pending jobs
58 Pending() int
59
60 // Scheduled returns the number of scheduled jobs (excluding pending jobs)
61 Scheduled() int
62
63 // Finished returns the number of finished jobs
64 Finished() int
65
66 // WaitFinish waits until at least n job are finished and all pending jobs are finished.
67 WaitFinish(n int)
68
69 // Stop stops the scheduler.
70 Stop()
71}
72
73type fifo struct {
74 mu sync.Mutex
75
76 resume chan struct{}
77 scheduled int
78 finished int
79 pendings []Job
80
81 ctx context.Context
82 cancel context.CancelFunc
83
84 finishCond *sync.Cond
85 donec chan struct{}
86 lg *zap.Logger
87}
88
89// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
90// order sequentially
91func NewFIFOScheduler(lg *zap.Logger) Scheduler {
92 verify.Assert(lg != nil, "the logger should not be nil")
93
94 f := &fifo{
95 resume: make(chan struct{}, 1),
96 donec: make(chan struct{}, 1),
97 lg: lg,
98 }
99 f.finishCond = sync.NewCond(&f.mu)
100 f.ctx, f.cancel = context.WithCancel(context.Background())
101 go f.run()
102 return f
103}
104
105// Schedule schedules a job that will be ran in FIFO order sequentially.
106func (f *fifo) Schedule(j Job) {
107 f.mu.Lock()
108 defer f.mu.Unlock()
109
110 if f.cancel == nil {
111 panic("schedule: schedule to stopped scheduler")
112 }
113
114 if len(f.pendings) == 0 {
115 select {
116 case f.resume <- struct{}{}:
117 default:
118 }
119 }
120 f.pendings = append(f.pendings, j)
121}
122
123func (f *fifo) Pending() int {
124 f.mu.Lock()
125 defer f.mu.Unlock()
126 return len(f.pendings)
127}
128
129func (f *fifo) Scheduled() int {
130 f.mu.Lock()
131 defer f.mu.Unlock()
132 return f.scheduled
133}
134
135func (f *fifo) Finished() int {
136 f.finishCond.L.Lock()
137 defer f.finishCond.L.Unlock()
138 return f.finished
139}
140
141func (f *fifo) WaitFinish(n int) {
142 f.finishCond.L.Lock()
143 for f.finished < n || len(f.pendings) != 0 {
144 f.finishCond.Wait()
145 }
146 f.finishCond.L.Unlock()
147}
148
149// Stop stops the scheduler and cancels all pending jobs.
150func (f *fifo) Stop() {
151 f.mu.Lock()
152 f.cancel()
153 f.cancel = nil
154 f.mu.Unlock()
155 <-f.donec
156}
157
158func (f *fifo) run() {
159 defer func() {
160 close(f.donec)
161 close(f.resume)
162 }()
163
164 for {
165 var todo Job
166 f.mu.Lock()
167 if len(f.pendings) != 0 {
168 f.scheduled++
169 todo = f.pendings[0]
170 }
171 f.mu.Unlock()
172 if todo == nil {
173 select {
174 case <-f.resume:
175 case <-f.ctx.Done():
176 f.mu.Lock()
177 pendings := f.pendings
178 f.pendings = nil
179 f.mu.Unlock()
180 // clean up pending jobs
181 for _, todo := range pendings {
182 f.executeJob(todo, true)
183 }
184 return
185 }
186 } else {
187 f.executeJob(todo, false)
188 }
189 }
190}
191
192func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
193 defer func() {
194 if !updatedFinishedStats {
195 f.finishCond.L.Lock()
196 f.finished++
197 f.pendings = f.pendings[1:]
198 f.finishCond.Broadcast()
199 f.finishCond.L.Unlock()
200 }
201 if err := recover(); err != nil {
202 f.lg.Panic("execute job failed", zap.String("job", todo.Name()), zap.Any("panic", err))
203 }
204 }()
205
206 todo.Do(f.ctx)
207}