pva2pva  1.4.1
 All Classes Functions Variables Pages
testmon.cpp
1 
2 #include <epicsAtomic.h>
3 #include <epicsGuard.h>
4 #include <epicsUnitTest.h>
5 #include <testMain.h>
6 
7 #include <pv/epicsException.h>
8 #include <pv/monitor.h>
9 #include <pv/thread.h>
10 #include <pv/serverContext.h>
11 
12 #include "server.h"
13 
14 #include "utilities.h"
15 
16 namespace pvd = epics::pvData;
17 namespace pva = epics::pvAccess;
18 
19 typedef epicsGuard<epicsMutex> Guard;
20 
21 namespace {
22 
23 pvd::PVStructurePtr makeRequest(size_t bsize)
24 {
25  pvd::StructureConstPtr dtype(pvd::getFieldCreate()->createFieldBuilder()
26  ->addNestedStructure("record")
27  ->addNestedStructure("_options")
28  ->add("queueSize", pvd::pvString) // yes, really. PVA wants a string
29  ->endNested()
30  ->endNested()
31  ->createStructure());
32 
33  pvd::PVStructurePtr ret(pvd::getPVDataCreate()->createPVStructure(dtype));
34  ret->getSubFieldT<pvd::PVScalar>("record._options.queueSize")->putFrom<pvd::int32>(bsize);
35 
36  return ret;
37 }
38 
39 struct TestMonitor {
40  TestProvider::shared_pointer upstream;
41  TestPV::shared_pointer test1;
42  ScalarAccessor<pvd::int32> test1_x, test1_y;
43 
44  GWServerChannelProvider::shared_pointer gateway;
45 
46  TestChannelRequester::shared_pointer client_req;
47  pva::Channel::shared_pointer client;
48 
49  // prepare providers and connect the client channel, don't setup monitor
50  TestMonitor()
51  :upstream(new TestProvider())
52  ,test1(upstream->addPV("test1", pvd::getFieldCreate()->createFieldBuilder()
53  ->add("x", pvd::pvInt)
54  ->add("y", pvd::pvInt)
55  ->createStructure()))
56  ,test1_x(test1->value, "x")
57  ,test1_y(test1->value, "y")
58  ,gateway(new GWServerChannelProvider(upstream))
59  ,client_req(new TestChannelRequester)
60  ,client(gateway->createChannel("test1", client_req))
61  {
62  testDiag("pre-test setup");
63  if(!client)
64  testAbort("channel \"test1\" not connected");
65  test1_x = 1;
66  test1_y = 2;
67  }
68 
69  ~TestMonitor()
70  {
71  client->destroy();
72  gateway->destroy(); // noop atm.
73  }
74 
75  void test_event()
76  {
77  testDiag("Push the initial event through from upstream to downstream");
78 
79  TestChannelMonitorRequester::shared_pointer mreq(new TestChannelMonitorRequester);
80  pvd::Monitor::shared_pointer mon(client->createMonitor(mreq, makeRequest(2)));
81  if(!mon) testAbort("Failed to create monitor");
82 
83  testEqual(mreq->eventCnt, 0u);
84  testOk1(mon->start().isSuccess());
85  upstream->dispatch(); // trigger monitorEvent() from upstream to gateway
86 
87  testEqual(mreq->eventCnt, 1u);
88  pva::MonitorElementPtr elem(mon->poll());
89  testOk1(!!elem.get());
90  if(!!elem.get()) testEqual(toString(*elem->changedBitSet), "{0}");
91  else testFail("oops");
92  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==1);
93 
94  if(elem) mon->release(elem);
95 
96  testOk1(!mon->poll());
97 
98  mon->destroy();
99  }
100 
101  void test_share()
102  {
103  // here both downstream monitors are on the same Channel,
104  // which would be inefficient, and slightly unrealistic, w/ real PVA,
105  // but w/ TestProvider makes no difference
106  testDiag("Test two downstream monitors sharing the same upstream");
107 
108  TestChannelMonitorRequester::shared_pointer mreq(new TestChannelMonitorRequester);
109  pvd::Monitor::shared_pointer mon(client->createMonitor(mreq, makeRequest(2)));
110  if(!mon) testAbort("Failed to create monitor");
111 
112 
113  TestChannelMonitorRequester::shared_pointer mreq2(new TestChannelMonitorRequester);
114  pvd::Monitor::shared_pointer mon2(client->createMonitor(mreq2, makeRequest(2)));
115  if(!mon2) testAbort("Failed to create monitor2");
116 
117  testOk1(mreq->eventCnt==0);
118  testOk1(mreq2->eventCnt==0);
119  testOk1(mon->start().isSuccess());
120  testOk1(mon2->start().isSuccess());
121  upstream->dispatch(); // trigger monitorEvent() from upstream to gateway
122 
123  testOk1(mreq->eventCnt==1);
124  testOk1(mreq2->eventCnt==1);
125 
126  pva::MonitorElementPtr elem(mon->poll());
127  pva::MonitorElementPtr elem2(mon2->poll());
128  testOk1(!!elem.get());
129  testOk1(!!elem2.get());
130  testOk1(elem!=elem2);
131  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==1);
132  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
133  testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==1);
134  testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
135 
136  if(elem) mon->release(elem);
137  if(elem2) mon2->release(elem2);
138 
139  testOk1(!mon->poll());
140  testOk1(!mon2->poll());
141 
142  testDiag("explicitly push an update");
143  test1_x = 42;
144  test1_y = 43;
145  pvd::BitSet changed;
146  changed.set(1); // only indicate that 'x' changed
147  test1->post(changed);
148 
149  elem = mon->poll();
150  elem2 = mon2->poll();
151  testOk1(!!elem.get());
152  testOk1(!!elem2.get());
153  testOk1(elem!=elem2);
154  if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str());
155  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==42);
156  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
157  if(elem2) testDiag("elem2 changed '%s' overflow '%s'", toString(*elem2->changedBitSet).c_str(), toString(*elem2->overrunBitSet).c_str());
158  testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==42);
159  testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
160 
161  if(elem) mon->release(elem);
162  if(elem2) mon2->release(elem2);
163 
164  testOk1(!mon->poll());
165  testOk1(!mon2->poll());
166 
167  mon->destroy();
168  mon2->destroy();
169  }
170 
171  void test_ds_no_start()
172  {
173  testDiag("Test downstream monitor never start()s");
174 
175  TestChannelMonitorRequester::shared_pointer mreq(new TestChannelMonitorRequester);
176  pvd::Monitor::shared_pointer mon(client->createMonitor(mreq, makeRequest(2)));
177  if(!mon) testAbort("Failed to create monitor");
178 
179  upstream->dispatch(); // trigger monitorEvent() from upstream to gateway
180 
181  testOk1(mreq->eventCnt==0);
182  testOk1(!mon->poll());
183 
184  pvd::BitSet changed;
185  changed.set(1);
186  test1_x=50;
187  test1->post(changed, false);
188  test1_x=51;
189  test1->post(changed, false);
190  test1_x=52;
191  test1->post(changed, false);
192  test1_x=53;
193  test1->post(changed);
194 
195  testOk1(!mon->poll());
196 
197  mon->destroy();
198  }
199 
200  void test_overflow_upstream()
201  {
202  testDiag("Check behavour when upstream monitor overflows (mostly transparent)");
203 
204  TestChannelMonitorRequester::shared_pointer mreq(new TestChannelMonitorRequester);
205  pvd::Monitor::shared_pointer mon(client->createMonitor(mreq, makeRequest(2)));
206  if(!mon) testAbort("Failed to create monitor");
207 
208  testOk1(mreq->eventCnt==0);
209  testOk1(mon->start().isSuccess());
210  upstream->dispatch(); // trigger monitorEvent() from upstream to gateway
211  testOk1(mreq->eventCnt==1);
212 
213  testDiag("poll initial update");
214  pva::MonitorElementPtr elem(mon->poll());
215  testOk1(!!elem.get());
216  if(elem) mon->release(elem);
217 
218  testOk1(!mon->poll());
219 
220  // queue 4 events input buffer of size 2
221  // don't notify downstream until after overflow has occurred
222  pvd::BitSet changed;
223  changed.set(1);
224  test1_x=50;
225  testDiag("post 50");
226  test1->post(changed, false);
227  test1_x=51;
228  testDiag("post 51");
229  test1->post(changed, false);
230  test1_x=52;
231  testDiag("post 52");
232  test1->post(changed, false);
233  test1_x=53;
234  testDiag("post 53");
235  test1->post(changed);
236 
237  elem = mon->poll();
238  testOk1(!!elem.get());
239 
240  testDiag("XX %d", elem ? elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get() : -42);
241  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==50);
242  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
243  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
244  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1);
245 
246  if(elem) mon->release(elem);
247 
248  elem = mon->poll();
249  testOk1(!!elem.get());
250 
251  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==51);
252  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
253  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
254  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1);
255 
256  if(elem) mon->release(elem);
257 
258  elem = mon->poll();
259  testOk1(!!elem.get());
260 
261  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==53);
262  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
263  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
264  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==1);
265  testOk1(elem && elem->overrunBitSet->nextSetBit(2)==-1);
266 
267  if(elem) mon->release(elem);
268 
269  testOk1(!mon->poll());
270 
271  mon->destroy();
272  }
273 
274  void test_overflow_downstream()
275  {
276  testDiag("Check behavour when downstream monitor overflows");
277 
278  TestChannelMonitorRequester::shared_pointer mreq(new TestChannelMonitorRequester);
279  pvd::Monitor::shared_pointer mon(client->createMonitor(mreq, makeRequest(2)));
280  if(!mon) testAbort("Failed to create monitor");
281 
282  testOk1(mreq->eventCnt==0);
283  testOk1(mon->start().isSuccess());
284  upstream->dispatch(); // trigger monitorEvent() from upstream to gateway
285  testOk1(mreq->eventCnt==1);
286 
287  testDiag("poll initial update");
288  pva::MonitorElementPtr elem(mon->poll());
289  testOk1(!!elem.get());
290  if(elem) mon->release(elem);
291 
292  // queue 4 events into buffer of size 2 (plus the overflow element)
293  // notify downstream after each update
294  // so the upstream queue never overflows
295  pvd::BitSet changed;
296  changed.set(1);
297  test1_x=50;
298  test1->post(changed);
299  test1_x=51;
300  test1->post(changed);
301  test1_x=52;
302  test1->post(changed);
303  test1_x=53;
304  test1->post(changed);
305 
306  elem = mon->poll();
307  testOk1(!!elem.get());
308 
309  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==50);
310  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
311  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
312  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1);
313 
314  if(elem) mon->release(elem);
315 
316  elem = mon->poll();
317  testOk1(!!elem.get());
318 
319  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==51);
320  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
321  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
322  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1);
323 
324  if(elem) mon->release(elem);
325 
326  elem = mon->poll();
327  testOk1(!!elem.get());
328 
329  testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==53);
330  testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
331  testOk1(elem && elem->changedBitSet->nextSetBit(2)==-1);
332  testOk1(elem && elem->overrunBitSet->nextSetBit(0)==1);
333  testOk1(elem && elem->overrunBitSet->nextSetBit(2)==-1);
334 
335  if(elem) mon->release(elem);
336 
337  testOk1(!mon->poll());
338 
339  mon->destroy();
340  }
341 };
342 
343 } // namespace
344 
345 MAIN(testmon)
346 {
347  testPlan(79);
348  TEST_METHOD(TestMonitor, test_event);
349  TEST_METHOD(TestMonitor, test_share);
350  TEST_METHOD(TestMonitor, test_ds_no_start);
351  TEST_METHOD(TestMonitor, test_overflow_upstream);
352  TEST_METHOD(TestMonitor, test_overflow_downstream);
353  TestProvider::testCounts();
354  int ok = 1;
355  size_t temp;
356 #define TESTC(name) temp=epicsAtomicGetSizeT(&name::num_instances); ok &= temp==0; testDiag("num. live " #name " %u", (unsigned)temp)
357  TESTC(GWChannel);
359  TESTC(ChannelCacheEntry);
360  TESTC(MonitorCacheEntry);
361  TESTC(MonitorUser);
362 #undef TESTC
363  testOk(ok, "All instances free'd");
364  return testDone();
365 }
Definition: chancache.h:132
Definition: chancache.h:103
Definition: chancache.h:22