Source code for p4p.util
import logging
_log = logging.getLogger(__name__)

from functools import partial

try:
    from Queue import Queue, Full, Empty  # Python 2
except ImportError:
    from queue import Queue, Full, Empty  # Python 3

from threading import Thread, Event

__all__ = [
    'WorkQueue',
]
class WorkQueue(object):
    """A threaded work queue.
    """
    _stopit = object()

    def __init__(self, maxsize=5):
        self._Q = Queue(maxsize=maxsize)

    def push(self, callable):
        self._Q.put_nowait(callable)  # throws Queue.Full

    def push_wait(self, callable):
        self._Q.put(callable)

    def interrupt(self):
        """Break one call to handle()

        eg. Call N times to break N threads.

        This call blocks if the queue is full.
        """
        self._Q.put(self._stopit)

    def handle(self):
        """Process queued work until interrupt() is called
        """
        while True:
            # TODO: Queue.get() (and anything using thread.allocate_lock)
            # ignores signals :( so timeout periodically to allow delivery
            try:
                callable = None  # ensure no lingering references to past work while blocking
                callable = self._Q.get(True, 1.0)
            except Empty:
                continue  # retry on timeout
            try:
                if callable is self._stopit:
                    break
                callable()
            except:
                _log.exception("Error from WorkQueue w/ %r", callable)
            finally:
                self._Q.task_done()
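
# Illustrative sketch (not part of p4p.util): a typical use of WorkQueue is to
# run handle() in a thread owned by the caller, push callables from other
# threads, and call interrupt() once per worker to shut down.  The names
# 'worker' and 'do_work' below are hypothetical.
#
#   from threading import Thread
#
#   Q = WorkQueue(maxsize=5)
#   worker = Thread(target=Q.handle)
#   worker.start()
#
#   def do_work():
#       print("running on the worker thread")
#
#   Q.push_wait(do_work)   # blocks if the queue is full; push() would raise Full
#   Q.interrupt()          # breaks the single handle() loop
#   worker.join()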
class ThreadedWorkQueue(WorkQueue):

    def __init__(self, name=None, workers=1, daemon=False, **kws):
        assert workers >= 1, workers
        WorkQueue.__init__(self, **kws)
        self.name = name
        self._daemon = daemon
        self._T = [None]*workers

    def __enter__(self):
        self.start()

    def __exit__(self, A, B, C):
        self.stop()

    def start(self):
        for n in range(len(self._T)):
            if self._T[n] is not None:
                continue

            T = self._T[n] = Thread(name='%s[%d]' % (self.name, n), target=self.handle)
            T.daemon = self._daemon
            T.start()

        return self  # allow chaining

    def stop(self):
        [self.interrupt() for T in self._T if T is not None]
        [T.join() for T in self._T if T is not None]
        self._T = [None]*len(self._T)

        return self  # allow chaining

    def sync(self, timeout=None):
        wait1 = [Event() for _n in range(len(self._T))]
        wait2 = [Event() for _n in range(len(self._T))]

        def syncX(wait1, wait2):
            wait1.set()
            wait2.wait()

        [self.push_wait(partial(syncX, wait1[n], wait2[n])) for n in range(len(self._T))]

        # wait for all workers to reach the wait1 barrier
        for W in wait1:
            W.wait(timeout=timeout)

        # allow workers to proceed
        for W in wait2:
            W.set()

        return self  # allow chaining
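
# Illustrative sketch (not part of p4p.util): ThreadedWorkQueue owns its worker
# threads.  start()/stop() may be called directly, or implicitly via the
# context manager protocol.  sync() blocks until every worker has reached a
# barrier, so previously queued callables have been picked up before it
# returns.  The lambda below is a hypothetical work item.
#
#   TQ = ThreadedWorkQueue(name='demo', workers=2, maxsize=0)
#   TQ.start()
#   TQ.push_wait(lambda: print('hello from a worker'))
#   TQ.sync()    # barrier: earlier work has been handled
#   TQ.stop()    # interrupts and joins both workers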
# lazily create the default work queues
class _DefaultWorkQueue(object):

    def __init__(self, workers=4):  # TODO: configurable?
        self.W = [None]*workers
        self.n = 0

    def __del__(self):
        self.stop()

    def __call__(self):
        W = self.W[self.n]
        if W is None:
            # daemon=True otherwise the MainThread exit handler tries to join too early
            W = self.W[self.n] = ThreadedWorkQueue(maxsize=0, daemon=True).start()

        # sort of load balancing by giving different queues to each SharedPV,
        # but preserve ordering of callbacks as each SharedPV has only one queue
        self.n = (self.n+1) % len(self.W)
        return W

    def sync(self):
        [W.sync() for W in self.W if W is not None]

    def stop(self):
        [W.stop() for W in self.W if W is not None]
        self.W = [None]*len(self.W)


_defaultWorkQueue = _DefaultWorkQueue()
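
# Illustrative sketch (not part of p4p.util): _defaultWorkQueue is a callable
# factory.  Each call lazily starts (if needed) and returns one of the daemon
# ThreadedWorkQueues in round-robin order, so consumers such as SharedPV are
# spread over the pool while each consumer keeps a single queue and therefore
# ordered callbacks.
#
#   Q1 = _defaultWorkQueue()      # first queue in the pool
#   Q2 = _defaultWorkQueue()      # next queue (round-robin)
#   Q1.push_wait(lambda: None)
#   _defaultWorkQueue.sync()      # wait for all active queues to drain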