5 #include <epicsStdio.h>
7 #include <epicsAtomic.h>
10 #include <dbStaticLib.h>
13 #include <pv/pvAccess.h>
14 #include <pv/configuration.h>
15 #include <pv/epicsException.h>
21 namespace pvd = epics::pvData;
22 namespace pva = epics::pvAccess;
24 size_t PDBGroupPV::num_instances;
25 size_t PDBGroupChannel::num_instances;
26 size_t PDBGroupPut::num_instances;
27 size_t PDBGroupMonitor::num_instances;
29 typedef epicsGuard<epicsMutex> Guard;
31 void pdb_group_event(
void *user_arg,
struct dbChannel *chan,
32 int eventsRemaining,
struct db_field_log *pfl)
35 unsigned idx = evt->index;
37 PDBGroupPV::shared_pointer
self(std::tr1::static_pointer_cast<
PDBGroupPV>(((
PDBGroupPV*)evt->self)->shared_from_this()));
40 PDBGroupPV::interested_remove_t temp;
45 self->scratch.clear();
46 if(evt->dbe_mask&DBE_PROPERTY || !self->monatomic)
49 self->members[idx].pvif->put(self->scratch, evt->dbe_mask, pfl);
54 DBManyLocker L(info.locker);
55 FOREACH(PDBGroupPV::Info::triggers_t::const_iterator, it, end, info.triggers)
59 LocalFL FL(NULL, self->members[i].chan);
60 self->members[i].pvif->put(self->scratch, evt->dbe_mask, FL.pfl);
64 if(!(evt->dbe_mask&DBE_PROPERTY)) {
65 if(!info.had_initial_VALUE) {
66 info.had_initial_VALUE =
true;
67 assert(self->initial_waits>0);
68 self->initial_waits--;
71 if(!info.had_initial_PROPERTY) {
72 info.had_initial_PROPERTY =
true;
73 assert(self->initial_waits>0);
74 self->initial_waits--;
78 if(self->initial_waits==0) {
79 self->interested_iterating =
true;
81 FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
83 mon.
post(G, self->scratch);
89 assert(self->interested_iterating);
91 while(!self->interested_add.empty()) {
92 PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
93 self->interested.insert(*first);
94 self->interested_add.erase(first);
97 temp.swap(self->interested_remove);
98 for(PDBGroupPV::interested_remove_t::iterator it(temp.begin()),
99 end(temp.end()); it != end; ++it)
101 self->interested.erase(static_cast<PDBGroupMonitor*>(it->get()));
104 self->interested_iterating =
false;
106 self->finalizeMonitor();
111 }
catch(std::tr1::bad_weak_ptr&){
117 }
catch(std::exception& e){
118 std::cerr<<
"Unhandled exception in pdb_group_event(): "<<e.what()<<
"\n"
119 <<SHOW_EXCEPTION(e)<<
"\n";
123 PDBGroupPV::PDBGroupPV()
126 ,interested_iterating(false)
129 epics::atomic::increment(num_instances);
132 PDBGroupPV::~PDBGroupPV()
134 epics::atomic::decrement(num_instances);
137 pva::Channel::shared_pointer
138 PDBGroupPV::connect(
const std::tr1::shared_ptr<PDBProvider>& prov,
139 const pva::ChannelRequester::shared_pointer& req)
141 PDBGroupChannel::shared_pointer ret(
new PDBGroupChannel(shared_from_this(), prov, req));
143 ret->cred.update(req);
145 ret->aspvt.resize(members.size());
146 for(
size_t i=0, N=members.size(); i<N; i++)
148 ret->aspvt[i].add(members[i].chan, ret->cred);
158 if(interested.empty() && interested_add.empty()) {
163 for(
size_t i=0; i<members.size(); i++) {
166 if(!!info.evt_VALUE) {
167 db_event_enable(info.evt_VALUE.subscript);
168 db_post_single_event(info.evt_VALUE.subscript);
170 info.had_initial_VALUE =
false;
172 info.had_initial_VALUE =
true;
174 assert(info.evt_PROPERTY.subscript);
175 db_event_enable(info.evt_PROPERTY.subscript);
176 db_post_single_event(info.evt_PROPERTY.subscript);
178 info.had_initial_PROPERTY =
false;
180 initial_waits = ievts;
182 }
else if(initial_waits==0) {
187 if(interested_iterating)
188 interested_add.insert(mon);
190 interested.insert(mon);
198 if(interested_add.erase(mon)) {
201 }
else if(interested_iterating) {
203 interested_remove.insert(mon->shared_from_this());
206 interested.erase(mon);
212 void PDBGroupPV::finalizeMonitor()
214 assert(!interested_iterating);
216 if(!interested.empty())
220 for(
size_t i=0; i<members.size(); i++) {
223 if(!!info.evt_VALUE) {
224 db_event_disable(info.evt_VALUE.subscript);
226 db_event_disable(info.evt_PROPERTY.subscript);
230 void PDBGroupPV::show(
int lvl)
234 printf(
" Atomic Get/Put:%s Monitor:%s Members:%zu\n",
235 pgatomic?
"yes":
"no", monatomic?
"yes":
"no", members.size());
240 for(members_t::const_iterator it(members.begin()), end(members.end());
243 const Info& info = *it;
245 info.attachment.show();
246 printf(
"\t<-> %s\n", dbChannelName(info.chan));
251 PDBGroupChannel::PDBGroupChannel(
const PDBGroupPV::shared_pointer& pv,
252 const std::tr1::shared_ptr<pva::ChannelProvider>& prov,
253 const pva::ChannelRequester::shared_pointer& req)
257 epics::atomic::increment(num_instances);
260 PDBGroupChannel::~PDBGroupChannel()
262 epics::atomic::decrement(num_instances);
265 void PDBGroupChannel::printInfo(std::ostream& out)
267 out<<
"PDBGroupChannel";
270 pva::ChannelPut::shared_pointer
271 PDBGroupChannel::createChannelPut(
272 pva::ChannelPutRequester::shared_pointer
const & requester,
273 pvd::PVStructure::shared_pointer
const & pvRequest)
275 PDBGroupPut::shared_pointer ret(
new PDBGroupPut(shared_from_this(), requester, pvRequest));
276 requester->channelPutConnect(pvd::Status(), ret, fielddesc);
280 pva::Monitor::shared_pointer
281 PDBGroupChannel::createMonitor(
282 pva::MonitorRequester::shared_pointer
const & requester,
283 pvd::PVStructure::shared_pointer
const & pvRequest)
285 PDBGroupMonitor::shared_pointer ret(
new PDBGroupMonitor(pv->shared_from_this(), requester, pvRequest));
287 assert(!!pv->complete);
289 ret->connect(G, pv->complete);
295 PDBGroupPut::PDBGroupPut(
const PDBGroupChannel::shared_pointer& channel,
296 const requester_type::shared_pointer& requester,
297 const epics::pvData::PVStructure::shared_pointer &pvReq)
299 ,requester(requester)
300 ,atomic(channel->pv->pgatomic)
302 ,doProc(
PVIF::ProcPassive)
303 ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
304 ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
306 epics::atomic::increment(num_instances);
308 getS<pvd::boolean>(pvReq,
"record._options.atomic", atomic);
310 getS<pvd::boolean>(pvReq,
"record._options.block", doWait);
313 if(getS<std::string>(pvReq,
"record._options.process", proccmd)) {
314 if(proccmd==
"true") {
315 doProc = PVIF::ProcForce;
316 }
else if(proccmd==
"false") {
317 doProc = PVIF::ProcInhibit;
319 }
else if(proccmd==
"passive") {
320 doProc = PVIF::ProcPassive;
322 requester->message(
"process= expects: true|false|passive", pva::warningMessage);
325 }
catch(std::exception& e){
326 requester->message(std::string(
"Error processing request options: ")+e.what());
329 pvf->getSubFieldT<pvd::PVBoolean>(
"record._options.atomic")->put(atomic);
332 const size_t npvs = channel->pv->members.size();
334 for(
size_t i=0; i<npvs; i++)
338 pvif[i].reset(info.builder->attach(pvf, info.attachment));
342 PDBGroupPut::~PDBGroupPut()
344 epics::atomic::decrement(num_instances);
347 void PDBGroupPut::put(pvd::PVStructure::shared_pointer
const & value,
348 pvd::BitSet::shared_pointer
const & changed)
351 const size_t npvs = channel->pv->members.size();
352 std::vector<std::tr1::shared_ptr<PVIF> > putpvif(npvs);
353 pvd::shared_vector<AsWritePvt> asWritePvt(npvs);
355 for(
size_t i=0; i<npvs; i++)
360 channel->aspvt.at(i).aspvt,
361 &channel->cred.user[0],
362 &channel->cred.host[0],
364 info.chan->final_type,
365 info.chan->final_no_elements,
369 asWritePvt[i].swap(wrt);
371 if(!info.allowProc)
continue;
372 putpvif[i].reset(info.builder->attach(value, info.attachment));
377 DBManyLocker L(channel->pv->locker);
378 for(
size_t i=0; ret && i<npvs; i++) {
379 if(!putpvif[i].
get())
continue;
381 ret |= putpvif[i]->get(*changed, doProc, channel->aspvt[i].canWrite());
385 for(
size_t i=0; ret && i<npvs; i++)
387 if(!putpvif[i].
get())
continue;
393 ret |= putpvif[i]->get(*changed,
394 info.allowProc ? doProc : PVIF::ProcInhibit,
395 channel->aspvt[i].canWrite());
399 requester_type::shared_pointer req(requester.lock());
401 req->putDone(ret, shared_from_this());
404 void PDBGroupPut::get()
406 const size_t npvs = pvif.size();
410 DBManyLocker L(channel->pv->locker);
411 for(
size_t i=0; i<npvs; i++) {
412 LocalFL FL(NULL, channel->pv->members[i].chan);
413 pvif[i]->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
417 for(
size_t i=0; i<npvs; i++)
423 pvif[i]->put(*changed, DBE_VALUE|DBE_ALARM|DBE_PROPERTY, FL.pfl);
430 requester_type::shared_pointer req(requester.lock());
432 req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
435 PDBGroupMonitor::PDBGroupMonitor(
const PDBGroupPV::shared_pointer& pv,
436 const epics::pvAccess::MonitorRequester::weak_pointer &requester,
437 const pvd::PVStructure::shared_pointer& pvReq)
441 epics::atomic::increment(num_instances);
444 PDBGroupMonitor::~PDBGroupMonitor()
447 epics::atomic::decrement(num_instances);
450 void PDBGroupMonitor::destroy()
452 BaseMonitor::destroy();
453 PDBGroupPV::shared_pointer pv;
460 void PDBGroupMonitor::onStart()
462 pv->addMonitor(
this);
465 void PDBGroupMonitor::onStop()
467 pv->removeMonitor(
this);
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
virtual void requestUpdate() OVERRIDE FINAL