pva2pva  1.4.1
 All Classes Functions Variables Pages
utilities.cpp
1 
2 #include <epicsAtomic.h>
3 #include <errlog.h>
4 #include <epicsEvent.h>
5 #include <epicsUnitTest.h>
6 #include <dbUnitTest.h>
7 
8 #include <pv/pvUnitTest.h>
9 #include <pv/pvAccess.h>
10 
11 #define epicsExportSharedSymbols
12 #include <utilities.h>
13 #include <helper.h>
14 
15 typedef epicsGuard<epicsMutex> Guard;
16 typedef epicsGuardRelease<epicsMutex> UnGuard;
17 
18 namespace pvd = epics::pvData;
19 namespace pva = epics::pvAccess;
20 
21 static size_t countTestChannelRequester;
22 
23 TestChannelRequester::TestChannelRequester()
24  :laststate(pva::Channel::NEVER_CONNECTED)
25 {
26  epicsAtomicIncrSizeT(&countTestChannelRequester);
27 }
28 
29 
30 TestChannelRequester::~TestChannelRequester()
31 {
32  epicsAtomicDecrSizeT(&countTestChannelRequester);
33 }
34 
35 void TestChannelRequester::channelCreated(const pvd::Status& status, pva::Channel::shared_pointer const & channel)
36 {
37  testDiag("channelCreated %s", channel ? channel->getChannelName().c_str() : "<fails>");
38  Guard G(lock);
39  laststate = pva::Channel::CONNECTED;
40  this->status = status;
41  chan = channel;
42  wait.trigger();
43 }
44 
45 void TestChannelRequester::channelStateChange(pva::Channel::shared_pointer const & channel,
46  pva::Channel::ConnectionState connectionState)
47 {
48  testDiag("channelStateChange %s %d", channel->getChannelName().c_str(), (int)connectionState);
49  Guard G(lock);
50  laststate = connectionState;
51  wait.trigger();
52 }
53 
54 bool TestChannelRequester::waitForConnect()
55 {
56  Guard G(lock);
57  assert(chan);
58  while(true) {
59  pva::Channel::ConnectionState cur = chan->getConnectionState();
60  switch(cur) {
61  case pva::Channel::NEVER_CONNECTED:
62  break;
63  case pva::Channel::CONNECTED:
64  return true;
65  case pva::Channel::DISCONNECTED:
66  case pva::Channel::DESTROYED:
67  return false;
68  }
69  UnGuard U(G);
70  wait.wait();
71  }
72 
73 }
74 
75 static size_t countTestChannelGetRequester;
76 
77 TestChannelGetRequester::TestChannelGetRequester()
78  :connected(false)
79  ,done(false)
80 {
81  epicsAtomicIncrSizeT(&countTestChannelGetRequester);
82 }
83 
84 TestChannelGetRequester::~TestChannelGetRequester()
85 {
86  epicsAtomicDecrSizeT(&countTestChannelGetRequester);
87 }
88 
89 void TestChannelGetRequester::channelGetConnect(const epics::pvData::Status &status,
90  const epics::pvAccess::ChannelGet::shared_pointer &get,
91  const epics::pvData::Structure::const_shared_pointer &structure)
92 {
93  if(connected)
94  testFail("channelGetConnect() called twice");
95  statusConnect = status;
96  channelGet = get;
97  fielddesc = structure;
98  connected = true;
99 }
100 
101 void TestChannelGetRequester::getDone(const epics::pvData::Status &status,
102  const epics::pvAccess::ChannelGet::shared_pointer &get,
103  const epics::pvData::PVStructure::shared_pointer &pvStructure,
104  const epics::pvData::BitSet::shared_pointer &bitSet)
105 {
106  statusDone = status;
107  channelGet = get;
108  value = pvStructure;
109  changed = bitSet;
110  done = true;
111 }
112 
113 TestChannelPutRequester::TestChannelPutRequester()
114  :connected(false)
115  ,doneGet(false)
116  ,donePut(false)
117 {}
118 TestChannelPutRequester::~TestChannelPutRequester() {}
119 
120 void TestChannelPutRequester::channelPutConnect(
121  const epics::pvData::Status& status,
122  epics::pvAccess::ChannelPut::shared_pointer const & channelPut,
123  epics::pvData::Structure::const_shared_pointer const & structure)
124 {
125  statusConnect = status;
126  put = channelPut;
127  fielddesc = structure;
128  connected = true;
129 }
130 
131 void TestChannelPutRequester::putDone(
132  const epics::pvData::Status& status,
133  epics::pvAccess::ChannelPut::shared_pointer const & channelPut)
134 {
135  statusPut = status;
136  put = channelPut;
137  donePut = true;
138 }
139 
140 void TestChannelPutRequester::getDone(
141  const epics::pvData::Status& status,
142  epics::pvAccess::ChannelPut::shared_pointer const & channelPut,
143  epics::pvData::PVStructure::shared_pointer const & pvStructure,
144  epics::pvData::BitSet::shared_pointer const & bitSet)
145 {
146  statusGet = status;
147  put = channelPut;
148  value = pvStructure;
149  changed = bitSet;
150  doneGet = true;
151 }
152 
153 
154 static size_t countTestChannelMonitorRequester;
155 
156 TestChannelMonitorRequester::TestChannelMonitorRequester()
157  :connected(false)
158  ,unlistend(false)
159  ,eventCnt(0)
160 {
161  epicsAtomicIncrSizeT(&countTestChannelMonitorRequester);
162 }
163 
164 TestChannelMonitorRequester::~TestChannelMonitorRequester()
165 {
166  epicsAtomicDecrSizeT(&countTestChannelMonitorRequester);
167 }
168 
169 void TestChannelMonitorRequester::monitorConnect(pvd::Status const & status,
170  pvd::MonitorPtr const & monitor,
171  pvd::StructureConstPtr const & structure)
172 {
173  testDiag("monitorConnect %p %d", monitor.get(), (int)status.isSuccess());
174  Guard G(lock);
175  connectStatus = status;
176  dtype = structure;
177  connected = true;
178  wait.trigger();
179 }
180 
181 void TestChannelMonitorRequester::monitorEvent(pvd::MonitorPtr const & monitor)
182 {
183  testDiag("monitorEvent %p", monitor.get());
184  mon = monitor;
185  eventCnt++;
186  wait.trigger();
187 }
188 
189 void TestChannelMonitorRequester::unlisten(pvd::MonitorPtr const & monitor)
190 {
191  testDiag("unlisten %p", monitor.get());
192  Guard G(lock);
193  unlistend = true;
194  wait.trigger();
195 }
196 
197 bool TestChannelMonitorRequester::waitForConnect()
198 {
199  Guard G(lock);
200  while(!connected) {
201  UnGuard U(G);
202  wait.wait();
203  }
204  return true;
205 }
206 
207 bool TestChannelMonitorRequester::waitForEvent()
208 {
209  Guard G(lock);
210  size_t icnt = eventCnt;
211  while(!unlistend && eventCnt==icnt) {
212  UnGuard U(G);
213  wait.wait();
214  }
215  return !unlistend;
216 }
217 
218 static size_t countTestPVChannel;
219 
220 TestPVChannel::TestPVChannel(const std::tr1::shared_ptr<TestPV> &pv,
221  const std::tr1::shared_ptr<pva::ChannelRequester> &req)
222  :BaseChannel(pv->name, pv->provider, req, pv->dtype)
223  ,pv(pv)
224  ,state(CONNECTED)
225 {
226  epicsAtomicIncrSizeT(&countTestPVChannel);
227 }
228 
229 TestPVChannel::~TestPVChannel()
230 {
231  epicsAtomicDecrSizeT(&countTestPVChannel);
232 }
233 
234 TestPVChannel::ConnectionState TestPVChannel::getConnectionState()
235 {
236  Guard G(pv->lock);
237  return state;
238 }
239 
240 void TestPVChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField)
241 {
242  Guard G(pv->lock);
243 
244  //TODO subField?
245  requester->getDone(pvd::Status(), pv->dtype);
246 }
247 
248 pvd::Monitor::shared_pointer
249 TestPVChannel::createMonitor(
250  pvd::MonitorRequester::shared_pointer const & requester,
251  pvd::PVStructure::shared_pointer const & pvRequest)
252 {
253  shared_pointer self(weakself);
254  TestPVMonitor::shared_pointer ret(new TestPVMonitor(self, requester, 2));
255  {
256  Guard G(pv->lock);
257  monitors.insert(ret);
258  static_cast<TestPVMonitor*>(ret.get())->weakself = ret; // save wrapped weak ref
259  }
260  testDiag("TestPVChannel::createMonitor %s %p", pv->name.c_str(), ret.get());
261  requester->monitorConnect(pvd::Status(), ret, pv->dtype);
262  return ret;
263 }
264 
265 static size_t countTestPVMonitor;
266 
267 TestPVMonitor::TestPVMonitor(const TestPVChannel::shared_pointer& ch,
268  const pvd::MonitorRequester::shared_pointer& req,
269  size_t bsize)
270  :channel(ch)
271  ,requester(req)
272  ,running(false)
273  ,finalize(false)
274  ,inoverflow(false)
275  ,needWakeup(false)
276 {
277  pvd::PVDataCreatePtr fact(pvd::PVDataCreate::getPVDataCreate());
278  for(size_t i=0; i<bsize; i++) {
279  pva::MonitorElementPtr elem(new pvd::MonitorElement(fact->createPVStructure(channel->pv->dtype)));
280  free.push_back(elem);
281  }
282  overflow.reset(new pvd::MonitorElement(fact->createPVStructure(channel->pv->dtype)));
283  overflow->changedBitSet->set(0); // initially all changed
284  epicsAtomicIncrSizeT(&countTestPVMonitor);
285 }
286 
287 TestPVMonitor::~TestPVMonitor()
288 {
289  epicsAtomicDecrSizeT(&countTestPVMonitor);
290 }
291 
292 void TestPVMonitor::destroy()
293 {
294  Guard G(channel->pv->lock);
295 
296  shared_pointer self(weakself);
297  channel->monitors.erase(self); // ensure we don't get more notifications
298 }
299 
300 pvd::Status TestPVMonitor::start()
301 {
302  testDiag("TestPVMonitor::start %p", this);
303 
304  Guard G(channel->pv->lock);
305  if(finalize && buffer.empty())
306  return pvd::Status();
307 
308  if(running)
309  return pvd::Status();
310  running = true;
311 
312  // overflow element does double duty to hold this monitor's copy
313  overflow->pvStructurePtr->copyUnchecked(*channel->pv->value);
314 
315  if(this->buffer.empty()) {
316  needWakeup = true;
317  testDiag(" need wakeup");
318  }
319 
320  if(!this->free.empty()) {
321  pva::MonitorElementPtr monitorElement(this->free.front());
322 
323  if(overflow->changedBitSet->isEmpty()) {
324  overflow->changedBitSet->set(0); // initial update has all changed
325  overflow->overrunBitSet->clear();
326  }
327 
328  monitorElement->pvStructurePtr->copyUnchecked(*overflow->pvStructurePtr);
329  *monitorElement->changedBitSet = *overflow->changedBitSet;
330  *monitorElement->overrunBitSet = *overflow->overrunBitSet;
331  overflow->changedBitSet->clear();
332  overflow->overrunBitSet->clear();
333 
334  buffer.push_back(monitorElement);
335  this->free.pop_front();
336  testDiag(" push current");
337 
338  } else {
339  inoverflow = true;
340  overflow->changedBitSet->clear();
341  overflow->changedBitSet->set(0);
342  testDiag(" push overflow");
343  }
344 
345  return pvd::Status();
346 }
347 
348 pvd::Status TestPVMonitor::stop()
349 {
350  testDiag("TestPVMonitor::stop %p", this);
351  Guard G(channel->pv->lock);
352  running = false;
353  return pvd::Status();
354 }
355 
356 pva::MonitorElementPtr TestPVMonitor::poll()
357 {
358  pva::MonitorElementPtr ret;
359  Guard G(channel->pv->lock);
360  if(!buffer.empty()) {
361  ret = buffer.front();
362  buffer.pop_front();
363  }
364  testDiag("TestPVMonitor::poll %p %p", this, ret.get());
365  return ret;
366 }
367 
368 void TestPVMonitor::release(pva::MonitorElementPtr const & monitorElement)
369 {
370  Guard G(channel->pv->lock);
371  testDiag("TestPVMonitor::release %p %p", this, monitorElement.get());
372 
373  if(inoverflow) {
374  // buffer.empty() may be true if all elements poll()d by user
375  assert(this->free.empty());
376 
377  monitorElement->pvStructurePtr->copyUnchecked(*overflow->pvStructurePtr);
378  *monitorElement->changedBitSet = *overflow->changedBitSet;
379  *monitorElement->overrunBitSet = *overflow->overrunBitSet;
380 
381  overflow->changedBitSet->clear();
382  overflow->overrunBitSet->clear();
383 
384  buffer.push_back(monitorElement);
385  testDiag("TestPVMonitor::release overflow resume %p %p", this, monitorElement.get());
386  inoverflow = false;
387  } else {
388  this->free.push_back(monitorElement);
389  }
390 }
391 
392 static size_t countTestPV;
393 
394 TestPV::TestPV(const std::string& name,
395  const std::tr1::shared_ptr<TestProvider>& provider,
396  const pvd::StructureConstPtr& dtype)
397  :name(name)
398  ,provider(provider)
399  ,factory(pvd::PVDataCreate::getPVDataCreate())
400  ,dtype(dtype)
401  ,value(factory->createPVStructure(dtype))
402 {
403  epicsAtomicIncrSizeT(&countTestPV);
404 }
405 
406 TestPV::~TestPV()
407 {
408  epicsAtomicDecrSizeT(&countTestPV);
409 }
410 
411 void TestPV::post(bool notify)
412 {
413  pvd::BitSet changed;
414  changed.set(0); // all
415  post(changed, notify);
416 }
417 
418 void TestPV::post(const pvd::BitSet& changed, bool notify)
419 {
420  testDiag("post %s %d changed '%s'", name.c_str(), (int)notify, toString(changed).c_str());
421  Guard G(lock);
422 
423  channels_t::vector_type toupdate(channels.lock_vector());
424 
425  FOREACH(channels_t::vector_type::const_iterator, it, end, toupdate) // channel
426  {
427  TestPVChannel *chan = it->get();
428 
429  TestPVChannel::monitors_t::vector_type tomon(chan->monitors.lock_vector());
430  FOREACH(TestPVChannel::monitors_t::vector_type::const_iterator, it2, end2, tomon) // monitor/subscription
431  {
432  TestPVMonitor *mon = it2->get();
433 
434  if(!mon->running)
435  continue;
436 
437  mon->overflow->pvStructurePtr->copyUnchecked(*value, changed);
438 
439  if(mon->free.empty()) {
440  mon->inoverflow = true;
441  mon->overflow->overrunBitSet->or_and(*mon->overflow->changedBitSet, changed); // oflow |= prev_changed & new_changed
442  *mon->overflow->changedBitSet |= changed;
443  testDiag("overflow changed '%s' overrun '%s'",
444  toString(*mon->overflow->changedBitSet).c_str(),
445  toString(*mon->overflow->overrunBitSet).c_str());
446 
447  } else {
448  assert(!mon->inoverflow);
449 
450  if(mon->buffer.empty())
451  mon->needWakeup = true;
452 
453  pvd::MonitorElementPtr& elem(mon->free.front());
454  // Note: can't use 'changed' to optimize this copy since we don't know
455  // the state of the free element
456  elem->pvStructurePtr->copyUnchecked(*mon->overflow->pvStructurePtr);
457  *elem->changedBitSet = changed;
458  elem->overrunBitSet->clear(); // redundant/paranoia
459 
460  mon->buffer.push_back(elem);
461  mon->free.pop_front();
462  testDiag("push %p changed '%s' overflow '%s'", elem.get(),
463  toString(*elem->changedBitSet).c_str(),
464  toString(*elem->overrunBitSet).c_str());
465  }
466 
467  if(mon->needWakeup && notify) {
468  testDiag(" wakeup");
469  mon->needWakeup = false;
470  pva::MonitorRequester::shared_pointer req(mon->requester.lock());
471  UnGuard U(G);
472  if(req)
473  req->monitorEvent(*it2);
474  }
475  }
476  }
477 }
478 
479 void TestPV::disconnect()
480 {
481  Guard G(lock);
482  channels_t::vector_type toupdate(channels.lock_vector());
483 
484  FOREACH(channels_t::vector_type::const_iterator, it, end, toupdate) // channel
485  {
486  TestPVChannel *chan = it->get();
487 
488  chan->state = TestPVChannel::DISCONNECTED;
489  {
490  pva::ChannelRequester::shared_pointer req(chan->requester.lock());
491  UnGuard U(G);
492  if(req)
493  req->channelStateChange(*it, TestPVChannel::DISCONNECTED);
494  }
495  }
496 }
497 
498 static size_t countTestProvider;
499 
500 TestProvider::TestProvider()
501 {
502  epicsAtomicIncrSizeT(&countTestProvider);
503 }
504 
505 TestProvider::~TestProvider()
506 {
507  epicsAtomicDecrSizeT(&countTestProvider);
508 }
509 
510 void TestProvider::destroy()
511 {
512  // TODO: disconnect all?
513 }
514 
515 pva::ChannelFind::shared_pointer
516 TestProvider::channelList(pva::ChannelListRequester::shared_pointer const & requester)
517 {
518  pva::ChannelFind::shared_pointer ret;
519  pvd::PVStringArray::const_svector names;
520  requester->channelListResult(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
521  ret,
522  names,
523  true);
524  return ret;
525 }
526 
527 pva::ChannelFind::shared_pointer
528 TestProvider::channelFind(std::string const & channelName,
529  pva::ChannelFindRequester::shared_pointer const & requester)
530 {
531  pva::ChannelFind::shared_pointer ret;
532  requester->channelFindResult(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
533  ret, false);
534  return ret;
535 }
536 
537 pva::Channel::shared_pointer
538 TestProvider::createChannel(std::string const & channelName,pva::ChannelRequester::shared_pointer const & requester,
539  short priority)
540 {
541  return createChannel(channelName, requester, priority, "<unused>");
542 }
543 
544 pva::Channel::shared_pointer
545 TestProvider::createChannel(std::string const & channelName,
546  pva::ChannelRequester::shared_pointer const & requester,
547  short priority, std::string const & address)
548 {
549  pva::Channel::shared_pointer ret;
550 
551  {
552  Guard G(lock);
553 
554  TestPV::shared_pointer pv(pvs.find(channelName));
555  if(pv) {
556  TestPVChannel::shared_pointer chan(new TestPVChannel(pv, requester));
557  pv->channels.insert(chan);
558  chan->weakself = chan;
559  ret = chan;
560  }
561  }
562 
563  if(ret) {
564  requester->channelCreated(pvd::Status(), ret);
565  } else {
566  requester->channelCreated(pvd::Status(pvd::Status::STATUSTYPE_ERROR, "PV not found"), ret);
567  }
568  testDiag("createChannel %s %p", channelName.c_str(), ret.get());
569  return ret;
570 }
571 
572 TestPV::shared_pointer
573 TestProvider::addPV(const std::string& name, const pvd::StructureConstPtr& tdef)
574 {
575  Guard G(lock);
576  TestPV::shared_pointer ret(new TestPV(name, shared_from_this(), tdef));
577  pvs.insert(name, ret);
578  return ret;
579 }
580 
581 void TestProvider::dispatch()
582 {
583  Guard G(lock);
584  testDiag("TestProvider::dispatch");
585 
586  pvs_t::lock_vector_type allpvs(pvs.lock_vector());
587  FOREACH(pvs_t::lock_vector_type::const_iterator, pvit, pvend, allpvs)
588  {
589  TestPV *pv = pvit->second.get();
590  TestPV::channels_t::vector_type channels(pv->channels.lock_vector());
591 
592  FOREACH(TestPV::channels_t::vector_type::const_iterator, chit, chend, channels)
593  {
594  TestPVChannel *chan = chit->get();
595  TestPVChannel::monitors_t::vector_type monitors(chan->monitors.lock_vector());
596 
597  if(!chan->isConnected())
598  continue;
599 
600  FOREACH(TestPVChannel::monitors_t::vector_type::const_iterator, monit, monend, monitors)
601  {
602  TestPVMonitor *mon = monit->get();
603 
604  if(mon->finalize || !mon->running)
605  continue;
606 
607  if(mon->needWakeup) {
608  testDiag(" wakeup monitor %p", mon);
609  mon->needWakeup = false;
610  pva::MonitorRequester::shared_pointer req(mon->requester.lock());
611  UnGuard U(G);
612  if(req)
613  req->monitorEvent(*monit);
614  }
615  }
616  }
617  }
618 }
619 
620 void TestProvider::testCounts()
621 {
622  int ok = 1;
623  size_t temp;
624 #define TESTC(name) temp=epicsAtomicGetSizeT(&count##name); ok &= temp==0; testDiag("num. live " #name " %u", (unsigned)temp)
626  TESTC(TestChannelRequester);
627  TESTC(TestProvider);
628  TESTC(TestPV);
629  TESTC(TestPVChannel);
630  TESTC(TestPVMonitor);
631 #undef TESTC
632  testOk(ok, "All instances free'd");
633 }
lock_vector_type lock_vector() const
Definition: weakmap.h:259
value_pointer find(const K &k) const
Definition: weakmap.h:215
vector_type lock_vector() const
Definition: weakset.h:268
value_pointer insert(const K &k, value_pointer &v)
Definition: weakmap.h:230
void insert(value_pointer &)
Definition: weakset.h:227