pva2pva  1.4.1
 All Classes Functions Variables Pages
moncache.cpp
1 
2 #include <epicsAtomic.h>
3 #include <errlog.h>
4 
5 #include <epicsMutex.h>
6 #include <epicsTimer.h>
7 
8 #include <pv/pvAccess.h>
9 
10 #define epicsExportSharedSymbols
11 #include "helper.h"
12 #include "pva2pva.h"
13 #include "chancache.h"
14 
15 namespace pva = epics::pvAccess;
16 namespace pvd = epics::pvData;
17 
18 size_t MonitorCacheEntry::num_instances;
19 size_t MonitorUser::num_instances;
20 
21 namespace {
22 // fetch scalar value or default
23 template<typename T>
24 T getS(const pvd::PVStructurePtr& s, const char* name, T dft)
25 {
26  try{
27  return s->getSubFieldT<pvd::PVScalar>(name)->getAs<T>();
28  }catch(std::runtime_error& e){
29  return dft;
30  }
31 }
32 }
33 
34 MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent, const pvd::PVStructure::shared_pointer& pvr)
35  :chan(ent)
36  ,bufferSize(getS<pvd::uint32>(pvr, "record._options.queueSize", 2)) // should be same default as pvAccess, but not required
37  ,havedata(false)
38  ,done(false)
39  ,nwakeups(0)
40  ,nevents(0)
41 {
42  epicsAtomicIncrSizeT(&num_instances);
43 }
44 
45 MonitorCacheEntry::~MonitorCacheEntry()
46 {
47  pvd::Monitor::shared_pointer M;
48  M.swap(mon);
49  if(M) {
50  M->destroy();
51  }
52  epicsAtomicDecrSizeT(&num_instances);
53  const_cast<ChannelCacheEntry*&>(chan) = NULL; // spoil to fault use after free
54 }
55 
56 void
57 MonitorCacheEntry::monitorConnect(pvd::Status const & status,
58  pvd::MonitorPtr const & monitor,
59  pvd::StructureConstPtr const & structure)
60 {
61  interested_t::vector_type tonotify;
62  {
63  Guard G(mutex());
64  if(typedesc) {
65  // we shouldn't have to deal with monitor type change since we
66  // destroy() Monitors on Channel disconnect.
67  std::cerr<<"monitorConnect() w/ new type. Monitor has outlived it's connection.\n";
68  monitor->stop();
69  //TODO: unlisten()
70  return;
71  }
72  typedesc = structure;
73 
74  if(status.isSuccess()) {
75  startresult = monitor->start();
76  } else {
77  startresult = status;
78  }
79 
80  if(startresult.isSuccess()) {
81  lastelem.reset(new pvd::MonitorElement(pvd::getPVDataCreate()->createPVStructure(structure)));
82  }
83 
84  // set typedesc and startresult for futured MonitorUsers
85  // and copy snapshot of already interested MonitorUsers
86  tonotify = interested.lock_vector();
87  }
88 
89  if(!startresult.isSuccess())
90  std::cout<<"upstream monitor start() fails\n";
91 
92  shared_pointer self(weakref); // keeps us alive all MonitorUsers are destroy()ed
93 
94  for(interested_t::vector_type::const_iterator it = tonotify.begin(),
95  end = tonotify.end(); it!=end; ++it)
96  {
97  pvd::MonitorRequester::shared_pointer req((*it)->req);
98  if(req) {
99  req->monitorConnect(startresult, *it, structure);
100  }
101  }
102 }
103 
104 // notificaton from upstream client that its monitor queue has
105 // become not empty (transition from empty to not empty)
106 // will not be called again unless we completely empty the upstream queue.
107 // If we don't then it is our responsibly to schedule more poll().
108 // Note: this probably means this is a PVA client RX thread.
109 void
110 MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
111 {
112  /* PVA is being tricky, the Monitor* passed to monitorConnect()
113  * isn't the same one we see here!
114  * The original was a ChannelMonitorImpl, we now see a MonitorStrategyQueue
115  * owned by the original, which delegates deserialization and accumulation
116  * of deltas into complete events for us.
117  * However, we don't want to keep the MonitorStrategyQueue as it's
118  * destroy() method is a no-op!
119  */
120 
121  epicsAtomicIncrSizeT(&nwakeups);
122 
123  shared_pointer self(weakref); // keeps us alive in case all MonitorUsers are destroy()ed
124 
125  pva::MonitorElementPtr update;
126 
127  typedef std::vector<MonitorUser::shared_pointer> dsnotify_t;
128  dsnotify_t dsnotify;
129 
130  {
131  Guard G(mutex()); // MCE and MU guarded by the same mutex
132  if(!havedata)
133  havedata = true;
134 
135  //TODO: flow control, if all MU buffers are full, break before poll()==NULL
136  while((update=monitor->poll()))
137  {
138  epicsAtomicIncrSizeT(&nevents);
139 
140  lastelem->pvStructurePtr->copyUnchecked(*update->pvStructurePtr,
141  *update->changedBitSet);
142  *lastelem->changedBitSet = *update->changedBitSet;
143  *lastelem->overrunBitSet = *update->overrunBitSet;
144  monitor->release(update);
145  update.reset();
146 
147  interested_t::iterator IIT(interested); // recursively locks interested.mutex() (assumes this->mutex() is interestd.mutex())
148  for(interested_t::value_pointer pusr = IIT.next(); pusr; pusr = IIT.next())
149  {
150  MonitorUser *usr = pusr.get();
151 
152  {
153  Guard G(usr->mutex());
154  if(usr->initial)
155  continue; // no start() yet
156  // TODO: track overflow when !running (after stop())?
157  if(!usr->running || usr->empty.empty()) {
158  usr->inoverflow = true;
159 
160  /* overrun |= lastelem->overrun // upstream overflows
161  * overrun |= changed & lastelem->changed // downstream overflows
162  * changed |= lastelem->changed // accumulate changes
163  */
164 
165  *usr->overflowElement->overrunBitSet |= *lastelem->overrunBitSet;
166  usr->overflowElement->overrunBitSet->or_and(*usr->overflowElement->changedBitSet,
167  *lastelem->changedBitSet);
168  *usr->overflowElement->changedBitSet |= *lastelem->changedBitSet;
169 
170  usr->overflowElement->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr,
171  *lastelem->changedBitSet);
172 
173  epicsAtomicIncrSizeT(&usr->ndropped);
174  continue;
175  }
176  // we only come out of overflow when downstream release()s an element to us
177  // empty.empty() does not imply inoverflow,
178  // however inoverflow does imply empty.empty()
179  assert(!usr->inoverflow);
180 
181  if(usr->filled.empty())
182  dsnotify.push_back(pusr);
183 
184  pvd::MonitorElementPtr elem(usr->empty.front());
185 
186  *elem->overrunBitSet = *lastelem->overrunBitSet;
187  *elem->changedBitSet = *lastelem->changedBitSet;
188  // Note: can't use changed mask to optimize this copy since we don't know
189  // the state of the free element
190  elem->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr);
191 
192  usr->filled.push_back(elem);
193  usr->empty.pop_front();
194 
195  epicsAtomicIncrSizeT(&usr->nevents);
196  }
197  }
198  }
199  }
200 
201  // unlock here, race w/ stop(), unlisten()?
202  //TODO: notify from worker thread
203 
204  FOREACH(dsnotify_t::iterator, it,end,dsnotify) {
205  MonitorUser *usr = (*it).get();
206  pvd::MonitorRequester::shared_pointer req(usr->req);
207  epicsAtomicIncrSizeT(&usr->nwakeups);
208  req->monitorEvent(*it); // notify when first item added to empty queue, may call poll(), release(), and others
209  }
210 }
211 
212 // notificaton from upstream client that no more monitor updates will come, ever
213 void
214 MonitorCacheEntry::unlisten(pvd::MonitorPtr const & monitor)
215 {
216  pvd::Monitor::shared_pointer M;
217  interested_t::vector_type tonotify;
218  {
219  Guard G(mutex());
220  M.swap(mon);
221  tonotify = interested.lock_vector();
222  // assume that upstream won't call monitorEvent() again
223 
224  // cause future downstream start() to error
225  startresult = pvd::Status(pvd::Status::STATUSTYPE_ERROR, "upstream unlisten()");
226  }
227  if(M) {
228  M->destroy();
229  }
230  FOREACH(interested_t::vector_type::iterator, it, end, tonotify) {
231  MonitorUser *usr = it->get();
232  pvd::MonitorRequester::shared_pointer req(usr->req);
233  if(usr->inuse.empty()) // TODO: what about stopped?
234  req->unlisten(*it);
235  }
236 }
237 
238 std::string
239 MonitorCacheEntry::getRequesterName()
240 {
241  return "MonitorCacheEntry";
242 }
243 
244 MonitorUser::MonitorUser(const MonitorCacheEntry::shared_pointer &e)
245  :entry(e)
246  ,initial(true)
247  ,running(false)
248  ,inoverflow(false)
249  ,nevents(0)
250  ,ndropped(0)
251 {
252  epicsAtomicIncrSizeT(&num_instances);
253 }
254 
255 MonitorUser::~MonitorUser()
256 {
257  epicsAtomicDecrSizeT(&num_instances);
258 }
259 
260 // downstream server closes monitor
261 void
262 MonitorUser::destroy()
263 {
264  {
265  Guard G(mutex());
266  running = false;
267  }
268 }
269 
270 pvd::Status
271 MonitorUser::start()
272 {
273  pvd::MonitorRequester::shared_pointer req(this->req.lock());
274  if(!req)
275  return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead");
276 
277  bool doEvt = false;
278  {
279  Guard G(entry->mutex()); // MCE and MU have share a lock
280 
281  if(!entry->startresult.isSuccess())
282  return entry->startresult;
283 
284  pvd::PVStructurePtr lval;
285  if(entry->havedata)
286  lval = entry->lastelem->pvStructurePtr;
287  pvd::StructureConstPtr typedesc(entry->typedesc);
288 
289  if(initial) {
290  initial = false;
291 
292  empty.resize(entry->bufferSize);
293  pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
294  for(unsigned i=0; i<empty.size(); i++) {
295  empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
296  }
297 
298  // extra element to accumulate updates during overflow
299  overflowElement.reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
300  }
301 
302  doEvt = filled.empty();
303 
304  if(lval && !empty.empty()) {
305  //already running, notify of initial element
306 
307  const pva::MonitorElementPtr& elem(empty.front());
308  elem->pvStructurePtr->copy(*lval);
309  elem->changedBitSet->set(0); // indicate all changed
310  elem->overrunBitSet->clear();
311  filled.push_back(elem);
312  empty.pop_front();
313  }
314 
315  doEvt &= !filled.empty();
316  running = true;
317  }
318  if(doEvt)
319  req->monitorEvent(shared_pointer(weakref)); // TODO: worker thread?
320  return pvd::Status();
321 }
322 
323 pvd::Status
324 MonitorUser::stop()
325 {
326  Guard G(mutex());
327  running = false;
328  return pvd::Status::Ok;
329 }
330 
331 pva::MonitorElementPtr
332 MonitorUser::poll()
333 {
334  Guard G(mutex());
335  pva::MonitorElementPtr ret;
336  if(!filled.empty()) {
337  ret = filled.front();
338  inuse.insert(ret); // track which ones are out for client use
339  filled.pop_front();
340  //TODO: track lost buffers w/ wrapped shared_ptr?
341  }
342  return ret;
343 }
344 
345 void
346 MonitorUser::release(pva::MonitorElementPtr const & monitorElement)
347 {
348  Guard G(mutex());
349  //TODO: ifdef DEBUG? (only track inuse when debugging?)
350  std::set<epics::pvData::MonitorElementPtr>::iterator it = inuse.find(monitorElement);
351  if(it!=inuse.end()) {
352  inuse.erase(it);
353 
354  if(inoverflow) { // leaving overflow condition
355 
356  // to avoid copy, enqueue the current overflowElement
357  // and replace it with the element being release()d
358 
359  filled.push_back(overflowElement);
360  overflowElement = monitorElement;
361  overflowElement->changedBitSet->clear();
362  overflowElement->overrunBitSet->clear();
363 
364  inoverflow = false;
365  } else {
366  // push_back empty element
367  empty.push_back(monitorElement);
368  }
369  } else {
370  // oh no, we've been given an element which we didn't give to downstream
371  //TODO: check empty and filled lists to see if this is one of ours, of from somewhere else
372  throw std::invalid_argument("Can't release MonitorElement not in use");
373  }
374  // TODO: pipeline window update?
375 }
376 
377 std::string
378 MonitorUser::getRequesterName()
379 {
380  return "MonitorCacheEntry";
381 }
epics::pvData::MonitorElement::shared_pointer lastelem
Definition: chancache.h:46
Definition: chancache.h:103
vector_type lock_vector() const
Definition: weakset.h:268