8 #include <dbStaticLib.h>
12 #include <epicsAtomic.h>
14 #include <pv/epicsException.h>
15 #include <pv/pvAccess.h>
16 #include <pv/security.h>
17 #include <pv/configuration.h>
20 #include "pdbsingle.h"
// Namespace aliases used by the definitions in this file.
23 namespace pvd = epics::pvData;
24 namespace pva = epics::pvAccess;
// Storage for the per-class instance counters. The constructors and destructors
// below adjust them with epics::atomic::increment()/decrement().
26 size_t PDBSinglePV::num_instances;
27 size_t PDBSingleChannel::num_instances;
28 size_t PDBSinglePut::num_instances;
29 size_t PDBSingleMonitor::num_instances;
// Shorthand for epicsGuard<epicsMutex>.
31 typedef epicsGuard<epicsMutex> Guard;
// Database event callback. PDBSinglePV::activate() registers it for both the
// DBE_VALUE|DBE_ALARM subscription and the DBE_PROPERTY subscription.
// NOTE(review): several lines of this function (braces, the `try`, the
// declaration of `evt`, the guard `G` and the loop-local `mon`) are not visible
// in this view; the comments below describe only the statements shown.
34 void pdb_single_event(
void *user_arg,
struct dbChannel *chan,
35 int eventsRemaining,
struct db_field_log *pfl)
// Obtain a shared_ptr to the owning PDBSinglePV through shared_from_this().
// Presumably the bad_weak_ptr handler further down covers the case where no
// owner remains - confirm.
39 PDBSinglePV::shared_pointer
self(std::tr1::static_pointer_cast<
PDBSinglePV>(((
PDBSinglePV*)evt->self)->shared_from_this()));
// Local container that receives the queued removals (swapped in below).
40 PDBSinglePV::interested_remove_t temp;
// Clear `scratch`, then let the PVIF fill it for this event's mask.
// (`scratch` is later passed to post(); presumably a BitSet of changed fields - confirm.)
45 self->scratch.clear();
49 self->pvif->put(self->scratch, evt->dbe_mask, pfl);
// Record which kind of event has now been seen.
// NOTE(review): the line between the two assignments (presumably an `else`) is not visible.
52 if(evt->dbe_mask&DBE_PROPERTY)
53 self->hadevent_PROPERTY =
true;
55 self->hadevent_VALUE =
true;
// Monitors are only notified once both a VALUE and a PROPERTY event have been seen.
57 if(self->hadevent_VALUE && self->hadevent_PROPERTY) {
// While this flag is set, monitor additions/removals elsewhere in this file are
// queued in interested_add / interested_remove instead of modifying `interested`.
58 self->interested_iterating =
true;
60 FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
// Hand the accumulated changes to the monitor.
63 mon.
post(G, self->scratch);
// Move every monitor queued for addition into `interested`.
66 while(!self->interested_add.empty()) {
67 PDBSinglePV::interested_t::iterator first(self->interested_add.begin());
68 self->interested.insert(*first);
69 self->interested_add.erase(first);
// Take the queued removals and erase each of them from `interested`.
72 temp.swap(self->interested_remove);
73 for(PDBSinglePV::interested_remove_t::iterator it(temp.begin()),
74 end(temp.end()); it != end; ++it)
76 self->interested.erase(static_cast<PDBSingleMonitor*>(it->get()));
79 self->interested_iterating =
false;
// finalizeMonitor() disables the subscriptions when `interested` is empty.
81 self->finalizeMonitor();
85 }
// NOTE(review): the body of this handler is not visible in this view.
catch(std::tr1::bad_weak_ptr&){
91 }
// Any other std::exception is reported on stderr.
catch(std::exception& e){
92 std::cerr<<
"Unhandled exception in pdb_single_event(): "<<e.what()<<
"\n"
93 <<SHOW_EXCEPTION(e)<<
"\n";
// Construct a PDBSinglePV from a database channel and its provider.
// NOTE(review): part of the initializer list, the braces and the declaration of
// the `temp` passed to attach() are not visible in this view.
97 PDBSinglePV::PDBSinglePV(
DBCH& chan,
98 const PDBProvider::shared_pointer& prov)
101 ,interested_iterating(false)
104 ,hadevent_VALUE(false)
105 ,hadevent_PROPERTY(false)
// When the channel's pre_chain or post_chain list is non-empty, construct a
// second DBCH from the same channel name and store it in chan2.
107 if(ellCount(&chan.chan->pre_chain) || ellCount(&chan.chan->post_chain)) {
108 DBCH temp(dbChannelName(chan.chan));
109 this->chan2.swap(temp);
// Exchange contents with the caller's DBCH: this->chan now holds the channel.
111 this->chan.swap(chan);
// Cache the Structure description produced by the builder and create the
// `complete` PVStructure from it.
112 fielddesc = std::tr1::static_pointer_cast<
const pvd::Structure>(builder->dtype());
114 complete = pvd::getPVDataCreate()->createPVStructure(fielddesc);
// Attach a PVIF for `complete` through the builder.
116 pvif.reset(builder->attach(complete, temp));
118 epics::atomic::increment(num_instances);
// Destructor; the visible statement decrements the PDBSinglePV instance counter.
121 PDBSinglePV::~PDBSinglePV()
123 epics::atomic::decrement(num_instances);
// Create the two database event subscriptions for this PV, both delivering to
// pdb_single_event().
126 void PDBSinglePV::activate()
// The PROPERTY subscription uses chan2 when it is set, otherwise chan.
128 dbChannel *pchan = this->chan2.chan ? this->chan2.chan : this->chan.chan;
// The VALUE/ALARM subscription is always created on chan.
129 evt_VALUE.create(provider->event_context, this->chan, &pdb_single_event, DBE_VALUE|DBE_ALARM);
130 evt_PROPERTY.create(provider->event_context, pchan, &pdb_single_event, DBE_PROPERTY);
// Create a PDBSingleChannel for this PV on behalf of requester `req`.
// NOTE(review): the braces and the return statement are not visible in this view.
133 pva::Channel::shared_pointer
134 PDBSinglePV::connect(
const std::tr1::shared_ptr<PDBProvider>& prov,
135 const pva::ChannelRequester::shared_pointer& req)
137 PDBSingleChannel::shared_pointer ret(
new PDBSingleChannel(shared_from_this(), req));
// Update the channel's credentials from the requester, then add this PV's
// database channel to ret->aspvt using those credentials.
139 ret->cred.update(req);
141 ret->aspvt.add(chan, ret->cred);
// NOTE(review): the header of this function is not visible in this view.
// Presumably this is PDBSinglePV::addMonitor(PDBSingleMonitor *mon), which
// PDBSingleMonitor::onStart() calls - confirm.
// No monitors registered or queued yet: reset the event flags, enable both
// subscriptions and call db_post_single_event() on each.
149 if(interested.empty() && interested_add.empty()) {
153 hadevent_VALUE =
false;
154 hadevent_PROPERTY =
false;
155 db_event_enable(evt_VALUE.subscript);
156 db_event_enable(evt_PROPERTY.subscript);
157 db_post_single_event(evt_VALUE.subscript);
158 db_post_single_event(evt_PROPERTY.subscript);
160 }
// NOTE(review): the body of this branch is not visible in this view.
else if(hadevent_VALUE && hadevent_PROPERTY) {
// While pdb_single_event() is iterating over `interested`, queue the monitor in
// interested_add.
// NOTE(review): the `else` separating the two inserts is not visible.
165 if(interested_iterating) {
166 interested_add.insert(mon);
168 interested.insert(mon);
// NOTE(review): the header of this function is not visible in this view.
// Presumably this is PDBSinglePV::removeMonitor(PDBSingleMonitor *mon), which
// PDBSingleMonitor::onStop() calls - confirm.
// First try to drop the monitor from the pending-addition set.
// NOTE(review): the body of this branch is not visible.
176 if(interested_add.erase(mon)) {
179 }
// While pdb_single_event() is iterating, queue the removal by storing a
// shared_ptr to the monitor in interested_remove.
else if(interested_iterating) {
181 interested_remove.insert(mon->shared_from_this());
// NOTE(review): presumably inside a final `else` that is not visible - confirm.
184 interested.erase(mon);
// Disable both event subscriptions when `interested` is empty.
// Asserts that `interested` is not currently being iterated.
189 void PDBSinglePV::finalizeMonitor()
191 assert(!interested_iterating);
193 if(interested.empty()) {
194 db_event_disable(evt_VALUE.subscript);
195 db_event_disable(evt_PROPERTY.subscript);
// Construct a channel for `pv`, passing the PV's channel name, provider and
// field description (plus the requester) to BaseChannel.
// NOTE(review): any further member initializers are not visible in this view.
199 PDBSingleChannel::PDBSingleChannel(
const PDBSinglePV::shared_pointer& pv,
200 const pva::ChannelRequester::shared_pointer& req)
201 :
BaseChannel(dbChannelName(pv->chan), pv->provider, req, pv->fielddesc)
205 epics::atomic::increment(num_instances);
// Destructor; the visible statement decrements the PDBSingleChannel instance counter.
208 PDBSingleChannel::~PDBSingleChannel()
210 epics::atomic::decrement(num_instances);
// Write information about this channel to `out`.
// The visible statements print "<user>@<host>" taken from `cred`, followed by
// ", <group>" for every entry of cred.groups.
// NOTE(review): earlier output statements are not visible in this view.
213 void PDBSingleChannel::printInfo(std::ostream& out)
// NOTE(review): the address of element 0 is streamed; this assumes each buffer
// is non-empty and NUL-terminated - confirm against the credentials type.
219 out<<(&cred.user[0])<<
'@'<<(&cred.host[0]);
220 for(
size_t i=0, N=cred.groups.size(); i<N; i++) {
221 out<<
", "<<(&cred.groups[i][0]);
// Create a PDBSinglePut operation on this channel and report it to the
// requester through channelPutConnect() with a default-constructed Status and
// the channel's field description.
// NOTE(review): the return statement is not visible in this view.
226 pva::ChannelPut::shared_pointer
227 PDBSingleChannel::createChannelPut(
228 pva::ChannelPutRequester::shared_pointer
const & requester,
229 pvd::PVStructure::shared_pointer
const & pvRequest)
231 PDBSinglePut::shared_pointer ret(
new PDBSinglePut(shared_from_this(), requester, pvRequest));
232 requester->channelPutConnect(pvd::Status(), ret, fielddesc);
// Create a PDBSingleMonitor on this channel's PV and connect it using the PV's
// `complete` structure.
// NOTE(review): the declaration of the guard `G` and the return statement are
// not visible in this view.
237 pva::Monitor::shared_pointer
238 PDBSingleChannel::createMonitor(
239 pva::MonitorRequester::shared_pointer
const & requester,
240 pvd::PVStructure::shared_pointer
const & pvRequest)
242 PDBSingleMonitor::shared_pointer ret(
new PDBSingleMonitor(pv->shared_from_this(), requester, pvRequest));
// The PV's `complete` structure must already exist.
244 assert(!!pv->complete);
246 ret->connect(G, pv->complete);
// Callback installed as notify.putCallback by the PDBSinglePut constructor.
// NOTE(review): the declaration of `self`, the switch on `type`, most case
// labels and the remaining return statements are not visible in this view.
252 int single_put_callback(
struct processNotify *notify,notifyPutType type)
// Nothing is done unless the notify status is notifyOK.
256 if(notify->status!=notifyOK)
return 0;
261 case putDisabledType:
// Both visible calls apply the change mask saved by PDBSinglePut::put()
// through the saved PVIF.
266 self->wait_pvif->get(*self->wait_changed);
270 self->wait_pvif->get(*self->wait_changed);
// Callback installed as notify.doneCallback by the PDBSinglePut constructor.
// NOTE(review): the declarations of `self` and `sts`, several case labels and
// the closing braces are not visible in this view.
277 void single_done_callback(
struct processNotify *notify)
// Try to move notifyBusy from 1 to 0; a returned value of 0 is reported as a
// state error.
283 if(epics::atomic::compareAndSwap(self->notifyBusy, 1, 0)==0) {
284 std::cerr<<
"PDBSinglePut dbNotify state error?\n";
// Translate the notify status into a pvd::Status.
287 switch(notify->status) {
293 sts = pvd::Status::error(
"Error in dbNotify");
295 case notifyPutDisabled:
296 sts = pvd::Status::error(
"Put disabled");
// Report completion to the requester obtained from `self->requester`.
// NOTE(review): no null check on `req` is visible here - confirm one exists on
// the elided line.
300 PDBSinglePut::requester_type::shared_pointer req(self->requester.lock());
302 req->putDone(sts, self->shared_from_this());
// Construct a put operation on `channel` for `requester`, configured from the
// pvRequest options "record._options.block" and "record._options.process".
// NOTE(review): part of the initializer list, the braces, the `try` keyword and
// the declaration of `proccmd` are not visible in this view.
305 PDBSinglePut::PDBSinglePut(
const PDBSingleChannel::shared_pointer &channel,
306 const pva::ChannelPutRequester::shared_pointer &requester,
307 const pvd::PVStructure::shared_pointer &pvReq)
309 ,requester(requester)
// `changed` is sized to the number of fields of the channel's structure; `pvf`
// is a fresh PVStructure of that type, and `pvif` attaches it to the database.
310 ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
311 ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
312 ,pvif(channel->pv->builder->attach(pvf,
FieldName()))
// Default processing mode; may be overridden by the "process" option below.
314 ,doProc(
PVIF::ProcPassive)
317 epics::atomic::increment(num_instances);
318 dbChannel *chan = channel->pv->chan;
// Read the optional "block" flag into doWait; a parse failure only produces a
// warning message to the requester.
321 getS<pvd::boolean>(pvReq,
"record._options.block", doWait);
322 }
catch(std::runtime_error& e) {
323 requester->message(std::string(
"block= not understood : ")+e.what(), pva::warningMessage);
// Map the optional "process" option onto a PVIF processing mode:
// "true" -> ProcForce, "false" -> ProcInhibit, "passive" -> ProcPassive.
327 if(getS<std::string>(pvReq,
"record._options.process", proccmd)) {
328 if(proccmd==
"true") {
329 doProc = PVIF::ProcForce;
330 }
else if(proccmd==
"false") {
331 doProc = PVIF::ProcInhibit;
333 }
else if(proccmd==
"passive") {
334 doProc = PVIF::ProcPassive;
// NOTE(review): presumably inside a final `else` (not visible) for any other value.
336 requester->message(
"process= expects: true|false|passive", pva::warningMessage);
// Zero the processNotify structure, then point it back at this object and
// install the put/done callbacks defined earlier in this file.
340 memset((
void*)&notify, 0,
sizeof(notify));
341 notify.usrPvt = (
void*)
this;
343 notify.putCallback = &single_put_callback;
344 notify.doneCallback = &single_done_callback;
// Destructor; the visible statement decrements the PDBSinglePut instance counter.
// NOTE(review): one line of the body is not visible in this view.
347 PDBSinglePut::~PDBSinglePut()
350 epics::atomic::decrement(num_instances);
// Write `value` (fields selected by `changed`) to the database channel and
// report the outcome to the requester through putDone().
// NOTE(review): many lines of this function (braces, `try` keywords, the
// declaration of `ret`, some branch conditions and some arguments of
// asTrapWriteWithData) are not visible in this view; the comments describe only
// the statements shown.
353 void PDBSinglePut::put(pvd::PVStructure::shared_pointer
const & value,
354 pvd::BitSet::shared_pointer
const & changed)
356 dbChannel *chan = channel->pv->chan;
357 dbFldDes *fld = dbChannelFldDes(chan);
// Call asTrapWriteWithData() with the channel's access-security handle and the
// user/host names copied out of the channel credentials.
360 asTrapWriteWithData(channel->aspvt.aspvt,
361 std::string(channel->cred.user.begin(), channel->cred.user.end()).c_str(),
362 std::string(channel->cred.host.begin(), channel->cred.host.end()).c_str(),
365 chan->final_no_elements,
// Refuse the put when the channel's access-security state does not allow writing.
371 if(!channel->aspvt.canWrite()) {
372 ret = pvd::Status::error(
"Put not permitted");
374 }
// Link fields (DBF_INLINK..DBF_FWDLINK) are written as a string taken from the
// "value" sub-field.
else if(dbChannelFieldType(chan)>=DBF_INLINK && dbChannelFieldType(chan)<=DBF_FWDLINK) {
376 std::string lval(value->getSubFieldT<pvd::PVScalar>(
"value")->getAs<std::string>());
377 long status = dbChannelPutField(chan, DBF_STRING, lval.c_str(), 1);
// NOTE(review): the condition guarding this assignment (presumably a check of
// `status`) is not visible.
379 ret = pvd::Status(pvd::Status::error(
"dbPutField() error"));
380 }
// Any exception from the link put is converted into an error Status naming the
// channel and field.
catch(std::exception& e) {
381 std::ostringstream strm;
382 strm<<
"Failed to put link field "<<dbChannelName(chan)<<
"."<<fld->name<<
" : "<<e.what()<<
"\n";
383 ret = pvd::Status(pvd::Status::error(strm.str()));
// NOTE(review): the condition selecting this path is not visible; the warning
// text below suggests it is the block=true (doWait) case - confirm.
390 p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(value,
FieldName()));
391 unsigned mask = putpvif->dbe(*changed);
393 if(mask!=DBE_VALUE) {
394 requester_type::shared_pointer req(requester.lock());
396 req->message(
"block=true only supports .value (empty put mask)", pva::warningMessage);
// Claim the notify slot (0 -> 1); a non-zero previous value means an earlier
// put has not completed.
399 if(epics::atomic::compareAndSwap(notifyBusy, 0, 1)!=0)
400 throw std::logic_error(
"Previous put() not complete");
402 notify.requestType = (mask&DBE_VALUE) ? putProcessRequest : processRequest;
// Save the PVIF and change mask for single_put_callback(), then hand the
// request to dbProcessNotify().
404 wait_pvif = PTRMOVE(putpvif);
405 wait_changed = changed;
407 dbProcessNotify(&notify);
// Otherwise perform the put directly through a temporary PVIF, using the
// processing mode selected in the constructor.
412 p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(value,
FieldName()));
415 ret = putpvif->get(*changed, doProc);
417 }
catch(std::runtime_error& e){
418 ret = pvd::Status::error(e.what());
// Report the result to the requester.
// NOTE(review): no null check on `req` is visible here - confirm one exists on
// the elided line.
421 requester_type::shared_pointer req(requester.lock());
423 req->putDone(ret, shared_from_this());
// Cancel a put that is currently marked in progress.
// NOTE(review): the braces and one statement of the body are not visible in
// this view.
426 void PDBSinglePut::cancel()
// Only act when notifyBusy is 1; it is set to 2 while the cancel is carried out.
428 if(epics::atomic::compareAndSwap(notifyBusy, 1, 2)==1) {
429 dbNotifyCancel(&notify);
// Drop the change mask saved by put().
430 wait_changed.reset();
// Return the flag to 0 so that put() can claim the notify slot again.
432 epics::atomic::set(notifyBusy, 0);
// Load the current database contents into `pvf` and report them to the
// requester through getDone().
// NOTE(review): the declaration of `FL`, any locking and the braces are not
// visible in this view.
436 void PDBSinglePut::get()
// Request value, alarm and property data through the PVIF attached to `pvf`;
// `*changed` is passed as the first argument.
442 pvif->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
// NOTE(review): no null check on `req` is visible here - confirm one exists on
// the elided line.
448 requester_type::shared_pointer req(requester.lock());
450 req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
// Construct a monitor on `pv` for `requester`.
// NOTE(review): the initializer list is not visible in this view; the only
// visible statement increments the PDBSingleMonitor instance counter.
453 PDBSingleMonitor::PDBSingleMonitor(
const PDBSinglePV::shared_pointer& pv,
454 const requester_t::shared_pointer& requester,
455 const pvd::PVStructure::shared_pointer& pvReq)
459 epics::atomic::increment(num_instances);
// Destructor; the visible statement decrements the PDBSingleMonitor instance counter.
// NOTE(review): one line of the body is not visible in this view.
462 PDBSingleMonitor::~PDBSingleMonitor()
465 epics::atomic::decrement(num_instances);
// Destroy this monitor by delegating to BaseMonitor::destroy().
468 void PDBSingleMonitor::destroy()
470 BaseMonitor::destroy();
// Start hook: register this monitor with its PV through addMonitor().
473 void PDBSingleMonitor::onStart()
475 pv->addMonitor(
this);
// Stop hook: unregister this monitor from its PV through removeMonitor().
// NOTE(review): lines preceding the call are not visible in this view.
478 void PDBSingleMonitor::onStop()
482 pv->removeMonitor(
this);
virtual void requestUpdate() OVERRIDE FINAL
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
Post the update if the queue is not full; if it is full, return false without overflow.