pva2pva  1.4.1
 All Classes Functions Variables Pages
chancache.cpp
1 #include <stdio.h>
2 
3 #include <epicsAtomic.h>
4 #include <errlog.h>
5 
6 #include <epicsMutex.h>
7 #include <epicsTimer.h>
8 
9 #include <pv/epicsException.h>
10 #include <pv/serverContext.h>
11 #include <pv/pvAccess.h>
12 
13 #define epicsExportSharedSymbols
14 #include "pva2pva.h"
15 #include "helper.h"
16 #include "chancache.h"
17 #include "channel.h"
18 
19 namespace pvd = epics::pvData;
20 namespace pva = epics::pvAccess;
21 
22 size_t ChannelCacheEntry::num_instances;
23 
24 ChannelCacheEntry::ChannelCacheEntry(ChannelCache* c, const std::string& n)
25  :channelName(n), cache(c), dropPoke(true)
26 {
27  epicsAtomicIncrSizeT(&num_instances);
28 }
29 
30 ChannelCacheEntry::~ChannelCacheEntry()
31 {
32  // Should *not* be holding cache->cacheLock
33  if(channel.get())
34  channel->destroy(); // calls channelStateChange() w/ DESTROY
35  epicsAtomicDecrSizeT(&num_instances);
36 }
37 
38 std::string
39 ChannelCacheEntry::CRequester::getRequesterName()
40 {
41  return "GWClient";
42 }
43 
44 size_t ChannelCacheEntry::CRequester::num_instances;
45 
46 ChannelCacheEntry::CRequester::CRequester(const ChannelCacheEntry::shared_pointer& p)
47  :chan(p)
48 {
49  epicsAtomicIncrSizeT(&num_instances);
50 }
51 
52 ChannelCacheEntry::CRequester::~CRequester()
53 {
54  epicsAtomicDecrSizeT(&num_instances);
55 }
56 
57 // for ChannelRequester
58 void
59 ChannelCacheEntry::CRequester::channelCreated(const pvd::Status& status,
60  pva::Channel::shared_pointer const & channel)
61 {}
62 
63 void
64 ChannelCacheEntry::CRequester::channelStateChange(pva::Channel::shared_pointer const & channel,
65  pva::Channel::ConnectionState connectionState)
66 {
67  ChannelCacheEntry::shared_pointer chan(this->chan.lock());
68  if(!chan)
69  return;
70 
71  {
72  Guard G(chan->cache->cacheLock);
73 
74  assert(chan->channel.get()==channel.get());
75 
76  switch(connectionState)
77  {
78  case pva::Channel::DISCONNECTED:
79  case pva::Channel::DESTROYED:
80  // Drop from cache
81  chan->cache->entries.erase(chan->channelName);
82  // keep 'chan' as a reference so that actual destruction doesn't happen which cacheLock is held
83  break;
84  default:
85  break;
86  }
87  }
88 
89  // fanout notification
90  ChannelCacheEntry::interested_t::vector_type interested(chan->interested.lock_vector()); // Copy
91 
92  FOREACH(ChannelCacheEntry::interested_t::vector_type::const_iterator, it, end, interested)
93  {
94  GWChannel *chan = it->get();
95  pva::ChannelRequester::shared_pointer req(chan->requester.lock());
96  if(req)
97  req->channelStateChange(*it, connectionState);
98  }
99 }
100 
101 
102 struct ChannelCache::cacheClean : public epicsTimerNotify
103 {
104  ChannelCache *cache;
105  cacheClean(ChannelCache *c) : cache(c) {}
106  epicsTimerNotify::expireStatus expire(const epicsTime &currentTime)
107  {
108  // keep a reference to any cache entrys being removed so they
109  // aren't destroyed while cacheLock is held
110  std::set<ChannelCacheEntry::shared_pointer> cleaned;
111 
112  {
113  Guard G(cache->cacheLock);
114  cache->cleanerRuns++;
115 
116  ChannelCache::entries_t::iterator cur=cache->entries.begin(), next, end=cache->entries.end();
117  while(cur!=end) {
118  next = cur;
119  ++next;
120 
121  if(!cur->second->dropPoke && cur->second->interested.empty()) {
122  cleaned.insert(cur->second);
123  cache->entries.erase(cur);
124  cache->cleanerDust++;
125  } else {
126  cur->second->dropPoke = false;
127  }
128 
129  cur = next;
130  }
131  }
132  return epicsTimerNotify::expireStatus(epicsTimerNotify::restart, 30.0);
133  }
134 };
135 
136 ChannelCache::ChannelCache(const pva::ChannelProvider::shared_pointer& prov)
137  :provider(prov)
138  ,timerQueue(&epicsTimerQueueActive::allocate(1, epicsThreadPriorityCAServerLow-2))
139  ,cleaner(new cacheClean(this))
140  ,cleanerRuns(0)
141  ,cleanerDust(0)
142 {
143  if(!provider)
144  throw std::logic_error("Missing 'pva' provider");
145  assert(timerQueue);
146  cleanTimer = &timerQueue->createTimer();
147  cleanTimer->start(*cleaner, 30.0);
148 }
149 
150 ChannelCache::~ChannelCache()
151 {
152  entries_t E;
153  {
154  Guard G(cacheLock);
155 
156  cleanTimer->destroy();
157  timerQueue->release();
158  delete cleaner;
159 
160  entries_t E;
161  E.swap(entries);
162  }
163 }
164 
165 ChannelCacheEntry::shared_pointer
166 ChannelCache::lookup(const std::string& newName)
167 {
168  ChannelCacheEntry::shared_pointer ret;
169 
170  Guard G(cacheLock);
171 
172  entries_t::const_iterator it = entries.find(newName);
173 
174  if(it==entries.end()) {
175  // first request, create ChannelCacheEntry
176  //TODO: async lookup
177 
178  ChannelCacheEntry::shared_pointer ent(new ChannelCacheEntry(this, newName));
179  ent->requester.reset(new ChannelCacheEntry::CRequester(ent));
180 
181  entries[newName] = ent;
182 
183  pva::Channel::shared_pointer M;
184  {
185  // unlock to call createChannel()
186  epicsGuardRelease<epicsMutex> U(G);
187 
188  M = provider->createChannel(newName, ent->requester);
189  if(!M)
190  THROW_EXCEPTION2(std::runtime_error, "Failed to createChannel");
191  }
192  ent->channel = M;
193 
194  if(M->isConnected())
195  ret = ent; // immediate connect, mostly for unit-tests (thus delayed connect not covered)
196 
197  } else if(it->second->channel && it->second->channel->isConnected()) {
198  // another request, and hey we're connected this time
199 
200  ret = it->second;
201  it->second->dropPoke = true;
202 
203  } else {
204  // not connected yet, but a client is still interested
205  it->second->dropPoke = true;
206  }
207 
208  return ret;
209 }
Definition: chancache.h:132
Definition: chancache.h:103