4 #include <pv/reftrack.h>
// Worker setting read by the pvaGlobal_t constructor, which passes
// std::max(1, pvaLinkNWorkers) to queue.start().
8 int pvaLinkNWorkers = 1;
// Pointer to the shared pvaGlobal_t. This file dereferences it for the lock,
// the channel map, the local/remote providers, the PVData factory and the
// work queue. Where it is assigned is not visible in this excerpt.
12 pvaGlobal_t *pvaGlobal;
// pvaGlobal_t constructor.
// NOTE(review): only part of the initializer list and body is visible in this
// excerpt; other members may be initialized in lines not shown.
15 pvaGlobal_t::pvaGlobal_t()
// Store the PVData factory returned by pvd::getPVDataCreate() in 'create'.
16 :create(pvd::getPVDataCreate())
// Start the work queue. The first argument is clamped to at least 1
// (presumably the worker-thread count -- confirm against the queue type).
21 queue.start(std::max(1, pvaLinkNWorkers), epicsThreadPriorityMedium);
// pvaGlobal_t destructor. Its body is not visible in this excerpt.
24 pvaGlobal_t::~pvaGlobal_t()
// Definitions of the static per-class instance counters.
// pvaLinkChannel's counter is adjusted in this file with REFTRACE_INCREMENT /
// REFTRACE_DECREMENT; uses of pvaLink's counter are not visible in this excerpt.
28 size_t pvaLinkChannel::num_instances;
29 size_t pvaLink::num_instances;
// Ordering predicate for pvaLink pointers: links compare by their 'monorder'
// member (smaller monorder sorts first).
// NOTE(review): the statement guarded by the equality test below (the
// tie-break used when both links have the same monorder) is not visible in
// this excerpt.
32 bool pvaLinkChannel::LinkSort::operator()(
const pvaLink *L,
const pvaLink *R)
const {
33 if(L->monorder==R->monorder)
35 return L->monorder < R->monorder;
// Construct a channel object from its lookup key and the pvRequest structure.
// NOTE(review): of the member initializer list only connected_latched(false)
// is visible in this excerpt; the remaining initializers are in lines not shown.
39 pvaLinkChannel::pvaLinkChannel(
const pvaGlobal_t::channels_key_t &key,
const pvd::PVStructure::const_shared_pointer& pvRequest)
45 ,connected_latched(false)
// Destructor: unregisters this channel from the global channel map and
// decrements the instance counter.
53 pvaLinkChannel::~pvaLinkChannel() {
// Erase our key from pvaGlobal->channels with a Guard on pvaGlobal->lock in scope.
55 Guard G(pvaGlobal->lock);
56 pvaGlobal->channels.erase(key);
// No links may still be attached when the channel is destroyed.
61 assert(links.empty());
62 REFTRACE_DECREMENT(num_instances);
// Connect the channel named key.first and start a monitor on it.
// The local provider is tried first; the remote provider is used only when no
// channel was obtained and pvaLinkIsolate is not set.
// NOTE(review): some lines of this function (including the 'try' keyword and
// any locking) are not visible in this excerpt.
65 void pvaLinkChannel::open()
// First attempt: the local provider.
70 chan = pvaGlobal->provider_local.connect(key.first);
71 DEBUG(
this, <<key.first<<
" OPEN Local");
72 providerName = pvaGlobal->provider_local.name();
73 }
catch(std::exception& e){
// A std::exception from the local attempt is only logged through DEBUG.
78 DEBUG(
this, <<key.first<<
" OPEN Not local "<<e.what());
// Fall back to the remote provider unless isolation is requested or a
// channel was already obtained above.
80 if(!pvaLinkIsolate && !chan) {
81 chan = pvaGlobal->provider_remote.connect(key.first);
82 DEBUG(
this, <<key.first<<
" OPEN Remote ");
83 providerName = pvaGlobal->provider_remote.name();
// Begin monitoring, passing this object as the callback together with pvRequest.
86 op_mon = chan.monitor(
this, pvRequest);
88 REFTRACE_INCREMENT(num_instances);
// Structure type used by put() to create its request.
// Built with nested structures "field", "record" and "_options", plus a
// boolean "block" and a string "process"; put() addresses the last two as
// "record._options.block" and "record._options.process".
// NOTE(review): the calls that close the nested structures and finish the
// builder chain are not visible in this excerpt.
92 pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilder()
93 ->addNestedStructure(
"field")
95 ->addNestedStructure(
"record")
96 ->addNestedStructure(
"_options")
97 ->add(
"block", pvd::pvBoolean)
98 ->add(
"process", pvd::pvString)
// Move data staged by links from put_scratch to put_queue and start a put
// operation on the channel.
// force: when true, takes the same branch as bit 2 of reqProcess when the
//        "process" option is chosen.
// NOTE(review): only part of the body is visible in this excerpt (the switch
// around 'case pvaLink::Default', the assignments to 'proc' and the
// definitions of 'link' and 'doit' are in lines not shown).
104 void pvaLinkChannel::put(
bool force)
// Instantiate the request; "block" is true exactly when after_put is non-empty.
106 pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType));
107 pvReq->getSubFieldT<pvd::PVBoolean>(
"record._options.block")->put(!after_put.empty());
// Bit flags tested below with &1 and &2; where they are set is not visible here.
109 unsigned reqProcess = 0;
111 for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
// Links that staged nothing are skipped.
115 if(!link->used_scratch)
continue;
// Two swaps through an initially empty temporary: put_scratch is left empty,
// put_queue receives the staged data, and temp ends up holding whatever
// put_queue contained before.
117 pvd::shared_vector<const void> temp;
118 temp.swap(link->put_scratch);
119 link->used_scratch =
false;
120 temp.swap(link->put_queue);
121 link->used_queue =
true;
129 case pvaLink::Default:
// Value for the "process" option; the visible default is "passive".
145 const char *proc =
"passive";
146 if((reqProcess&2) || force) {
148 }
else if(reqProcess&1) {
151 pvReq->getSubFieldT<pvd::PVString>(
"record._options.process")->put(proc);
153 DEBUG(
this, <<key.first<<
"Start put "<<doit);
// Start the put, passing this object as the callback; the new operation
// replaces whatever op_put held.
156 op_put = chan.put(
this, pvReq);
// Put callback: build the structure to send from the data queued by each link.
// build: structure type from which the value to send is created.
// args:  callback arguments; args.tosend is passed to copyDBF2PVD() for every
//        link that has queued data.
// NOTE(review): parts of the body (definition of 'link', handling of 'sub',
// and what is finally done with 'top') are not visible in this excerpt.
160 void pvaLinkChannel::putBuild(
const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
164 pvd::PVStructurePtr top(pvaGlobal->create->createPVStructure(build));
166 for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
// Only links marked by put() contribute; the mark is cleared as it is consumed.
170 if(!link->used_queue)
continue;
171 link->used_queue =
false;
// Target field: the whole structure when fieldName is empty, otherwise the
// named sub-field (may be null if it does not exist).
173 pvd::PVFieldPtr value(link->fieldName.empty() ? pvd::PVFieldPtr(top) : top->getSubField(link->fieldName));
174 if(value && value->getField()->getType()==pvd::structure) {
// For a structure target, look up its "value" member.
176 pvd::PVFieldPtr sub(static_cast<pvd::PVStructure*>(value.get())->getSubField(
"value"));
183 pvd::PVStringArray::const_svector choices;
185 DEBUG(
this, <<key.first<<
" <- "<<value->getFullName());
186 copyDBF2PVD(link->put_queue, value, args.tosend, choices);
// The queued data has been consumed.
188 link->put_queue.clear();
190 DEBUG(
this, <<key.first<<
" Put built");
// Fragment of the AFLinker functor (its struct header is not visible in this
// excerpt). It stores a shared_ptr to a pvaLinkChannel and is callable with a
// pvaLinkChannel::AfterPut*; the body of operator() is not shown.
198 std::tr1::shared_ptr<pvaLinkChannel> chan;
199 AFLinker(
const std::tr1::shared_ptr<pvaLinkChannel>& chan) :chan(chan) {}
200 void operator()(pvaLinkChannel::AfterPut *) {
// Put-completion callback.
// evt: completion event; evt.event is compared against Fail and Success, and
//      evt.message is included in the error log on failure.
// NOTE(review): parts of the body (declaration of 'needscans' and 'AP', and
// the exact nesting of the final queue.add) are not visible in this excerpt.
206 void pvaLinkChannel::putDone(
const pvac::PutEvent& evt)
208 if(evt.event==pvac::PutEvent::Fail) {
209 errlogPrintf(
"%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
216 DEBUG(
this, <<key.first<<
" Put result "<<evt.event);
// Records need follow-up exactly when after_put is non-empty.
218 needscans = !after_put.empty();
// Reset op_put to a default-constructed Operation.
219 op_put = pvac::Operation();
221 if(evt.event==pvac::PutEvent::Success) {
// Hand AP to the global work queue.
228 pvaGlobal->queue.add(AP);
// Work-queue job run after a put: takes the channel's after_put record set
// and walks over it, calling each record's rset->process().
// NOTE(review): the conditions around process() and the error message (and any
// locking) are in lines not visible in this excerpt.
232 void pvaLinkChannel::AfterPut::run()
234 std::set<dbCommon*> toscan;
// Obtain a shared_ptr from 'lc' via lock() for use below.
235 std::tr1::shared_ptr<pvaLinkChannel> link(lc.lock());
// Take over the pending set; link->after_put receives the (empty) local set.
241 toscan.swap(link->after_put);
244 for(after_put_t::iterator it=toscan.begin(), end=toscan.end();
247 dbCommon *prec = *it;
250 (prec)->rset->process(prec);
254 errlogPrintf(
"%s : not PACT when async PVA link completed. Logic error?\n", prec->name);
// Monitor callback: updates the 'connected' flag from the event type, logs
// failures, and queues this channel on the global work queue.
// evt: monitor event; evt.event selects the case, evt.message is logged on Fail.
// NOTE(review): the switch header, break statements and the condition (if any)
// guarding the final queue.add are not visible in this excerpt.
261 void pvaLinkChannel::monitorEvent(
const pvac::MonitorEvent& evt)
266 DEBUG(
this, <<key.first<<
" EVENT "<<evt.event);
// Disconnect and Data share one body: connected becomes true only for Data.
270 case pvac::MonitorEvent::Disconnect:
271 case pvac::MonitorEvent::Data:
272 connected = evt.event == pvac::MonitorEvent::Data;
275 case pvac::MonitorEvent::Cancel:
277 case pvac::MonitorEvent::Fail:
280 errlogPrintf(
"%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
// Schedule this channel on the work queue.
291 pvaGlobal->queue.add(shared_from_this());
// Decide how to handle the record at position idx of scan_records.
// idx: index into the parallel vectors scan_records, scan_check_passive and
//      scan_changed.
// NOTE(review): the bodies of the first two branches and any final else
// branch are not visible in this excerpt.
297 void pvaLinkChannel::run_dbProcess(
size_t idx)
299 dbCommon *precord = scan_records[idx];
// Branch 1: the passive check is enabled for this entry and scan is non-zero.
301 if(scan_check_passive[idx] && precord->scan!=0) {
304 }
else if(connected_latched && !op_mon.changed.logical_and(scan_changed[idx])) {
// Branch 2 (above): connected, and logical_and() of the monitor's changed
// mask with this entry's mask returned false.
307 }
else if (precord->pact) {
// Branch 3: the record has pact set; print a notice and set rpro.
309 printf(
"%s: Active %s\n",
310 epicsThreadGetNameSelf(), precord->name);
311 precord->rpro = TRUE;
// Work-queue job for the channel: latches the connection state, polls the
// monitor, notifies links of disconnects and type changes, rebuilds the list
// of records to scan, walks that list, and may queue itself again.
// NOTE(review): many lines of this function (locking, the definition of
// 'link', several branch bodies and the loop bodies at the end) are not
// visible in this excerpt; the comments below cover only visible statements.
318 void pvaLinkChannel::run()
320 bool requeue =
false;
// Copy 'connected' into connected_latched for use in the rest of this pass.
326 connected_latched = connected;
// Connected, but poll() returned false: logged as "empty".
330 if(connected && !op_mon.poll()) {
331 DEBUG(
this, <<key.first<<
" RUN "<<
"empty");
336 DEBUG(
this, <<key.first<<
" RUN "<<(connected_latched?
"connected":
"disconnected"));
// When connected, the monitor must have a root structure.
338 assert(!connected || !!op_mon.root);
// Reset op_put to a default-constructed Operation, then notify every link
// of the disconnect.
344 op_put = pvac::Operation();
346 for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
349 link->onDisconnect();
356 }
else if(previous_root.get() != (
const void*)op_mon.root.get()) {
// The root pointer differs from the one remembered previously: notify
// every link of the type change.
359 for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
362 link->onTypeChange();
// Remember the current root for the comparison on a later pass.
365 previous_root = std::tr1::static_pointer_cast<
const void>(op_mon.root);
370 requeue = queued = connected_latched;
// Rebuild the three parallel scan vectors from the current set of links.
376 scan_records.clear();
377 scan_check_passive.clear();
378 scan_changed.clear();
380 for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
383 assert(link && link->alive);
385 if(!link->plink)
continue;
388 if(link->type!=DBF_INLINK)
394 if(link->pp != pvaLink::PP && link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
// One entry per selected link: its record, whether the passive check applies
// (every mode except CP), and its proc_changed mask.
397 scan_records.push_back(link->plink->precord);
398 scan_check_passive.push_back(link->pp != pvaLink::CP);
399 scan_changed.push_back(link->proc_changed);
// Build a multi-record lock for the new list and swap it into atomic_lock.
402 DBManyLock ML(scan_records);
404 atomic_lock.swap(ML);
406 links_changed =
false;
410 if(scan_records.empty()) {
413 }
else if(isatomic && scan_records.size() > 1u) {
// Atomic case with more than one record: a DBManyLocker on atomic_lock is
// in scope for the loop over all records.
414 DBManyLocker L(atomic_lock);
416 for(
size_t i=0, N=scan_records.size(); i<N; i++) {
421 for(
size_t i=0, N=scan_records.size(); i<N; i++) {
// Queue this channel on the work queue again (presumably guarded by
// 'requeue' -- confirm against the full source).
429 pvaGlobal->queue.add(shared_from_this());