pva2pva  1.4.1
 All Classes Functions Variables Pages
server.cpp
1 #include <stdio.h>
2 
3 #include <epicsAtomic.h>
4 #include <epicsString.h>
5 #include <epicsTimer.h>
6 
7 #include <pv/logger.h>
8 #include <pv/pvIntrospect.h> /* for pvdVersion.h */
9 #include <pv/epicsException.h>
10 #include <pv/serverContext.h>
11 #include <pv/logger.h>
12 
13 #define epicsExportSharedSymbols
14 #include "helper.h"
15 #include "pva2pva.h"
16 #include "server.h"
17 
18 #if defined(PVDATA_VERSION_INT)
19 #if PVDATA_VERSION_INT > VERSION_INT(7,0,0,0)
20 # define USE_MSTATS
21 #endif
22 #endif
23 
24 namespace pva = epics::pvAccess;
25 namespace pvd = epics::pvData;
26 
27 std::tr1::shared_ptr<pva::ChannelProvider>
28 GWServerChannelProvider::getChannelProvider()
29 {
30  return shared_from_this();
31 }
32 
33 // Called from UDP search thread with no locks held
34 // Called from TCP threads (for search w/ TCP)
35 pva::ChannelFind::shared_pointer
36 GWServerChannelProvider::channelFind(std::string const & channelName,
37  pva::ChannelFindRequester::shared_pointer const & channelFindRequester)
38 {
39  pva::ChannelFind::shared_pointer ret;
40  bool found = false;
41 
42  if(!channelName.empty())
43  {
44  LOG(pva::logLevelDebug, "Searching for '%s'", channelName.c_str());
45  ChannelCacheEntry::shared_pointer ent(cache.lookup(channelName));
46  if(ent) {
47  found = true;
48  ret = shared_from_this();
49  }
50  }
51 
52  // unlock for callback
53 
54  channelFindRequester->channelFindResult(pvd::Status::Ok, ret, found);
55 
56  return ret;
57 }
58 
59 // The return value of this function is ignored
60 // The newly created channel is given to the ChannelRequester
61 pva::Channel::shared_pointer
62 GWServerChannelProvider::createChannel(std::string const & channelName,
63  pva::ChannelRequester::shared_pointer const & channelRequester,
64  short priority, std::string const & addressx)
65 {
66  GWChannel::shared_pointer ret;
67  std::string address = channelRequester->getRequesterName();
68 
69  if(!channelName.empty())
70  {
71  Guard G(cache.cacheLock);
72 
73  ChannelCacheEntry::shared_pointer ent(cache.lookup(channelName)); // recursively locks cacheLock
74 
75  if(ent)
76  {
77  ret.reset(new GWChannel(ent, shared_from_this(), channelRequester, address));
78  ent->interested.insert(ret);
79  ret->weakref = ret;
80  }
81  }
82 
83  if(!ret) {
84  pvd::Status S(pvd::Status::STATUSTYPE_ERROR, "Not found");
85  channelRequester->channelCreated(S, ret);
86  } else {
87  channelRequester->channelCreated(pvd::Status::Ok, ret);
88  channelRequester->channelStateChange(ret, pva::Channel::CONNECTED);
89  }
90 
91  return ret; // ignored by caller
92 }
93 
94 void GWServerChannelProvider::destroy() {}
95 
96 GWServerChannelProvider::GWServerChannelProvider(const pva::ChannelProvider::shared_pointer& prov)
97  :cache(prov)
98 {}
99 
100 GWServerChannelProvider::~GWServerChannelProvider() {}
101 
102 void ServerConfig::drop(const char *client, const char *channel)
103 {
104  if(!client)
105  client= "";
106  if(!channel)
107  channel = "";
108  // TODO: channel glob match
109 
110  FOREACH(clients_t::const_iterator, it, end, clients)
111  {
112  if(client[0]!='\0' && client[0]!='*' && it->first!=client)
113  continue;
114 
115  const GWServerChannelProvider::shared_pointer& prov(it->second);
116 
117  ChannelCacheEntry::shared_pointer entry;
118 
119  // find the channel, if it's there
120  {
121  Guard G(prov->cache.cacheLock);
122 
123  ChannelCache::entries_t::iterator it = prov->cache.entries.find(channel);
124  if(it==prov->cache.entries.end())
125  continue;
126 
127  std::cout<<"Drop from "<<it->first<<" : "<<it->second->channelName<<"\n";
128 
129  entry = it->second;
130  prov->cache.entries.erase(it); // drop out of cache (TODO: not required)
131  }
132 
133  // trigger client side disconnect (recursively calls call CRequester::channelStateChange())
134  // TODO: shouldn't need this
135  entry->channel->destroy();
136 
137  }
138 }
139 
140 void ServerConfig::status_server(int lvl, const char *server)
141 {
142  if(!server)
143  server = "";
144 
145  FOREACH(servers_t::const_iterator, it, end, servers)
146  {
147  if(server[0]!='\0' && server[0]!='*' && it->first!=server)
148  continue;
149 
150  const pva::ServerContext::shared_pointer& serv(it->second);
151  std::cout<<"==> Server: "<<it->first<<"\n";
152  serv->printInfo(std::cout);
153  std::cout<<"<== Server: "<<it->first<<"\n\n";
154  // TODO: print client list somehow
155  }
156 }
157 
158 void ServerConfig::status_client(int lvl, const char *client, const char *channel)
159 {
160  if(!client)
161  client= "";
162  if(!channel)
163  channel = "";
164 
165  bool iswild = strchr(channel, '?') || strchr(channel, '*');
166 
167  FOREACH(clients_t::const_iterator, it, end, clients)
168  {
169  if(client[0]!='\0' && client[0]!='*' && it->first!=client)
170  continue;
171 
172  const GWServerChannelProvider::shared_pointer& prov(it->second);
173 
174  std::cout<<"==> Client: "<<it->first<<"\n";
175 
176  ChannelCache::entries_t entries;
177 
178  size_t ncache, ncleaned, ndust;
179  {
180  Guard G(prov->cache.cacheLock);
181 
182  ncache = prov->cache.entries.size();
183  ncleaned = prov->cache.cleanerRuns;
184  ndust = prov->cache.cleanerDust;
185 
186  if(lvl>0) {
187  if(!iswild) { // no string or some glob pattern
188  entries = prov->cache.entries; // copy of std::map
189  } else { // just one channel
190  ChannelCache::entries_t::iterator it(prov->cache.entries.find(channel));
191  if(it!=prov->cache.entries.end())
192  entries[it->first] = it->second;
193  }
194  }
195  }
196 
197  std::cout<<"Cache has "<<ncache<<" channels. Cleaned "
198  <<ncleaned<<" times closing "<<ndust<<" channels\n";
199 
200  if(lvl<=0)
201  continue;
202 
203  FOREACH(ChannelCache::entries_t::const_iterator, it2, end2, entries)
204  {
205  const std::string& channame = it2->first;
206  if(iswild && !epicsStrGlobMatch(channame.c_str(), channel))
207  continue;
208 
209  ChannelCacheEntry& E = *it2->second;
210  ChannelCacheEntry::mon_entries_t::lock_vector_type mons;
211  size_t nsrv, nmon;
212  bool dropflag;
213  const char *chstate;
214  {
215  Guard G(E.mutex());
216  chstate = pva::Channel::ConnectionStateNames[E.channel->getConnectionState()];
217  nsrv = E.interested.size();
218  nmon = E.mon_entries.size();
219  dropflag = E.dropPoke;
220 
221  if(lvl>1)
222  mons = E.mon_entries.lock_vector();
223  }
224 
225  std::cout<<chstate
226  <<" Client Channel '"<<channame
227  <<"' used by "<<nsrv<<" Server channel(s) with "
228  <<nmon<<" unique subscription(s) "
229  <<(dropflag?'!':'_')<<"\n";
230 
231  if(lvl<=1)
232  continue;
233 
234  FOREACH(ChannelCacheEntry::mon_entries_t::lock_vector_type::const_iterator, it2, end2, mons) {
235  MonitorCacheEntry& ME = *it2->second;
236 
237  MonitorCacheEntry::interested_t::vector_type usrs;
238  size_t nsrvmon;
239 #ifdef USE_MSTATS
240  pvd::Monitor::Stats mstats;
241 #endif
242  bool hastype, hasdata, isdone;
243  {
244  Guard G(ME.mutex());
245 
246  nsrvmon = ME.interested.size();
247  hastype = !!ME.typedesc;
248  hasdata = !!ME.lastelem;
249  isdone = ME.done;
250 
251 #ifdef USE_MSTATS
252  if(ME.mon)
253  ME.mon->getStats(mstats);
254 #endif
255 
256  if(lvl>2)
257  usrs = ME.interested.lock_vector();
258  }
259 
260  // TODO: how to describe pvRequest in a compact way...
261  std::cout<<" Client Monitor used by "<<nsrvmon<<" Server monitors, "
262  <<"Has "<<(hastype?"":"not ")
263  <<"opened, Has "<<(hasdata?"":"not ")
264  <<"recv'd some data, Has "<<(isdone?"":"not ")<<"finalized\n"
265  " "<< epicsAtomicGetSizeT(&ME.nwakeups)<<" wakeups "
266  <<epicsAtomicGetSizeT(&ME.nevents)<<" events\n";
267 #ifdef USE_MSTATS
268  if(mstats.nempty || mstats.nfilled || mstats.noutstanding)
269  std::cout<<" US monitor queue "<<mstats.nfilled
270  <<" filled, "<<mstats.noutstanding
271  <<" outstanding, "<<mstats.nempty<<" empty\n";
272 #endif
273  if(lvl<=2)
274  continue;
275 
276  FOREACH(MonitorCacheEntry::interested_t::vector_type::const_iterator, it3, end3, usrs) {
277  MonitorUser& MU = **it3;
278 
279  size_t nempty, nfilled, nused, total;
280  std::string remote;
281  bool isrunning;
282  {
283  Guard G(MU.mutex());
284 
285  nempty = MU.empty.size();
286  nfilled = MU.filled.size();
287  nused = MU.inuse.size();
288  isrunning = MU.running;
289 
290  GWChannel::shared_pointer srvchan(MU.srvchan.lock());
291  if(srvchan)
292  remote = srvchan->address;
293  else
294  remote = "<unknown>";
295  }
296  total = nempty + nfilled + nused;
297 
298  std::cout<<" Server monitor from "
299  <<remote
300  <<(isrunning?"":" Paused")
301  <<" buffer "<<nfilled<<"/"<<total
302  <<" out "<<nused<<"/"<<total
303  <<" "<<epicsAtomicGetSizeT(&MU.nwakeups)<<" wakeups "
304  <<epicsAtomicGetSizeT(&MU.nevents)<<" events "
305  <<epicsAtomicGetSizeT(&MU.ndropped)<<" drops\n";
306  }
307  }
308 
309 
310 
311  }
312 
313 
314  std::cout<<"<== Client: "<<it->first<<"\n\n";
315  }
316 }
epics::pvData::MonitorElement::shared_pointer lastelem
Definition: chancache.h:46
Definition: chancache.h:103
size_t size() const
Definition: weakmap.h:147
lock_vector_type lock_vector() const
Definition: weakmap.h:259
vector_type lock_vector() const
Definition: weakset.h:268
size_t size() const
Definition: weakset.h:168
Definition: chancache.h:22