pva2pva  1.4.1
 All Classes Functions Variables Pages
pdbsingle.cpp
1 #include <sstream>
2 
3 #include <string.h>
4 
5 #include <asLib.h>
6 #include <dbAccess.h>
7 #include <dbChannel.h>
8 #include <dbStaticLib.h>
9 #include <errlog.h>
10 #include <dbNotify.h>
11 #include <osiSock.h>
12 #include <epicsAtomic.h>
13 
14 #include <pv/epicsException.h>
15 #include <pv/pvAccess.h>
16 #include <pv/security.h>
17 #include <pv/configuration.h>
18 
19 #include "helper.h"
20 #include "pdbsingle.h"
21 #include "pdb.h"
22 
23 namespace pvd = epics::pvData;
24 namespace pva = epics::pvAccess;
25 
26 size_t PDBSinglePV::num_instances;
27 size_t PDBSingleChannel::num_instances;
28 size_t PDBSinglePut::num_instances;
29 size_t PDBSingleMonitor::num_instances;
30 
31 typedef epicsGuard<epicsMutex> Guard;
32 
33 static
34 void pdb_single_event(void *user_arg, struct dbChannel *chan,
35  int eventsRemaining, struct db_field_log *pfl)
36 {
37  DBEvent *evt=(DBEvent*)user_arg;
38  try{
39  PDBSinglePV::shared_pointer self(std::tr1::static_pointer_cast<PDBSinglePV>(((PDBSinglePV*)evt->self)->shared_from_this()));
40  PDBSinglePV::interested_remove_t temp;
41  {
42  Guard G(self->lock);
43 
44  // we have exclusive use of self->scratch
45  self->scratch.clear();
46  {
47  DBScanLocker L(dbChannelRecord(self->chan));
48  // dbGet() into self->complete
49  self->pvif->put(self->scratch, evt->dbe_mask, pfl);
50  }
51 
52  if(evt->dbe_mask&DBE_PROPERTY)
53  self->hadevent_PROPERTY = true;
54  else
55  self->hadevent_VALUE = true;
56 
57  if(self->hadevent_VALUE && self->hadevent_PROPERTY) {
58  self->interested_iterating = true;
59 
60  FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
61  PDBSingleMonitor& mon = **it;
62  // from self->complete into monitor queue element
63  mon.post(G, self->scratch); // G unlocked during call
64  }
65 
66  while(!self->interested_add.empty()) {
67  PDBSinglePV::interested_t::iterator first(self->interested_add.begin());
68  self->interested.insert(*first);
69  self->interested_add.erase(first);
70  }
71 
72  temp.swap(self->interested_remove);
73  for(PDBSinglePV::interested_remove_t::iterator it(temp.begin()),
74  end(temp.end()); it != end; ++it)
75  {
76  self->interested.erase(static_cast<PDBSingleMonitor*>(it->get()));
77  }
78 
79  self->interested_iterating = false;
80 
81  self->finalizeMonitor();
82  }
83  }
84 
85  }catch(std::tr1::bad_weak_ptr&){
86  /* We are racing destruction of the PDBSinglePV, but things are ok.
87  * The destructor is running, but has not completed db_cancel_event()
88  * so storage is still valid.
89  * Just do nothing
90  */
91  }catch(std::exception& e){
92  std::cerr<<"Unhandled exception in pdb_single_event(): "<<e.what()<<"\n"
93  <<SHOW_EXCEPTION(e)<<"\n";
94  }
95 }
96 
97 PDBSinglePV::PDBSinglePV(DBCH& chan,
98  const PDBProvider::shared_pointer& prov)
99  :provider(prov)
100  ,builder(new ScalarBuilder(chan.chan))
101  ,interested_iterating(false)
102  ,evt_VALUE(this)
103  ,evt_PROPERTY(this)
104  ,hadevent_VALUE(false)
105  ,hadevent_PROPERTY(false)
106 {
107  if(ellCount(&chan.chan->pre_chain) || ellCount(&chan.chan->post_chain)) {
108  DBCH temp(dbChannelName(chan.chan));
109  this->chan2.swap(temp);
110  }
111  this->chan.swap(chan);
112  fielddesc = std::tr1::static_pointer_cast<const pvd::Structure>(builder->dtype());
113 
114  complete = pvd::getPVDataCreate()->createPVStructure(fielddesc);
115  FieldName temp;
116  pvif.reset(builder->attach(complete, temp));
117 
118  epics::atomic::increment(num_instances);
119 }
120 
121 PDBSinglePV::~PDBSinglePV()
122 {
123  epics::atomic::decrement(num_instances);
124 }
125 
126 void PDBSinglePV::activate()
127 {
128  dbChannel *pchan = this->chan2.chan ? this->chan2.chan : this->chan.chan;
129  evt_VALUE.create(provider->event_context, this->chan, &pdb_single_event, DBE_VALUE|DBE_ALARM);
130  evt_PROPERTY.create(provider->event_context, pchan, &pdb_single_event, DBE_PROPERTY);
131 }
132 
133 pva::Channel::shared_pointer
134 PDBSinglePV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
135  const pva::ChannelRequester::shared_pointer& req)
136 {
137  PDBSingleChannel::shared_pointer ret(new PDBSingleChannel(shared_from_this(), req));
138 
139  ret->cred.update(req);
140 
141  ret->aspvt.add(chan, ret->cred);
142 
143  return ret;
144 }
145 
146 void PDBSinglePV::addMonitor(PDBSingleMonitor* mon)
147 {
148  Guard G(lock);
149  if(interested.empty() && interested_add.empty()) {
150  // first monitor
151  // start subscription
152 
153  hadevent_VALUE = false;
154  hadevent_PROPERTY = false;
155  db_event_enable(evt_VALUE.subscript);
156  db_event_enable(evt_PROPERTY.subscript);
157  db_post_single_event(evt_VALUE.subscript);
158  db_post_single_event(evt_PROPERTY.subscript);
159 
160  } else if(hadevent_VALUE && hadevent_PROPERTY) {
161  // new subscriber and already had initial update
162  mon->post(G);
163  } // else new subscriber, but no initial update. so just wait
164 
165  if(interested_iterating) {
166  interested_add.insert(mon);
167  } else {
168  interested.insert(mon);
169  }
170 }
171 
172 void PDBSinglePV::removeMonitor(PDBSingleMonitor* mon)
173 {
174  Guard G(lock);
175 
176  if(interested_add.erase(mon)) {
177  // and+remove while iterating. no-op
178 
179  } else if(interested_iterating) {
180  // keep monitor alive while iterating
181  interested_remove.insert(mon->shared_from_this());
182 
183  } else {
184  interested.erase(mon);
185  finalizeMonitor();
186  }
187 }
188 
189 void PDBSinglePV::finalizeMonitor()
190 {
191  assert(!interested_iterating);
192 
193  if(interested.empty()) {
194  db_event_disable(evt_VALUE.subscript);
195  db_event_disable(evt_PROPERTY.subscript);
196  }
197 }
198 
199 PDBSingleChannel::PDBSingleChannel(const PDBSinglePV::shared_pointer& pv,
200  const pva::ChannelRequester::shared_pointer& req)
201  :BaseChannel(dbChannelName(pv->chan), pv->provider, req, pv->fielddesc)
202  ,pv(pv)
203 {
204  assert(!!this->pv);
205  epics::atomic::increment(num_instances);
206 }
207 
208 PDBSingleChannel::~PDBSingleChannel()
209 {
210  epics::atomic::decrement(num_instances);
211 }
212 
213 void PDBSingleChannel::printInfo(std::ostream& out)
214 {
215  if(aspvt.canWrite())
216  out << "RW ";
217  else
218  out << "RO ";
219  out<<(&cred.user[0])<<'@'<<(&cred.host[0]);
220  for(size_t i=0, N=cred.groups.size(); i<N; i++) {
221  out<<", "<<(&cred.groups[i][0]);
222  }
223  out<<"\n";
224 }
225 
226 pva::ChannelPut::shared_pointer
227 PDBSingleChannel::createChannelPut(
228  pva::ChannelPutRequester::shared_pointer const & requester,
229  pvd::PVStructure::shared_pointer const & pvRequest)
230 {
231  PDBSinglePut::shared_pointer ret(new PDBSinglePut(shared_from_this(), requester, pvRequest));
232  requester->channelPutConnect(pvd::Status(), ret, fielddesc);
233  return ret;
234 }
235 
236 
237 pva::Monitor::shared_pointer
238 PDBSingleChannel::createMonitor(
239  pva::MonitorRequester::shared_pointer const & requester,
240  pvd::PVStructure::shared_pointer const & pvRequest)
241 {
242  PDBSingleMonitor::shared_pointer ret(new PDBSingleMonitor(pv->shared_from_this(), requester, pvRequest));
243  ret->weakself = ret;
244  assert(!!pv->complete);
245  guard_t G(pv->lock);
246  ret->connect(G, pv->complete);
247  return ret;
248 }
249 
250 
251 static
252 int single_put_callback(struct processNotify *notify,notifyPutType type)
253 {
254  PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt;
255 
256  if(notify->status!=notifyOK) return 0;
257 
258  // we've previously ensured that wait_changed&DBE_VALUE is true
259 
260  switch(type) {
261  case putDisabledType:
262  return 0;
263  case putFieldType:
264  {
265  DBScanLocker L(notify->chan);
266  self->wait_pvif->get(*self->wait_changed);
267  }
268  break;
269  case putType:
270  self->wait_pvif->get(*self->wait_changed);
271  break;
272  }
273  return 1;
274 }
275 
276 static
277 void single_done_callback(struct processNotify *notify)
278 {
279  PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt;
280  pvd::Status sts;
281 
282  // busy state should be 1 (normal completion) or 2 (if cancel in progress)
283  if(epics::atomic::compareAndSwap(self->notifyBusy, 1, 0)==0) {
284  std::cerr<<"PDBSinglePut dbNotify state error?\n";
285  }
286 
287  switch(notify->status) {
288  case notifyOK:
289  break;
290  case notifyCanceled:
291  return; // skip notification
292  case notifyError:
293  sts = pvd::Status::error("Error in dbNotify");
294  break;
295  case notifyPutDisabled:
296  sts = pvd::Status::error("Put disabled");
297  break;
298  }
299 
300  PDBSinglePut::requester_type::shared_pointer req(self->requester.lock());
301  if(req)
302  req->putDone(sts, self->shared_from_this());
303 }
304 
305 PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel,
306  const pva::ChannelPutRequester::shared_pointer &requester,
307  const pvd::PVStructure::shared_pointer &pvReq)
308  :channel(channel)
309  ,requester(requester)
310  ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
311  ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
312  ,pvif(channel->pv->builder->attach(pvf, FieldName()))
313  ,notifyBusy(0)
314  ,doProc(PVIF::ProcPassive)
315  ,doWait(false)
316 {
317  epics::atomic::increment(num_instances);
318  dbChannel *chan = channel->pv->chan;
319 
320  try {
321  getS<pvd::boolean>(pvReq, "record._options.block", doWait);
322  } catch(std::runtime_error& e) {
323  requester->message(std::string("block= not understood : ")+e.what(), pva::warningMessage);
324  }
325 
326  std::string proccmd;
327  if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
328  if(proccmd=="true") {
329  doProc = PVIF::ProcForce;
330  } else if(proccmd=="false") {
331  doProc = PVIF::ProcInhibit;
332  doWait = false; // no point in waiting
333  } else if(proccmd=="passive") {
334  doProc = PVIF::ProcPassive;
335  } else {
336  requester->message("process= expects: true|false|passive", pva::warningMessage);
337  }
338  }
339 
340  memset((void*)&notify, 0, sizeof(notify));
341  notify.usrPvt = (void*)this;
342  notify.chan = chan;
343  notify.putCallback = &single_put_callback;
344  notify.doneCallback = &single_done_callback;
345 }
346 
347 PDBSinglePut::~PDBSinglePut()
348 {
349  cancel();
350  epics::atomic::decrement(num_instances);
351 }
352 
353 void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value,
354  pvd::BitSet::shared_pointer const & changed)
355 {
356  dbChannel *chan = channel->pv->chan;
357  dbFldDes *fld = dbChannelFldDes(chan);
358 
359  AsWritePvt asWritePvt (
360  asTrapWriteWithData(channel->aspvt.aspvt,
361  std::string(channel->cred.user.begin(), channel->cred.user.end()).c_str(),
362  std::string(channel->cred.host.begin(), channel->cred.host.end()).c_str(),
363  chan,
364  chan->final_type,
365  chan->final_no_elements,
366  NULL
367  )
368  );
369 
370  pvd::Status ret;
371  if(!channel->aspvt.canWrite()) {
372  ret = pvd::Status::error("Put not permitted");
373 
374  } else if(dbChannelFieldType(chan)>=DBF_INLINK && dbChannelFieldType(chan)<=DBF_FWDLINK) {
375  try{
376  std::string lval(value->getSubFieldT<pvd::PVScalar>("value")->getAs<std::string>());
377  long status = dbChannelPutField(chan, DBF_STRING, lval.c_str(), 1);
378  if(status)
379  ret = pvd::Status(pvd::Status::error("dbPutField() error"));
380  }catch(std::exception& e) {
381  std::ostringstream strm;
382  strm<<"Failed to put link field "<<dbChannelName(chan)<<"."<<fld->name<<" : "<<e.what()<<"\n";
383  ret = pvd::Status(pvd::Status::error(strm.str()));
384  }
385 
386  } else if(doWait) {
387  // TODO: dbNotify doesn't allow us for force processing
388 
389  // assume value may be a different struct each time
390  p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(value, FieldName()));
391  unsigned mask = putpvif->dbe(*changed);
392 
393  if(mask!=DBE_VALUE) {
394  requester_type::shared_pointer req(requester.lock());
395  if(req)
396  req->message("block=true only supports .value (empty put mask)", pva::warningMessage);
397  }
398 
399  if(epics::atomic::compareAndSwap(notifyBusy, 0, 1)!=0)
400  throw std::logic_error("Previous put() not complete");
401 
402  notify.requestType = (mask&DBE_VALUE) ? putProcessRequest : processRequest;
403 
404  wait_pvif = PTRMOVE(putpvif);
405  wait_changed = changed;
406 
407  dbProcessNotify(&notify);
408 
409  return; // skip notification
410  } else {
411  // assume value may be a different struct each time
412  p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(value, FieldName()));
413  try{
414  DBScanLocker L(chan);
415  ret = putpvif->get(*changed, doProc);
416 
417  }catch(std::runtime_error& e){
418  ret = pvd::Status::error(e.what());
419  }
420  }
421  requester_type::shared_pointer req(requester.lock());
422  if(req)
423  req->putDone(ret, shared_from_this());
424 }
425 
426 void PDBSinglePut::cancel()
427 {
428  if(epics::atomic::compareAndSwap(notifyBusy, 1, 2)==1) {
429  dbNotifyCancel(&notify);
430  wait_changed.reset();
431  wait_pvif.reset();
432  epics::atomic::set(notifyBusy, 0);
433  }
434 }
435 
436 void PDBSinglePut::get()
437 {
438  changed->clear();
439  {
440  DBScanLocker L(pvif->chan);
441  LocalFL FL(NULL, pvif->chan);
442  pvif->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
443  }
444  //TODO: report unused fields as changed?
445  changed->clear();
446  changed->set(0);
447 
448  requester_type::shared_pointer req(requester.lock());
449  if(req)
450  req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
451 }
452 
453 PDBSingleMonitor::PDBSingleMonitor(const PDBSinglePV::shared_pointer& pv,
454  const requester_t::shared_pointer& requester,
455  const pvd::PVStructure::shared_pointer& pvReq)
456  :BaseMonitor(pv->lock, requester, pvReq)
457  ,pv(pv)
458 {
459  epics::atomic::increment(num_instances);
460 }
461 
462 PDBSingleMonitor::~PDBSingleMonitor()
463 {
464  destroy();
465  epics::atomic::decrement(num_instances);
466 }
467 
468 void PDBSingleMonitor::destroy()
469 {
470  BaseMonitor::destroy();
471 }
472 
473 void PDBSingleMonitor::onStart()
474 {
475  pv->addMonitor(this);
476 }
477 
478 void PDBSingleMonitor::onStop()
479 {
480  guard_t G(pv->lock);
481 
482  pv->removeMonitor(this);
483 }
484 
486 {
487  guard_t G(pv->lock);
488  post(G);
489 }
virtual void requestUpdate() OVERRIDE FINAL
Definition: pdbsingle.cpp:485
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
Definition: pvahelper.h:136
Definition: pvif.h:81
Definition: pvif.h:365
Definition: pdb.h:77
Definition: pvif.h:223
Definition: pvif.h:250