pva2pva  1.4.1
 All Classes Functions Variables Pages
pvahelper.h
1 #ifndef PVAHELPER_H
2 #define PVAHELPER_H
3 
4 #include <deque>
5 
6 #include <epicsGuard.h>
7 
8 #include <pv/pvAccess.h>
9 
10 template<typename T, typename A>
11 bool getS(const epics::pvData::PVStructurePtr& S, const char *name, A& val)
12 {
13  epics::pvData::PVScalarPtr F(S->getSubField<epics::pvData::PVScalar>(name));
14  if(F)
15  val = F->getAs<T>();
16  return !!F;
17 }
18 
19 struct BaseChannel : public epics::pvAccess::Channel
20 {
21  BaseChannel(const std::string& name,
22  const std::tr1::weak_ptr<epics::pvAccess::ChannelProvider>& prov,
23  const epics::pvAccess::ChannelRequester::shared_pointer& req,
24  const epics::pvData::StructureConstPtr& dtype
25  )
26  :pvname(name), provider(prov), requester(req), fielddesc(dtype)
27  {}
28  virtual ~BaseChannel() {}
29 
30  mutable epicsMutex lock;
31  typedef epicsGuard<epicsMutex> guard_t;
32 
33  const std::string pvname;
34  const epics::pvAccess::ChannelProvider::weak_pointer provider;
35  const requester_type::weak_pointer requester;
36  const epics::pvData::StructureConstPtr fielddesc;
37 
38  // assume Requester methods not called after destory()
39  virtual std::string getRequesterName() OVERRIDE
40  { return getChannelRequester()->getRequesterName(); }
41 
42  virtual void destroy() OVERRIDE FINAL {}
43 
44  virtual std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> getProvider() OVERRIDE FINAL
45  { return epics::pvAccess::ChannelProvider::shared_pointer(provider); }
46  virtual std::string getRemoteAddress() OVERRIDE
47  { return getRequesterName(); }
48 
49  virtual std::string getChannelName() OVERRIDE FINAL { return pvname; }
50  virtual std::tr1::shared_ptr<epics::pvAccess::ChannelRequester> getChannelRequester() OVERRIDE FINAL
51  { return requester_type::shared_pointer(requester); }
52 
53  virtual void getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,std::string const & subField) OVERRIDE
54  { requester->getDone(epics::pvData::Status(), fielddesc); }
55 
56  virtual void printInfo(std::ostream& out) OVERRIDE {
57  out<<"Channel '"<<pvname<<"' "<<getRemoteAddress()<<"\n";
58  }
59 };
60 
69 struct BaseMonitor : public epics::pvAccess::Monitor
70 {
71  POINTER_DEFINITIONS(BaseMonitor);
72  weak_pointer weakself;
73  inline shared_pointer shared_from_this() { return shared_pointer(weakself); }
74 
75  typedef epics::pvAccess::MonitorRequester requester_t;
76 
77  epicsMutex& lock; // not held during any callback
78  typedef epicsGuard<epicsMutex> guard_t;
79  typedef epicsGuardRelease<epicsMutex> unguard_t;
80 
81 private:
82  const requester_t::weak_pointer requester;
83 
84  epics::pvData::PVStructurePtr complete;
85  epics::pvData::BitSet changed, overflow;
86 
87  typedef std::deque<epics::pvAccess::MonitorElementPtr> buffer_t;
88  bool inoverflow;
89  bool running;
90  size_t nbuffers;
91  buffer_t inuse, empty;
92 
93 public:
94  BaseMonitor(epicsMutex& lock,
95  const requester_t::weak_pointer& requester,
96  const epics::pvData::PVStructure::shared_pointer& pvReq)
97  :lock(lock)
98  ,requester(requester)
99  ,inoverflow(false)
100  ,running(false)
101  ,nbuffers(2)
102  {}
103 
104  virtual ~BaseMonitor() {destroy();}
105 
106  inline const epics::pvData::PVStructurePtr& getValue() { return complete; }
107 
110  void connect(guard_t& guard, const epics::pvData::PVStructurePtr& value)
111  {
112  guard.assertIdenticalMutex(lock);
113  epics::pvData::StructureConstPtr dtype(value->getStructure());
114  epics::pvData::PVDataCreatePtr create(epics::pvData::getPVDataCreate());
115  BaseMonitor::shared_pointer self(shared_from_this());
116  requester_t::shared_pointer req(requester.lock());
117 
118  assert(!complete); // can't call twice
119 
120  complete = value;
121  empty.resize(nbuffers);
122  for(size_t i=0; i<empty.size(); i++) {
123  empty[i].reset(new epics::pvAccess::MonitorElement(create->createPVStructure(dtype)));
124  }
125 
126  if(req) {
127  unguard_t U(guard);
128  epics::pvData::Status sts;
129  req->monitorConnect(sts, self, dtype);
130  }
131  }
132 
133  struct no_overflow {};
134 
136  bool post(guard_t& guard, const epics::pvData::BitSet& updated, no_overflow)
137  {
138  guard.assertIdenticalMutex(lock);
139  requester_t::shared_pointer req;
140 
141  if(!complete || !running) return false;
142 
143  changed |= updated;
144 
145  if(empty.empty()) return false;
146 
147  if(p_postone())
148  req = requester.lock();
149  inoverflow = false;
150 
151  if(req) {
152  unguard_t U(guard);
153  req->monitorEvent(shared_from_this());
154  }
155  return true;
156  }
157 
159  bool post(guard_t& guard)
160  {
161  guard.assertIdenticalMutex(lock);
162  bool oflow;
163  requester_t::shared_pointer req;
164 
165  if(!complete || !running) return false;
166 
167  if(empty.empty()) {
168  oflow = inoverflow = true;
169 
170  } else {
171 
172  if(p_postone())
173  req = requester.lock();
174  oflow = inoverflow = false;
175  }
176 
177  if(req) {
178  unguard_t U(guard);
179  req->monitorEvent(shared_from_this());
180  }
181  return !oflow;
182  }
183 
185  bool post(guard_t& guard,
186  const epics::pvData::BitSet& updated,
187  const epics::pvData::BitSet& overflowed)
188  {
189  guard.assertIdenticalMutex(lock);
190  bool oflow;
191  requester_t::shared_pointer req;
192 
193  if(!complete || !running) return false;
194 
195  if(empty.empty()) {
196  oflow = inoverflow = true;
197  overflow |= overflowed;
198  overflow.or_and(updated, changed);
199  changed |= updated;
200 
201  } else {
202 
203  changed |= updated;
204  if(p_postone())
205  req = requester.lock();
206  oflow = inoverflow = false;
207  }
208 
209  if(req) {
210  unguard_t U(guard);
211  req->monitorEvent(shared_from_this());
212  }
213  return !oflow;
214  }
215 
217  bool post(guard_t& guard, const epics::pvData::BitSet& updated) {
218  bool oflow;
219  requester_t::shared_pointer req;
220 
221  if(!complete || !running) return false;
222 
223  if(empty.empty()) {
224  oflow = inoverflow = true;
225  overflow.or_and(updated, changed);
226  changed |= updated;
227 
228  } else {
229 
230  changed |= updated;
231  if(p_postone())
232  req = requester.lock();
233  oflow = inoverflow = false;
234  }
235 
236  if(req) {
237  unguard_t U(guard);
238  req->monitorEvent(shared_from_this());
239  }
240  return !oflow;
241  }
242 
243 private:
244  bool p_postone()
245  {
246  bool ret;
247  // assume lock is held
248  assert(!empty.empty());
249 
250  epics::pvAccess::MonitorElementPtr& elem = empty.front();
251 
252  elem->pvStructurePtr->copyUnchecked(*complete);
253  *elem->changedBitSet = changed;
254  *elem->overrunBitSet = overflow;
255 
256  overflow.clear();
257  changed.clear();
258 
259  ret = inuse.empty();
260  inuse.push_back(elem);
261  empty.pop_front();
262  return ret;
263  }
264 public:
265 
266  // for special handling when MonitorRequester start()s or stop()s
267  virtual void onStart() {}
268  virtual void onStop() {}
271  virtual void requestUpdate() {guard_t G(lock); post(G);}
272 
273  virtual void destroy()
274  {
275  stop();
276  }
277 
278 private:
279  virtual epics::pvData::Status start()
280  {
281  epics::pvData::Status ret;
282  bool notify = false;
283  BaseMonitor::shared_pointer self;
284  {
285  guard_t G(lock);
286  if(running) return ret;
287  running = true;
288  if(!complete) return ret; // haveType() not called (error?)
289  inoverflow = empty.empty();
290  if(!inoverflow) {
291 
292  // post complete event
293  overflow.clear();
294  changed.clear();
295  changed.set(0);
296  notify = true;
297  }
298  }
299  if(notify) onStart(); // may result in post()
300  return ret;
301  }
302 
303  virtual epics::pvData::Status stop()
304  {
305  BaseMonitor::shared_pointer self;
306  bool notify;
307  epics::pvData::Status ret;
308  {
309  guard_t G(lock);
310  notify = running;
311  running = false;
312  }
313  if(notify) onStop();
314  return ret;
315  }
316 
317  virtual epics::pvAccess::MonitorElementPtr poll()
318  {
319  epics::pvAccess::MonitorElementPtr ret;
320  guard_t G(lock);
321  if(running && complete && !inuse.empty()) {
322  ret = inuse.front();
323  inuse.pop_front();
324  }
325  return ret;
326  }
327 
328  virtual void release(epics::pvAccess::MonitorElementPtr const & elem)
329  {
330  BaseMonitor::shared_pointer self;
331  {
332  guard_t G(lock);
333  empty.push_back(elem);
334  if(inoverflow)
335  self = weakself.lock(); //TODO: concurrent release?
336  }
337  if(self)
338  self->requestUpdate(); // may result in post()
339  }
340 public:
341  virtual void getStats(Stats& s) const
342  {
343  guard_t G(lock);
344  s.nempty = empty.size();
345  s.nfilled = inuse.size();
346  s.noutstanding = nbuffers - s.nempty - s.nfilled;
347  }
348 };
349 
350 template<class CP>
351 struct BaseChannelProviderFactory : epics::pvAccess::ChannelProviderFactory
352 {
353  typedef CP provider_type;
354  std::string name;
355  epicsMutex lock;
356  std::tr1::weak_ptr<CP> last_shared;
357 
358  BaseChannelProviderFactory(const char *name) :name(name) {}
359  virtual ~BaseChannelProviderFactory() {}
360 
361  virtual std::string getFactoryName() { return name; }
362 
363  virtual epics::pvAccess::ChannelProvider::shared_pointer sharedInstance()
364  {
365  epicsGuard<epicsMutex> G(lock);
366  std::tr1::shared_ptr<CP> ret(last_shared.lock());
367  if(!ret) {
368  ret.reset(new CP());
369  last_shared = ret;
370  }
371  return ret;
372  }
373 
374  virtual epics::pvAccess::ChannelProvider::shared_pointer newInstance(const std::tr1::shared_ptr<epics::pvAccess::Configuration>&)
375  {
376  std::tr1::shared_ptr<CP> ret(new CP());
377  return ret;
378  }
379 };
380 
381 #endif // PVAHELPER_H
bool post(guard_t &guard, const epics::pvData::BitSet &updated)
post update with changed
Definition: pvahelper.h:217
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
Definition: pvahelper.h:136
bool post(guard_t &guard)
post update of pending changes. eg. call from requestUpdate()
Definition: pvahelper.h:159
void connect(guard_t &guard, const epics::pvData::PVStructurePtr &value)
Definition: pvahelper.h:110
virtual void requestUpdate()
Definition: pvahelper.h:271
bool post(guard_t &guard, const epics::pvData::BitSet &updated, const epics::pvData::BitSet &overflowed)
post update with changed and overflowed masks (eg. when updates were lost in some upstream queue) ...
Definition: pvahelper.h:185