pva2pva  1.4.1
 All Classes Functions Variables Pages
tpool.cpp
1 
2 #include <typeinfo>
3 #include <stdexcept>
4 
5 #include <epicsEvent.h>
6 #include <epicsGuard.h>
7 #include <epicsThread.h>
8 #include <errlog.h>
9 
10 #include <pv/sharedPtr.h>
11 
12 #include "helper.h"
13 #include "tpool.h"
14 
15 typedef epicsGuard<epicsMutex> Guard;
16 typedef epicsGuardRelease<epicsMutex> UnGuard;
17 
18 WorkQueue::WorkQueue(const std::string& name)
19  :name(name)
20  ,state(Idle)
21 {}
22 
23 WorkQueue::~WorkQueue() { close(); }
24 
25 void WorkQueue::start(unsigned nworkers, unsigned prio)
26 {
27  Guard G(mutex);
28 
29  if(state!=Idle)
30  throw std::logic_error("Already started");
31 
32  try {
33  state = Active;
34 
35  for(unsigned i=0; i<nworkers; i++) {
36  p2p::auto_ptr<epicsThread> worker(new epicsThread(*this, name.c_str(),
37  epicsThreadGetStackSize(epicsThreadStackSmall),
38  prio));
39 
40  worker->start();
41 
42  workers.push_back(worker.get());
43  worker.release();
44  }
45  }catch(...){
46  UnGuard U(G); // unlock as close() blocks to join any workers which were started
47  close();
48  throw;
49  }
50 }
51 
52 void WorkQueue::close()
53 {
54  workers_t temp;
55 
56  {
57  Guard G(mutex);
58  if(state!=Active)
59  return;
60 
61  temp.swap(workers);
62  state = Stopping;
63  }
64 
65  wakeup.signal();
66 
67  for(workers_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it)
68  {
69  (*it)->exitWait();
70  delete *it;
71  }
72 
73  {
74  Guard G(mutex);
75  state = Idle;
76  }
77 }
78 
79 void WorkQueue::add(const value_type& work)
80 {
81  bool empty;
82 
83  {
84  Guard G(mutex);
85  if(state!=Active)
86  return;
87 
88  empty = queue.empty();
89 
90  queue.push_back(work);
91  }
92 
93  if(empty) {
94  wakeup.signal();
95  }
96 }
97 
98 void WorkQueue::run()
99 {
100  Guard G(mutex);
101 
102  std::tr1::shared_ptr<epicsThreadRunable> work;
103 
104  while(state==Active) {
105 
106  if(!queue.empty()) {
107  work = queue.front().lock();
108  queue.pop_front();
109  }
110 
111  bool last = queue.empty();
112 
113  {
114  UnGuard U(G);
115 
116  if(work) {
117  try {
118  work->run();
119  work.reset();
120  }catch(std::exception& e){
121  errlogPrintf("%s Unhandled exception from %s: %s\n",
122  name.c_str(), typeid(work.get()).name(), e.what());
123  work.reset();
124  }
125  }
126 
127  if(last) {
128  wakeup.wait();
129  }
130  }
131  }
132 
133  // pass along the close() signal to next worker
134  wakeup.signal();
135 }