blob: 7c2b1dc108c50b6965f4cb3f71920a1012bcfab9 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2016-present Ciena Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import Queue
import threading
import traceback
19
class PoolThread(threading.Thread):
    """Worker thread that runs callables taken from a shared queue.

    The thread starts itself at the end of ``__init__`` and keeps polling
    the queue until its ``_finished`` event is set.
    """

    def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
        """Create and immediately start the worker.

        requests_queue -- queue object the worker pulls callables from.
        wait_timeout   -- timeout passed to each blocking ``get`` on the
                          queue; after it expires the worker re-checks the
                          ``_finished`` event.
        daemon         -- value assigned to the thread's ``daemon`` flag.
        kwds           -- forwarded unchanged to ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = daemon
        self._queue = requests_queue
        self._wait_timeout = wait_timeout
        self._finished = threading.Event()
        self.start()

    def run(self):
        """Execute queued callables until ``_finished`` is set."""
        while not self._finished.is_set():
            try:
                work = self._queue.get(block=True, timeout=self._wait_timeout)
            except Queue.Empty:
                # Nothing arrived within the timeout; loop to re-check the
                # stop flag.
                continue

            try:
                work()
            except Exception:
                # A failing task must not take the worker down with it:
                # a dead worker shrinks the pool, and once all workers are
                # gone the remaining tasks are never consumed. Report the
                # error and keep serving the queue.
                traceback.print_exc()
            finally:
                # Always balance the get() so Queue.join() can return.
                self._queue.task_done()
44
45
46
class ThreadPool(object):
    """Fixed-size pool of PoolThread workers fed from one shared queue."""

    def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
        """Set up the thread pool and create pool_size threads.

        pool_size    -- number of worker threads to create.
        daemon       -- daemon flag handed to every worker.
        queue_size   -- maximum size passed to ``Queue.Queue``.
        wait_timeout -- queue polling timeout handed to every worker.
        """
        self._queue = Queue.Queue(queue_size)
        self._daemon = daemon
        self._threads = []
        self._pool_size = pool_size
        self._wait_timeout = wait_timeout
        self.createThreads()

    def addTask(self, callableObject):
        """Queue callableObject for execution by a worker.

        Blocks while the queue is full. Returns True when the object was
        queued, and False when it is not callable and was therefore not
        queued.
        """
        if not callable(callableObject):
            return False
        self._queue.put(callableObject, block=True)
        return True

    def cleanUpThreads(self):
        """Wait for all queued tasks to finish, then tell workers to stop.

        Only sets each worker's ``_finished`` event; it does not wait for
        the worker threads themselves to exit.
        """
        self._queue.join()

        for worker in self._threads:
            worker._finished.set()

    def createThreads(self):
        """Create pool_size workers and append them to the pool."""
        for _ in range(self._pool_size):
            self._threads.append(
                PoolThread(self._queue, self._wait_timeout, self._daemon))
74
75
class CallObject(object):
    """Small demo payload whose callback prints the value it holds."""

    def __init__(self, v=0):
        """Store v, the value reported by callCb."""
        self.v = v

    def callCb(self):
        """Print a message containing the stored value."""
        # Parenthesised single-argument form prints the same text whether
        # print is a statement or a function.
        print('Inside callback for %d' % self.v)
81
if __name__ == '__main__':
    import multiprocessing

    # Demo: run 40 callbacks on a pool with two workers per CPU, cycling
    # through one CallObject per worker.
    pool_size = multiprocessing.cpu_count() * 2
    callList = [CallObject(i) for i in range(pool_size)]
    tp = ThreadPool(pool_size, queue_size=1, wait_timeout=1)
    for i in range(40):
        tp.addTask(callList[i % pool_size].callCb)

    # Block until every queued callback has run, then signal the workers
    # to stop.
    tp.cleanUpThreads()
95
96