pva2pva  1.4.1
 All Classes Functions Variables Pages
pdbgroup.cpp
1 
2 #include <stdio.h>
3 
4 // rediect stdio/stderr for iocsh
5 #include <epicsStdio.h>
6 
7 #include <epicsAtomic.h>
8 #include <dbAccess.h>
9 #include <dbChannel.h>
10 #include <dbStaticLib.h>
11 #include <asLib.h>
12 
13 #include <pv/pvAccess.h>
14 #include <pv/configuration.h>
15 #include <pv/epicsException.h>
16 
17 #include "helper.h"
18 #include "pdbgroup.h"
19 #include "pdb.h"
20 
21 namespace pvd = epics::pvData;
22 namespace pva = epics::pvAccess;
23 
24 size_t PDBGroupPV::num_instances;
25 size_t PDBGroupChannel::num_instances;
26 size_t PDBGroupPut::num_instances;
27 size_t PDBGroupMonitor::num_instances;
28 
29 typedef epicsGuard<epicsMutex> Guard;
30 
31 void pdb_group_event(void *user_arg, struct dbChannel *chan,
32  int eventsRemaining, struct db_field_log *pfl)
33 {
34  DBEvent *evt=(DBEvent*)user_arg;
35  unsigned idx = evt->index;
36  try{
37  PDBGroupPV::shared_pointer self(std::tr1::static_pointer_cast<PDBGroupPV>(((PDBGroupPV*)evt->self)->shared_from_this()));
38  PDBGroupPV::Info& info = self->members[idx];
39 
40  PDBGroupPV::interested_remove_t temp;
41  {
42 
43  Guard G(self->lock);
44 
45  self->scratch.clear();
46  if(evt->dbe_mask&DBE_PROPERTY || !self->monatomic)
47  {
48  DBScanLocker L(dbChannelRecord(info.chan));
49  self->members[idx].pvif->put(self->scratch, evt->dbe_mask, pfl);
50 
51  } else {
52  // we ignore 'pfl' (and the dbEvent queue) when collecting an atomic snapshot
53 
54  DBManyLocker L(info.locker); // lock only those records in the triggers list
55  FOREACH(PDBGroupPV::Info::triggers_t::const_iterator, it, end, info.triggers)
56  {
57  size_t i = *it;
58  // go get a consistent snapshot we must ignore the db_field_log which came through the dbEvent buffer
59  LocalFL FL(NULL, self->members[i].chan); // create a read fl if needed
60  self->members[i].pvif->put(self->scratch, evt->dbe_mask, FL.pfl);
61  }
62  }
63 
64  if(!(evt->dbe_mask&DBE_PROPERTY)) {
65  if(!info.had_initial_VALUE) {
66  info.had_initial_VALUE = true;
67  assert(self->initial_waits>0);
68  self->initial_waits--;
69  }
70  } else {
71  if(!info.had_initial_PROPERTY) {
72  info.had_initial_PROPERTY = true;
73  assert(self->initial_waits>0);
74  self->initial_waits--;
75  }
76  }
77 
78  if(self->initial_waits==0) {
79  self->interested_iterating = true;
80 
81  FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
82  PDBGroupMonitor& mon = **it;
83  mon.post(G, self->scratch); // G unlocked
84  }
85 
86  {
87  Guard G(self->lock);
88 
89  assert(self->interested_iterating);
90 
91  while(!self->interested_add.empty()) {
92  PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
93  self->interested.insert(*first);
94  self->interested_add.erase(first);
95  }
96 
97  temp.swap(self->interested_remove);
98  for(PDBGroupPV::interested_remove_t::iterator it(temp.begin()),
99  end(temp.end()); it != end; ++it)
100  {
101  self->interested.erase(static_cast<PDBGroupMonitor*>(it->get()));
102  }
103 
104  self->interested_iterating = false;
105 
106  self->finalizeMonitor();
107  }
108  }
109  }
110 
111  }catch(std::tr1::bad_weak_ptr&){
112  /* We are racing destruction of the PDBGroupPV, but things are ok.
113  * The destructor is running, but has not completed db_cancel_event()
114  * so storage is still valid.
115  * Just do nothing
116  */
117  }catch(std::exception& e){
118  std::cerr<<"Unhandled exception in pdb_group_event(): "<<e.what()<<"\n"
119  <<SHOW_EXCEPTION(e)<<"\n";
120  }
121 }
122 
123 PDBGroupPV::PDBGroupPV()
124  :pgatomic(false)
125  ,monatomic(false)
126  ,interested_iterating(false)
127  ,initial_waits(0)
128 {
129  epics::atomic::increment(num_instances);
130 }
131 
132 PDBGroupPV::~PDBGroupPV()
133 {
134  epics::atomic::decrement(num_instances);
135 }
136 
137 pva::Channel::shared_pointer
138 PDBGroupPV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
139  const pva::ChannelRequester::shared_pointer& req)
140 {
141  PDBGroupChannel::shared_pointer ret(new PDBGroupChannel(shared_from_this(), prov, req));
142 
143  ret->cred.update(req);
144 
145  ret->aspvt.resize(members.size());
146  for(size_t i=0, N=members.size(); i<N; i++)
147  {
148  ret->aspvt[i].add(members[i].chan, ret->cred);
149  }
150 
151  return ret;
152 }
153 
154 // caller must not hold lock
155 void PDBGroupPV::addMonitor(PDBGroupMonitor *mon)
156 {
157  Guard G(lock);
158  if(interested.empty() && interested_add.empty()) {
159  // first monitor
160  // start subscriptions
161 
162  size_t ievts = 0;
163  for(size_t i=0; i<members.size(); i++) {
164  PDBGroupPV::Info& info = members[i];
165 
166  if(!!info.evt_VALUE) {
167  db_event_enable(info.evt_VALUE.subscript);
168  db_post_single_event(info.evt_VALUE.subscript);
169  ievts++;
170  info.had_initial_VALUE = false;
171  } else {
172  info.had_initial_VALUE = true;
173  }
174  assert(info.evt_PROPERTY.subscript);
175  db_event_enable(info.evt_PROPERTY.subscript);
176  db_post_single_event(info.evt_PROPERTY.subscript);
177  ievts++;
178  info.had_initial_PROPERTY = false;
179  }
180  initial_waits = ievts;
181 
182  } else if(initial_waits==0) {
183  // new subscriber and already had initial update
184  mon->post(G);
185  } // else new subscriber, but no initial update. so just wait
186 
187  if(interested_iterating)
188  interested_add.insert(mon);
189  else
190  interested.insert(mon);
191 }
192 
193 // caller must not hold lock
194 void PDBGroupPV::removeMonitor(PDBGroupMonitor *mon)
195 {
196  Guard G(lock);
197 
198  if(interested_add.erase(mon)) {
199  // and+remove while iterating. no-op
200 
201  } else if(interested_iterating) {
202  // keep the monitor alive until we've finished iterating
203  interested_remove.insert(mon->shared_from_this());
204 
205  } else {
206  interested.erase(mon);
207  finalizeMonitor();
208  }
209 }
210 
211 // must hold lock
212 void PDBGroupPV::finalizeMonitor()
213 {
214  assert(!interested_iterating);
215 
216  if(!interested.empty())
217  return;
218 
219  // last subscriber
220  for(size_t i=0; i<members.size(); i++) {
221  PDBGroupPV::Info& info = members[i];
222 
223  if(!!info.evt_VALUE) {
224  db_event_disable(info.evt_VALUE.subscript);
225  }
226  db_event_disable(info.evt_PROPERTY.subscript);
227  }
228 }
229 
230 void PDBGroupPV::show(int lvl)
231 {
232  // no locking as we only print things which are const after initialization
233 
234  printf(" Atomic Get/Put:%s Monitor:%s Members:%zu\n",
235  pgatomic?"yes":"no", monatomic?"yes":"no", members.size());
236 
237  if(lvl<=1)
238  return;
239 
240  for(members_t::const_iterator it(members.begin()), end(members.end());
241  it != end; ++it)
242  {
243  const Info& info = *it;
244  printf(" ");
245  info.attachment.show(); // printf()s
246  printf("\t<-> %s\n", dbChannelName(info.chan));
247  }
248 }
249 
250 
251 PDBGroupChannel::PDBGroupChannel(const PDBGroupPV::shared_pointer& pv,
252  const std::tr1::shared_ptr<pva::ChannelProvider>& prov,
253  const pva::ChannelRequester::shared_pointer& req)
254  :BaseChannel(pv->name, prov, req, pv->fielddesc)
255  ,pv(pv)
256 {
257  epics::atomic::increment(num_instances);
258 }
259 
260 PDBGroupChannel::~PDBGroupChannel()
261 {
262  epics::atomic::decrement(num_instances);
263 }
264 
265 void PDBGroupChannel::printInfo(std::ostream& out)
266 {
267  out<<"PDBGroupChannel";
268 }
269 
270 pva::ChannelPut::shared_pointer
271 PDBGroupChannel::createChannelPut(
272  pva::ChannelPutRequester::shared_pointer const & requester,
273  pvd::PVStructure::shared_pointer const & pvRequest)
274 {
275  PDBGroupPut::shared_pointer ret(new PDBGroupPut(shared_from_this(), requester, pvRequest));
276  requester->channelPutConnect(pvd::Status(), ret, fielddesc);
277  return ret;
278 }
279 
280 pva::Monitor::shared_pointer
281 PDBGroupChannel::createMonitor(
282  pva::MonitorRequester::shared_pointer const & requester,
283  pvd::PVStructure::shared_pointer const & pvRequest)
284 {
285  PDBGroupMonitor::shared_pointer ret(new PDBGroupMonitor(pv->shared_from_this(), requester, pvRequest));
286  ret->weakself = ret;
287  assert(!!pv->complete);
288  guard_t G(pv->lock);
289  ret->connect(G, pv->complete);
290  return ret;
291 }
292 
293 
294 
295 PDBGroupPut::PDBGroupPut(const PDBGroupChannel::shared_pointer& channel,
296  const requester_type::shared_pointer& requester,
297  const epics::pvData::PVStructure::shared_pointer &pvReq)
298  :channel(channel)
299  ,requester(requester)
300  ,atomic(channel->pv->pgatomic)
301  ,doWait(false)
302  ,doProc(PVIF::ProcPassive)
303  ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
304  ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
305 {
306  epics::atomic::increment(num_instances);
307  try {
308  getS<pvd::boolean>(pvReq, "record._options.atomic", atomic);
309 
310  getS<pvd::boolean>(pvReq, "record._options.block", doWait);
311 
312  std::string proccmd;
313  if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
314  if(proccmd=="true") {
315  doProc = PVIF::ProcForce;
316  } else if(proccmd=="false") {
317  doProc = PVIF::ProcInhibit;
318  doWait = false; // no point in waiting
319  } else if(proccmd=="passive") {
320  doProc = PVIF::ProcPassive;
321  } else {
322  requester->message("process= expects: true|false|passive", pva::warningMessage);
323  }
324  }
325  }catch(std::exception& e){
326  requester->message(std::string("Error processing request options: ")+e.what());
327  }
328 
329  pvf->getSubFieldT<pvd::PVBoolean>("record._options.atomic")->put(atomic);
330 
331 
332  const size_t npvs = channel->pv->members.size();
333  pvif.resize(npvs);
334  for(size_t i=0; i<npvs; i++)
335  {
336  PDBGroupPV::Info& info = channel->pv->members[i];
337 
338  pvif[i].reset(info.builder->attach(pvf, info.attachment));
339  }
340 }
341 
342 PDBGroupPut::~PDBGroupPut()
343 {
344  epics::atomic::decrement(num_instances);
345 }
346 
347 void PDBGroupPut::put(pvd::PVStructure::shared_pointer const & value,
348  pvd::BitSet::shared_pointer const & changed)
349 {
350  // assume value may be a different struct each time... lot of wasted prep work
351  const size_t npvs = channel->pv->members.size();
352  std::vector<std::tr1::shared_ptr<PVIF> > putpvif(npvs);
353  pvd::shared_vector<AsWritePvt> asWritePvt(npvs);
354 
355  for(size_t i=0; i<npvs; i++)
356  {
357  PDBGroupPV::Info& info = channel->pv->members[i];
358 
359  AsWritePvt wrt(asTrapWriteWithData(
360  channel->aspvt.at(i).aspvt,
361  &channel->cred.user[0],
362  &channel->cred.host[0],
363  info.chan.chan,
364  info.chan->final_type,
365  info.chan->final_no_elements,
366  NULL
367  )
368  );
369  asWritePvt[i].swap(wrt);
370 
371  if(!info.allowProc) continue;
372  putpvif[i].reset(info.builder->attach(value, info.attachment));
373  }
374 
375  pvd::Status ret;
376  if(atomic) {
377  DBManyLocker L(channel->pv->locker);
378  for(size_t i=0; ret && i<npvs; i++) {
379  if(!putpvif[i].get()) continue;
380 
381  ret |= putpvif[i]->get(*changed, doProc, channel->aspvt[i].canWrite());
382  }
383 
384  } else {
385  for(size_t i=0; ret && i<npvs; i++)
386  {
387  if(!putpvif[i].get()) continue;
388 
389  PDBGroupPV::Info& info = channel->pv->members[i];
390 
391  DBScanLocker L(dbChannelRecord(info.chan));
392 
393  ret |= putpvif[i]->get(*changed,
394  info.allowProc ? doProc : PVIF::ProcInhibit,
395  channel->aspvt[i].canWrite());
396  }
397  }
398 
399  requester_type::shared_pointer req(requester.lock());
400  if(req)
401  req->putDone(ret, shared_from_this());
402 }
403 
404 void PDBGroupPut::get()
405 {
406  const size_t npvs = pvif.size();
407 
408  changed->clear();
409  if(atomic) {
410  DBManyLocker L(channel->pv->locker);
411  for(size_t i=0; i<npvs; i++) {
412  LocalFL FL(NULL, channel->pv->members[i].chan);
413  pvif[i]->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
414  }
415  } else {
416 
417  for(size_t i=0; i<npvs; i++)
418  {
419  PDBGroupPV::Info& info = channel->pv->members[i];
420 
421  DBScanLocker L(dbChannelRecord(info.chan));
422  LocalFL FL(NULL, info.chan);
423  pvif[i]->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
424  }
425  }
426  //TODO: report unused fields as changed?
427  changed->clear();
428  changed->set(0);
429 
430  requester_type::shared_pointer req(requester.lock());
431  if(req)
432  req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
433 }
434 
435 PDBGroupMonitor::PDBGroupMonitor(const PDBGroupPV::shared_pointer& pv,
436  const epics::pvAccess::MonitorRequester::weak_pointer &requester,
437  const pvd::PVStructure::shared_pointer& pvReq)
438  :BaseMonitor(pv->lock, requester, pvReq)
439  ,pv(pv)
440 {
441  epics::atomic::increment(num_instances);
442 }
443 
444 PDBGroupMonitor::~PDBGroupMonitor()
445 {
446  destroy();
447  epics::atomic::decrement(num_instances);
448 }
449 
450 void PDBGroupMonitor::destroy()
451 {
452  BaseMonitor::destroy();
453  PDBGroupPV::shared_pointer pv;
454  {
455  Guard G(lock);
456  this->pv.swap(pv);
457  }
458 }
459 
460 void PDBGroupMonitor::onStart()
461 {
462  pv->addMonitor(this);
463 }
464 
465 void PDBGroupMonitor::onStop()
466 {
467  pv->removeMonitor(this);
468 }
469 
471 {
472  Guard G(pv->lock);
473  post(G);
474 }
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
Definition: pvahelper.h:136
Definition: pvif.h:365
Definition: pdb.h:77
virtual void requestUpdate() OVERRIDE FINAL
Definition: pdbgroup.cpp:470
Definition: pvif.h:223
Definition: pvif.h:250