2 #include <epicsAtomic.h>
5 #include <epicsMutex.h>
6 #include <epicsTimer.h>
8 #include <pv/pvAccess.h>
10 #define epicsExportSharedSymbols
13 #include "chancache.h"
15 namespace pva = epics::pvAccess;
16 namespace pvd = epics::pvData;
// Definitions of the static per-class instance counters. Within this file they
// are adjusted with epicsAtomicIncrSizeT()/epicsAtomicDecrSizeT() in the
// constructors and destructors of the corresponding classes.
18 size_t MonitorCacheEntry::num_instances;
19 size_t MonitorUser::num_instances;
// Helper: looks up the sub-field `name` of structure `s` as a pvd::PVScalar
// (using the throwing getSubFieldT accessor) and returns its value converted
// to T via getAs<T>().
// NOTE(review): the template header, the start of the try block and the body
// of the catch handler are not visible here; `dft` is presumably the fallback
// value returned when the lookup or conversion throws -- confirm.
24 T getS(
const pvd::PVStructurePtr& s,
const char* name, T dft)
27 return s->getSubFieldT<pvd::PVScalar>(name)->getAs<T>();
28 }
// Handler for std::runtime_error raised by the lookup above.
catch(std::runtime_error& e){
// Constructor. Receives a ChannelCacheEntry pointer and a PVStructure `pvr`.
// bufferSize is initialised through getS<pvd::uint32>() from the field
// "record._options.queueSize" of `pvr`, with 2 passed as the default argument.
// The class-wide instance counter is then incremented atomically.
// NOTE(review): the remaining initializer-list entries are not visible here.
34 MonitorCacheEntry::MonitorCacheEntry(
ChannelCacheEntry *ent,
const pvd::PVStructure::shared_pointer& pvr)
36 ,bufferSize(getS<pvd::uint32>(pvr,
"record._options.queueSize", 2))
42 epicsAtomicIncrSizeT(&num_instances);
// Destructor: atomically decrements the class-wide instance counter.
// NOTE(review): the statements that use the local Monitor pointer `M` are not
// visible here -- confirm its purpose against the full file.
45 MonitorCacheEntry::~MonitorCacheEntry()
47 pvd::Monitor::shared_pointer M;
52 epicsAtomicDecrSizeT(&num_instances);
// Connection callback for the monitor held by this cache entry.
//   status    - result reported for the connection attempt
//   monitor   - the monitor object; started here when `status` is a success
//   structure - type description used to allocate `lastelem` and passed on
//               to the downstream requesters
// The result of monitor->start() is stored in `startresult`, and every entry
// of `tonotify` receives monitorConnect(startresult, user, structure).
// NOTE(review): how `tonotify` is filled and which condition guards the
// "new type" diagnostic are not visible here.
57 MonitorCacheEntry::monitorConnect(pvd::Status
const & status,
58 pvd::MonitorPtr
const & monitor,
59 pvd::StructureConstPtr
const & structure)
61 interested_t::vector_type tonotify;
// NOTE(review): "it's" in the message below should read "its"; it is runtime
// text, so it is left untouched by this documentation-only change.
67 std::cerr<<
"monitorConnect() w/ new type. Monitor has outlived it's connection.\n";
74 if(status.isSuccess()) {
75 startresult = monitor->start();
// On a successful start, allocate the element holding the latest value,
// backed by a freshly created PVStructure of the announced type.
80 if(startresult.isSuccess()) {
81 lastelem.reset(
new pvd::MonitorElement(pvd::getPVDataCreate()->createPVStructure(structure)));
// NOTE(review): this failure message goes to std::cout whereas the diagnostic
// above uses std::cerr -- confirm whether that difference is intended.
89 if(!startresult.isSuccess())
90 std::cout<<
"upstream monitor start() fails\n";
// Presumably keeps this object alive while the callbacks below run -- confirm.
92 shared_pointer
self(weakref);
94 for(interested_t::vector_type::const_iterator it = tonotify.begin(),
95 end = tonotify.end(); it!=end; ++it)
// NOTE(review): `req` is lock()ed elsewhere in this file, so it is presumably
// a weak pointer; constructing a shared_pointer from it directly would throw
// if the requester has already gone away -- confirm this is acceptable here.
97 pvd::MonitorRequester::shared_pointer req((*it)->req);
99 req->monitorConnect(startresult, *it, structure);
// Event callback for the monitor held by this cache entry.
// Visible behaviour:
//   * counts one wakeup, then drains the monitor with poll() until it
//     returns an empty pointer;
//   * for each update, merges it into `lastelem`, releases it back to the
//     monitor, and distributes the value to every user in `interested`;
//   * afterwards calls monitorEvent() on the requesters collected in
//     `dsnotify`.
// NOTE(review): several statements (declaration of `usr` and `dsnotify`,
// locking of this entry, the else-branch layout) are not visible here;
// comments below describe only the lines shown.
110 MonitorCacheEntry::monitorEvent(pvd::MonitorPtr
const & monitor)
121 epicsAtomicIncrSizeT(&nwakeups);
// Presumably keeps this object alive for the duration of the call -- confirm.
123 shared_pointer
self(weakref);
125 pva::MonitorElementPtr update;
127 typedef std::vector<MonitorUser::shared_pointer> dsnotify_t;
136 while((update=monitor->poll()))
138 epicsAtomicIncrSizeT(&nevents);
// Copy only the fields flagged in the update's changed mask into the cached
// last value, then take over both bit sets from the update.
140 lastelem->pvStructurePtr->copyUnchecked(*update->pvStructurePtr,
141 *update->changedBitSet);
142 *
lastelem->changedBitSet = *update->changedBitSet;
143 *
lastelem->overrunBitSet = *update->overrunBitSet;
// The update has been copied, so hand the element back to the monitor.
144 monitor->release(update);
147 interested_t::iterator IIT(interested);
148 for(interested_t::value_pointer pusr = IIT.next(); pusr; pusr = IIT.next())
// Per-user queue state is modified under that user's mutex.
153 Guard G(usr->mutex());
// User is not running or has no free element: record an overflow and merge
// the new value into the user's overflow element instead of queueing it.
157 if(!usr->running || usr->empty.empty()) {
158 usr->inoverflow =
true;
165 *usr->overflowElement->overrunBitSet |= *
lastelem->overrunBitSet;
166 usr->overflowElement->overrunBitSet->or_and(*usr->overflowElement->changedBitSet,
168 *usr->overflowElement->changedBitSet |= *
lastelem->changedBitSet;
170 usr->overflowElement->pvStructurePtr->copyUnchecked(*
lastelem->pvStructurePtr,
173 epicsAtomicIncrSizeT(&usr->ndropped);
// Queueing path (presumably the else-branch of the test above -- confirm).
179 assert(!usr->inoverflow);
// A requester is only scheduled for notification when its filled queue was
// empty before this element is added.
181 if(usr->filled.empty())
182 dsnotify.push_back(pusr);
// Take a free element, load it with the cached value and bit sets, and move
// it from the `empty` queue to the `filled` queue.
184 pvd::MonitorElementPtr elem(usr->empty.front());
186 *elem->overrunBitSet = *
lastelem->overrunBitSet;
187 *elem->changedBitSet = *
lastelem->changedBitSet;
190 elem->pvStructurePtr->copyUnchecked(*
lastelem->pvStructurePtr);
192 usr->filled.push_back(elem);
193 usr->empty.pop_front();
195 epicsAtomicIncrSizeT(&usr->nevents);
// Deliver the notifications collected above.
204 FOREACH(dsnotify_t::iterator, it,end,dsnotify) {
206 pvd::MonitorRequester::shared_pointer req(usr->req);
207 epicsAtomicIncrSizeT(&usr->nwakeups);
208 req->monitorEvent(*it);
// Unlisten callback for the monitor held by this cache entry.
// Replaces `startresult` with an error status carrying the text
// "upstream unlisten()" and then walks the users collected in `tonotify`.
// NOTE(review): how `M` and `tonotify` are filled, and what is done for a user
// whose `inuse` set is empty, are not visible here.
214 MonitorCacheEntry::unlisten(pvd::MonitorPtr
const & monitor)
216 pvd::Monitor::shared_pointer M;
217 interested_t::vector_type tonotify;
225 startresult = pvd::Status(pvd::Status::STATUSTYPE_ERROR,
"upstream unlisten()");
230 FOREACH(interested_t::vector_type::iterator, it, end, tonotify) {
232 pvd::MonitorRequester::shared_pointer req(usr->req);
233 if(usr->inuse.empty())
// Returns the fixed requester name "MonitorCacheEntry".
239 MonitorCacheEntry::getRequesterName()
241 return "MonitorCacheEntry";
// Constructor. Receives a shared pointer to a MonitorCacheEntry and
// atomically increments the class-wide instance counter.
// NOTE(review): the initializer list is not visible here.
244 MonitorUser::MonitorUser(
const MonitorCacheEntry::shared_pointer &e)
252 epicsAtomicIncrSizeT(&num_instances);
// Destructor: atomically decrements the class-wide instance counter.
255 MonitorUser::~MonitorUser()
257 epicsAtomicDecrSizeT(&num_instances);
262 MonitorUser::destroy()
// NOTE(review): the header of the enclosing function is not visible here.
// The statements use MonitorUser members (req, entry, empty, filled,
// overflowElement) and return pvd::Status, so this is presumably
// MonitorUser::start(), and the final `return pvd::Status::Ok;` presumably
// belongs to a following method -- confirm against the full file.
// Visible behaviour: obtains the requester via lock(), returns the entry's
// stored start result if that was a failure, allocates the element queue
// sized by entry->bufferSize plus an overflow element, optionally seeds the
// queue with the entry's last value, and notifies the requester.
273 pvd::MonitorRequester::shared_pointer req(this->req.lock());
// Presumably returned when the requester could not be locked -- confirm.
275 return pvd::Status(pvd::Status::STATUSTYPE_FATAL,
"already dead");
// The cache entry's state is read under the entry's mutex.
279 Guard G(entry->mutex());
281 if(!entry->startresult.isSuccess())
282 return entry->startresult;
284 pvd::PVStructurePtr lval;
286 lval = entry->lastelem->pvStructurePtr;
287 pvd::StructureConstPtr typedesc(entry->typedesc);
// Allocate bufferSize queue elements, each backed by a new PVStructure of
// the entry's type.
292 empty.resize(entry->bufferSize);
293 pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
294 for(
unsigned i=0; i<empty.size(); i++) {
295 empty[i].reset(
new pvd::MonitorElement(fact->createPVStructure(typedesc)));
299 overflowElement.reset(
new pvd::MonitorElement(fact->createPVStructure(typedesc)));
// Remember whether the filled queue was empty before seeding it.
302 doEvt = filled.empty();
// If a last value is available and there is a free element, queue a copy of
// it as the first update.
304 if(lval && !empty.empty()) {
307 const pva::MonitorElementPtr& elem(empty.front());
308 elem->pvStructurePtr->copy(*lval);
// presumably bit 0 marks the whole structure as changed -- TODO confirm
309 elem->changedBitSet->set(0);
310 elem->overrunBitSet->clear();
311 filled.push_back(elem);
// Only notify when the queue went from empty to non-empty.
315 doEvt &= !filled.empty();
319 req->monitorEvent(shared_pointer(weakref));
320 return pvd::Status();
328 return pvd::Status::Ok;
// NOTE(review): the function name line is not visible here; presumably this
// is MonitorUser::poll() -- confirm.
// Visible behaviour: starts from an empty element pointer and, when the
// `filled` queue is non-empty, picks its front element as the result.
331 pva::MonitorElementPtr
335 pva::MonitorElementPtr ret;
336 if(!filled.empty()) {
337 ret = filled.front();
// Gives back an element that was previously handed out to the requester.
//   monitorElement - the element being returned; it is looked up in `inuse`.
// Throws std::invalid_argument with "Can't release MonitorElement not in use"
// (presumably when the lookup fails -- the guarding else is not visible).
// NOTE(review): the condition choosing between the overflow path and the
// plain "return to empty" path is not visible here.
346 MonitorUser::release(pva::MonitorElementPtr
const & monitorElement)
350 std::set<epics::pvData::MonitorElementPtr>::iterator it = inuse.find(monitorElement);
351 if(it!=inuse.end()) {
// Overflow path: queue the pending overflow element for delivery and reuse
// the released element as the new overflow element with cleared bit sets.
359 filled.push_back(overflowElement);
360 overflowElement = monitorElement;
361 overflowElement->changedBitSet->clear();
362 overflowElement->overrunBitSet->clear();
// Plain path: the released element goes back to the free queue.
367 empty.push_back(monitorElement);
372 throw std::invalid_argument(
"Can't release MonitorElement not in use");
// Returns the fixed requester name.
// NOTE(review): this returns "MonitorCacheEntry", the same literal as
// MonitorCacheEntry::getRequesterName(); possibly a copy-paste slip where
// "MonitorUser" was meant -- confirm before changing, callers may rely on it.
378 MonitorUser::getRequesterName()
380 return "MonitorCacheEntry";
epics::pvData::MonitorElement::shared_pointer lastelem
vector_type lock_vector() const