| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package schedule |
| |
| import ( |
| "context" |
| "sync" |
| |
| "go.uber.org/zap" |
| |
| "go.etcd.io/etcd/client/pkg/v3/verify" |
| ) |
| |
// Job is a named unit of work that can be handed to a Scheduler.
type Job interface {
	// Name returns the job's name. The FIFO scheduler in this package
	// includes it in the log entry written when a job panics.
	Name() string
	// Do executes the job. The FIFO scheduler in this package passes its own
	// context, which is canceled when the scheduler is stopped.
	Do(context.Context)
}
| |
// job is the Job implementation returned by NewJob: a name paired with the
// function to run.
type job struct {
	name string                // returned by Name
	do   func(context.Context) // invoked by Do
}
| |
// Name returns the name the job was created with.
func (j job) Name() string {
	return j.name
}
| |
// Do invokes the wrapped function with ctx.
func (j job) Do(ctx context.Context) {
	j.do(ctx)
}
| |
| func NewJob(name string, do func(ctx context.Context)) Job { |
| return job{ |
| name: name, |
| do: do, |
| } |
| } |
| |
// Scheduler can schedule jobs.
type Scheduler interface {
	// Schedule asks the scheduler to schedule the given job.
	// Scheduling to a stopped scheduler might panic.
	Schedule(j Job)

	// Pending returns the number of pending jobs.
	Pending() int

	// Scheduled returns the number of scheduled jobs (excluding pending jobs).
	Scheduled() int

	// Finished returns the number of finished jobs.
	Finished() int

	// WaitFinish waits until at least n jobs are finished and all pending
	// jobs are finished.
	WaitFinish(n int)

	// Stop stops the scheduler.
	Stop()
}
| |
// fifo is the Scheduler implementation created by NewFIFOScheduler. A single
// goroutine (run) executes the queued jobs one at a time, in queue order.
type fifo struct {
	// mu guards scheduled, finished, pendings and cancel.
	mu sync.Mutex

	// resume wakes run when a job is added to an empty queue.
	resume chan struct{}
	// scheduled counts the jobs run has picked up for execution.
	scheduled int
	// finished counts the jobs run has finished executing; jobs drained
	// while shutting down are not counted.
	finished int
	// pendings is the job queue. The head stays queued while it executes and
	// is removed once it finishes.
	pendings []Job

	// ctx is passed to every job and is canceled by Stop.
	ctx context.Context
	// cancel cancels ctx. Stop sets it to nil, which Schedule treats as
	// "scheduler stopped".
	cancel context.CancelFunc

	// finishCond uses mu as its lock and is broadcast each time a job
	// finishes.
	finishCond *sync.Cond
	// donec is closed when run exits.
	donec chan struct{}
	// lg reports jobs that panic.
	lg *zap.Logger
}
| |
| // NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO |
| // order sequentially |
| func NewFIFOScheduler(lg *zap.Logger) Scheduler { |
| verify.Assert(lg != nil, "the logger should not be nil") |
| |
| f := &fifo{ |
| resume: make(chan struct{}, 1), |
| donec: make(chan struct{}, 1), |
| lg: lg, |
| } |
| f.finishCond = sync.NewCond(&f.mu) |
| f.ctx, f.cancel = context.WithCancel(context.Background()) |
| go f.run() |
| return f |
| } |
| |
| // Schedule schedules a job that will be ran in FIFO order sequentially. |
| func (f *fifo) Schedule(j Job) { |
| f.mu.Lock() |
| defer f.mu.Unlock() |
| |
| if f.cancel == nil { |
| panic("schedule: schedule to stopped scheduler") |
| } |
| |
| if len(f.pendings) == 0 { |
| select { |
| case f.resume <- struct{}{}: |
| default: |
| } |
| } |
| f.pendings = append(f.pendings, j) |
| } |
| |
| func (f *fifo) Pending() int { |
| f.mu.Lock() |
| defer f.mu.Unlock() |
| return len(f.pendings) |
| } |
| |
| func (f *fifo) Scheduled() int { |
| f.mu.Lock() |
| defer f.mu.Unlock() |
| return f.scheduled |
| } |
| |
| func (f *fifo) Finished() int { |
| f.finishCond.L.Lock() |
| defer f.finishCond.L.Unlock() |
| return f.finished |
| } |
| |
| func (f *fifo) WaitFinish(n int) { |
| f.finishCond.L.Lock() |
| for f.finished < n || len(f.pendings) != 0 { |
| f.finishCond.Wait() |
| } |
| f.finishCond.L.Unlock() |
| } |
| |
| // Stop stops the scheduler and cancels all pending jobs. |
| func (f *fifo) Stop() { |
| f.mu.Lock() |
| f.cancel() |
| f.cancel = nil |
| f.mu.Unlock() |
| <-f.donec |
| } |
| |
| func (f *fifo) run() { |
| defer func() { |
| close(f.donec) |
| close(f.resume) |
| }() |
| |
| for { |
| var todo Job |
| f.mu.Lock() |
| if len(f.pendings) != 0 { |
| f.scheduled++ |
| todo = f.pendings[0] |
| } |
| f.mu.Unlock() |
| if todo == nil { |
| select { |
| case <-f.resume: |
| case <-f.ctx.Done(): |
| f.mu.Lock() |
| pendings := f.pendings |
| f.pendings = nil |
| f.mu.Unlock() |
| // clean up pending jobs |
| for _, todo := range pendings { |
| f.executeJob(todo, true) |
| } |
| return |
| } |
| } else { |
| f.executeJob(todo, false) |
| } |
| } |
| } |
| |
| func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) { |
| defer func() { |
| if !updatedFinishedStats { |
| f.finishCond.L.Lock() |
| f.finished++ |
| f.pendings = f.pendings[1:] |
| f.finishCond.Broadcast() |
| f.finishCond.L.Unlock() |
| } |
| if err := recover(); err != nil { |
| f.lg.Panic("execute job failed", zap.String("job", todo.Name()), zap.Any("panic", err)) |
| } |
| }() |
| |
| todo.Do(f.ctx) |
| } |