pva2pva 1.4.1
pvalink_channel.cpp
1 
2 #include <alarm.h>
3 
4 #include <pv/reftrack.h>
5 
6 #include "pvalink.h"
7 
// Number of worker threads servicing the PVA link work queue.
// Clamped to at least 1 when the queue is started (pvaGlobal_t ctor).
int pvaLinkNWorkers = 1;
9 
10 namespace pvalink {
11 
// Process-wide singleton holding shared state (providers, work queue, lock).
// Presumably allocated during link-support initialization elsewhere -- not
// visible in this file.
pvaGlobal_t *pvaGlobal;
13 
14 
15 pvaGlobal_t::pvaGlobal_t()
16  :create(pvd::getPVDataCreate())
17  ,queue("PVAL")
18  ,running(false)
19 {
20  // worker should be above PVA worker priority?
21  queue.start(std::max(1, pvaLinkNWorkers), epicsThreadPriorityMedium);
22 }
23 
// No explicit teardown; members release their resources in their own dtors.
pvaGlobal_t::~pvaGlobal_t()
{
}
27 
// Live-instance counters maintained via REFTRACE_INCREMENT/DECREMENT
// for epics reftrack reporting.
size_t pvaLinkChannel::num_instances;
size_t pvaLink::num_instances;
30 
31 
32 bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const {
33  if(L->monorder==R->monorder)
34  return L < R;
35  return L->monorder < R->monorder;
36 }
37 
38 // being called with pvaGlobal::lock held
39 pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd::PVStructure::const_shared_pointer& pvRequest)
40  :key(key)
41  ,pvRequest(pvRequest)
42  ,num_disconnect(0u)
43  ,num_type_change(0u)
44  ,connected(false)
45  ,connected_latched(false)
46  ,isatomic(false)
47  ,queued(false)
48  ,debug(false)
49  ,links_changed(false)
50  ,AP(new AfterPut)
51 {}
52 
pvaLinkChannel::~pvaLinkChannel() {
    {
        // De-register from the global channel cache first so no new
        // pvaLink can look up (and resurrect) this channel while it dies.
        Guard G(pvaGlobal->lock);
        pvaGlobal->channels.erase(key);
    }

    Guard G(lock);

    // every pvaLink must have detached before the channel is destroyed
    assert(links.empty());
    REFTRACE_DECREMENT(num_instances);
}
64 
// Connect the channel and start monitoring.
// Tries the in-process (local) provider first, then falls back to the
// remote PVA provider unless pvaLinkIsolate is set.
void pvaLinkChannel::open()
{
    Guard G(lock);

    try {
        chan = pvaGlobal->provider_local.connect(key.first);
        DEBUG(this, <<key.first<<" OPEN Local");
        providerName = pvaGlobal->provider_local.name();
    } catch(std::exception& e){
        // The PDBProvider doesn't have a way to communicate to us
        // whether this is an invalid record or group name,
        // or if this is some sort of internal error.
        // So we are forced to assume it is an invalid name.
        DEBUG(this, <<key.first<<" OPEN Not local "<<e.what());
    }
    if(!pvaLinkIsolate && !chan) {
        chan = pvaGlobal->provider_remote.connect(key.first);
        DEBUG(this, <<key.first<<" OPEN Remote ");
        providerName = pvaGlobal->provider_remote.name();
    }

    // begin monitoring; updates arrive via monitorEvent()
    op_mon = chan.monitor(this, pvRequest);

    REFTRACE_INCREMENT(num_instances);
}
90 
// pvRequest structure used for network puts:
//   field: {}                     -- send the whole structure
//   record._options.block         -- wait for remote completion?
//   record._options.process       -- remote processing request
static
pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilder()
    ->addNestedStructure("field")
    ->endNested()
    ->addNestedStructure("record")
        ->addNestedStructure("_options")
            ->add("block", pvd::pvBoolean)
            ->add("process", pvd::pvString) // "true", "false", or "passive"
        ->endNested()
    ->endNested()
    ->createStructure();
102 
103 // call with channel lock held
104 void pvaLinkChannel::put(bool force)
105 {
106  pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType));
107  pvReq->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(!after_put.empty());
108 
109  unsigned reqProcess = 0;
110  bool doit = force;
111  for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
112  {
113  pvaLink *link = *it;
114 
115  if(!link->used_scratch) continue;
116 
117  pvd::shared_vector<const void> temp;
118  temp.swap(link->put_scratch);
119  link->used_scratch = false;
120  temp.swap(link->put_queue);
121  link->used_queue = true;
122 
123  doit = true;
124 
125  switch(link->pp) {
126  case pvaLink::NPP:
127  reqProcess |= 1;
128  break;
129  case pvaLink::Default:
130  break;
131  case pvaLink::PP:
132  case pvaLink::CP:
133  case pvaLink::CPP:
134  reqProcess |= 2;
135  break;
136  }
137  }
138 
139  /* By default, use remote default (passive).
140  * Request processing, or not, if any link asks.
141  * Prefer PP over NPP if both are specified.
142  *
143  * TODO: per field granularity?
144  */
145  const char *proc = "passive";
146  if((reqProcess&2) || force) {
147  proc = "true";
148  } else if(reqProcess&1) {
149  proc = "false";
150  }
151  pvReq->getSubFieldT<pvd::PVString>("record._options.process")->put(proc);
152 
153  DEBUG(this, <<key.first<<"Start put "<<doit);
154  if(doit) {
155  // start net Put, cancels in-progress put
156  op_put = chan.put(this, pvReq);
157  }
158 }
159 
// PutCallback: marshal each link's queued value into the structure to be
// sent over the network.  'build' is the remote-provided type.
void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
{
    Guard G(lock);

    pvd::PVStructurePtr top(pvaGlobal->create->createPVStructure(build));

    for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
    {
        pvaLink *link = *it;

        if(!link->used_queue) continue;
        link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop

        // an empty fieldName addresses the whole top-level structure
        pvd::PVFieldPtr value(link->fieldName.empty() ? pvd::PVFieldPtr(top) : top->getSubField(link->fieldName));
        if(value && value->getField()->getType()==pvd::structure) {
            // maybe drill into NTScalar et al.
            pvd::PVFieldPtr sub(static_cast<pvd::PVStructure*>(value.get())->getSubField("value"));
            if(sub)
                value.swap(sub);
        }

        if(!value) continue; // TODO: how to signal error?

        pvd::PVStringArray::const_svector choices; // TODO populate from op_mon

        DEBUG(this, <<key.first<<" <- "<<value->getFullName());
        copyDBF2PVD(link->put_queue, value, args.tosend, choices);

        link->put_queue.clear();
    }
    DEBUG(this, <<key.first<<" Put built");

    args.root = top;
}
194 
195 namespace {
196 // soo much easier with c++11 std::shared_ptr...
197 struct AFLinker {
198  std::tr1::shared_ptr<pvaLinkChannel> chan;
199  AFLinker(const std::tr1::shared_ptr<pvaLinkChannel>& chan) :chan(chan) {}
200  void operator()(pvaLinkChannel::AfterPut *) {
201  chan.reset();
202  }
203 };
204 } // namespace
205 
// PutCallback completion.  On success, immediately start any put which
// was staged while this one was in flight.  Records blocked waiting on
// the put (after_put) are re-processed via the global work queue.
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
    if(evt.event==pvac::PutEvent::Fail) {
        errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
    }

    bool needscans;
    {
        Guard G(lock);

        DEBUG(this, <<key.first<<" Put result "<<evt.event);

        needscans = !after_put.empty();
        op_put = pvac::Operation(); // no put in progress any more

        if(evt.event==pvac::PutEvent::Success) {
            // see if we need start a queue'd put
            put();
        }
    }

    if(needscans) {
        // schedule AfterPut::run() outside of our lock
        pvaGlobal->queue.add(AP);
    }
}
231 
// Work queue callback: complete async processing of records which were
// left PACT while waiting for a blocking put to finish.
void pvaLinkChannel::AfterPut::run()
{
    std::set<dbCommon*> toscan;
    // weak ref; the channel may already have been destroyed
    std::tr1::shared_ptr<pvaLinkChannel> link(lc.lock());
    if(!link)
        return;

    {
        // take the pending set, leaving after_put empty
        Guard G(link->lock);
        toscan.swap(link->after_put);
    }

    for(after_put_t::iterator it=toscan.begin(), end=toscan.end();
        it!=end; ++it)
    {
        dbCommon *prec = *it;
        dbScanLock(prec);
        if(prec->pact) { // complete async. processing
            (prec)->rset->process(prec);

        } else {
            // maybe the result of "cancellation" or some record support logic error?
            errlogPrintf("%s : not PACT when async PVA link completed. Logic error?\n", prec->name);
        }
        dbScanUnlock(prec);
    }

}
260 
// MonitorCallback: track (dis)connect state and schedule run() on the
// global work queue to drain the monitor queue.
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
{
    bool queue = false;

    {
        DEBUG(this, <<key.first<<" EVENT "<<evt.event);
        Guard G(lock);

        switch(evt.event) {
        case pvac::MonitorEvent::Disconnect:
        case pvac::MonitorEvent::Data:
            connected = evt.event == pvac::MonitorEvent::Data;
            queue = true;
            break;
        case pvac::MonitorEvent::Cancel:
            break; // no-op
        case pvac::MonitorEvent::Fail:
            connected = false;
            queue = true;
            errlogPrintf("%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
            break;
        }

        if(queued)
            return; // already scheduled

        queued = queue;
    }

    if(queue) {
        // schedule pvaLinkChannel::run(); shared_from_this keeps us alive
        // while on the queue
        pvaGlobal->queue.add(shared_from_this());
    }
}
294 
// the work in calling dbProcess() which is common to
// both dbScanLock() and dbScanLockMany()
// Caller must already hold the record lock for scan_records[idx].
void pvaLinkChannel::run_dbProcess(size_t idx)
{
    dbCommon *precord = scan_records[idx];

    if(scan_check_passive[idx] && precord->scan!=0) {
        // PP/CPP link, but the record is not SCAN=Passive
        return;

    } else if(connected_latched && !op_mon.changed.logical_and(scan_changed[idx])) {
        // none of the fields this record cares about changed in this update
        return;

    } else if (precord->pact) {
        // record busy with async processing; request re-process on completion
        if (precord->tpro)
            printf("%s: Active %s\n",
                epicsThreadGetNameSelf(), precord->name);
        precord->rpro = TRUE;

    }
    dbProcess(precord);
}
316 
// Running from global WorkQueue thread
// Drain one update from the monitor queue, notify links of disconnect or
// type changes, then process the affected records.  Re-queues itself
// until the monitor queue is empty.
void pvaLinkChannel::run()
{
    bool requeue = false;
    {
        Guard G(lock);

        queued = false;

        // latch connection state for the duration of this update so
        // run_dbProcess() and lset functions see a consistent view
        connected_latched = connected;

        // pop next update from monitor queue.
        // still under lock to safeguard concurrent calls to lset functions
        if(connected && !op_mon.poll()) {
            DEBUG(this, <<key.first<<" RUN "<<"empty");
            run_done.signal();
            return; // monitor queue is empty, nothing more to do here
        }

        DEBUG(this, <<key.first<<" RUN "<<(connected_latched?"connected":"disconnected"));

        assert(!connected || !!op_mon.root);

        if(!connected) {
            num_disconnect++;

            // cancel pending put operations
            op_put = pvac::Operation();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                link->onDisconnect();
            }

            // Don't clear previous_root on disconnect.
            // We will usually re-connect with the same type,
            // and may get back the same PVStructure.

        } else if(previous_root.get() != (const void*)op_mon.root.get()) {
            // a different root PVStructure instance means the remote
            // type (may have) changed
            num_type_change++;

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                link->onTypeChange();
            }

            previous_root = std::tr1::static_pointer_cast<const void>(op_mon.root);
        }

        // at this point we know we will re-queue, but not immediately
        // so an expected error won't get us stuck in a tight loop.
        requeue = queued = connected_latched;

        if(links_changed) {
            // a link has been added or removed since the last update.
            // rebuild our cached list of records to (maybe) process.

            scan_records.clear();
            scan_check_passive.clear();
            scan_changed.clear();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                assert(link && link->alive);

                if(!link->plink) continue;

                // only scan on monitor update for input links
                if(link->type!=DBF_INLINK)
                    continue;

                // NPP and none/Default don't scan
                // PP, CP, and CPP do scan
                // PP and CPP only if SCAN=Passive
                if(link->pp != pvaLink::PP && link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
                    continue;

                scan_records.push_back(link->plink->precord);
                scan_check_passive.push_back(link->pp != pvaLink::CP);
                scan_changed.push_back(link->proc_changed);
            }

            // pre-compute the multi-record lock used for atomic updates
            DBManyLock ML(scan_records);

            atomic_lock.swap(ML);

            links_changed = false;
        }
    }

    // record processing happens after our lock is released
    if(scan_records.empty()) {
        // Nothing to do, so don't bother locking

    } else if(isatomic && scan_records.size() > 1u) {
        // lock all target records at once and process them as a group
        DBManyLocker L(atomic_lock);

        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            run_dbProcess(i);
        }

    } else {
        // lock and process each record individually
        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            DBScanLocker L(scan_records[i]);
            run_dbProcess(i);
        }
    }

    if(requeue) {
        // re-queue until monitor queue is empty
        pvaGlobal->queue.add(shared_from_this());
    } else {
        run_done.signal();
    }
}
434 
435 } // namespace pvalink