pva2pva  1.4.1
 All Classes Functions Variables Pages
channel.cpp
1 
2 #include <epicsAtomic.h>
3 
4 #include <epicsTimer.h>
5 #include <epicsMutex.h>
6 #include <epicsGuard.h>
7 #include <epicsEndian.h>
8 
9 #include <pv/iocshelper.h>
10 
11 #include <pv/pvAccess.h>
12 
13 #define epicsExportSharedSymbols
14 #include "helper.h"
15 #include "pva2pva.h"
16 #include "channel.h"
17 
18 namespace pva = epics::pvAccess;
19 namespace pvd = epics::pvData;
20 
21 int p2pReadOnly = 0;
22 
23 size_t GWChannel::num_instances;
24 
25 GWChannel::GWChannel(const ChannelCacheEntry::shared_pointer& e,
26  const epics::pvAccess::ChannelProvider::weak_pointer& srvprov,
27  const epics::pvAccess::ChannelRequester::weak_pointer &r,
28  const std::string& addr)
29  :entry(e)
30  ,requester(r)
31  ,address(addr)
32  ,server_provder(srvprov)
33 {
34  epicsAtomicIncrSizeT(&num_instances);
35 }
36 
37 GWChannel::~GWChannel()
38 {
39  epicsAtomicDecrSizeT(&num_instances);
40 }
41 
42 std::string
43 GWChannel::getRequesterName()
44 {
45  return "GWChannel";
46 }
47 
48 void
49 GWChannel::destroy()
50 {}
51 
52 std::tr1::shared_ptr<pva::ChannelProvider>
53 GWChannel::getProvider()
54 {
55  return pva::ChannelProvider::shared_pointer(server_provder);
56 }
57 
58 std::string
59 GWChannel::getRemoteAddress()
60 {
61  // pass through address of origin server (information leak?)
62  return entry->channel->getRemoteAddress();
63 }
64 
65 pva::Channel::ConnectionState
66 GWChannel::getConnectionState()
67 {
68  return entry->channel->getConnectionState();
69 }
70 
71 std::string
72 GWChannel::getChannelName()
73 {
74  return entry->channelName;
75 }
76 
77 std::tr1::shared_ptr<pva::ChannelRequester>
78 GWChannel::getChannelRequester()
79 {
80  return pva::ChannelRequester::shared_pointer(requester);
81 }
82 
83 
84 void
85 GWChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,
86  std::string const & subField)
87 {
88  //TODO: cache for top level field?
89  entry->channel->getField(requester, subField);
90 }
91 
92 pva::AccessRights
93 GWChannel::getAccessRights(pvd::PVField::shared_pointer const & pvField)
94 {
95  return entry->channel->getAccessRights(pvField);
96 }
97 
98 pva::ChannelProcess::shared_pointer
99 GWChannel::createChannelProcess(
100  pva::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
101  pvd::PVStructure::shared_pointer const & pvRequest)
102 {
103  if(!p2pReadOnly)
104  return entry->channel->createChannelProcess(channelProcessRequester, pvRequest);
105  else
106  return Channel::createChannelProcess(channelProcessRequester, pvRequest);
107 }
108 
109 pva::ChannelGet::shared_pointer
110 GWChannel::createChannelGet(
111  pva::ChannelGetRequester::shared_pointer const & channelGetRequester,
112  pvd::PVStructure::shared_pointer const & pvRequest)
113 {
114  return entry->channel->createChannelGet(channelGetRequester, pvRequest);
115 }
116 
117 pva::ChannelPut::shared_pointer
118 GWChannel::createChannelPut(
119  pva::ChannelPutRequester::shared_pointer const & channelPutRequester,
120  pvd::PVStructure::shared_pointer const & pvRequest)
121 {
122  //TODO: allow ChannelPut::get()
123  if(!p2pReadOnly)
124  return entry->channel->createChannelPut(channelPutRequester, pvRequest);
125  else
126  return Channel::createChannelPut(channelPutRequester, pvRequest);
127 }
128 
129 pva::ChannelPutGet::shared_pointer
130 GWChannel::createChannelPutGet(
131  pva::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
132  pvd::PVStructure::shared_pointer const & pvRequest)
133 {
134  if(!p2pReadOnly)
135  return entry->channel->createChannelPutGet(channelPutGetRequester, pvRequest);
136  else
137  return Channel::createChannelPutGet(channelPutGetRequester, pvRequest);
138 }
139 
140 pva::ChannelRPC::shared_pointer
141 GWChannel::createChannelRPC(
142  pva::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
143  pvd::PVStructure::shared_pointer const & pvRequest)
144 {
145  if(!p2pReadOnly)
146  return entry->channel->createChannelRPC(channelRPCRequester, pvRequest);
147  else
148  return Channel::createChannelRPC(channelRPCRequester, pvRequest);
149 }
150 
151 namespace {
152 struct noclean {
153  void operator()(MonitorCacheEntry *) {}
154 };
155 }
156 
157 pvd::Monitor::shared_pointer
158 GWChannel::createMonitor(
159  pvd::MonitorRequester::shared_pointer const & monitorRequester,
160  pvd::PVStructure::shared_pointer const & pvRequest)
161 {
162  ChannelCacheEntry::pvrequest_t ser;
163  // serialize request struct to string using host byte order (only used for local comparison)
164  pvd::serializeToVector(pvRequest.get(), EPICS_BYTE_ORDER, ser);
165 
166  MonitorCacheEntry::shared_pointer ment;
167  MonitorUser::shared_pointer mon;
168 
169  pvd::Status startresult;
170  pvd::StructureConstPtr typedesc;
171 
172  try {
173  {
174  Guard G(entry->mutex());
175 
176  // TODO: no-cache/no-share flag in pvRequest
177 
178  ment = entry->mon_entries.find(ser);
179  if(!ment) {
180  ment.reset(new MonitorCacheEntry(entry.get(), pvRequest));
181  entry->mon_entries[ser] = ment; // ref. wrapped
182  ment->weakref = ment;
183 
184  // We've added an incomplete entry (no Monitor)
185  // so MonitorUser must check validity before de-ref.
186  // in this case we use !!typedesc as this also indicates
187  // that the upstream monitor is connected
188  pvd::MonitorPtr M;
189  {
190  UnGuard U(G);
191 
192  M = entry->channel->createMonitor(ment, pvRequest);
193  }
194  ment->mon = M;
195  }
196  }
197 
198  Guard G(ment->mutex());
199 
200  mon.reset(new MonitorUser(ment));
201  ment->interested.insert(mon);
202  mon->weakref = mon;
203  mon->srvchan = shared_pointer(weakref);
204  mon->req = monitorRequester;
205 
206  typedesc = ment->typedesc;
207  startresult = ment->startresult;
208 
209  } catch(std::exception& e) {
210  mon.reset();
211  std::cerr<<"Exception in GWChannel::createMonitor()\n"
212  "is "<<e.what()<<"\n";
213  startresult = pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Error during GWChannel setup");
214  }
215 
216  // unlock for callback
217 
218  if(typedesc || !startresult.isSuccess()) {
219  // upstream monitor already connected, or never will be.
220  monitorRequester->monitorConnect(startresult, mon, typedesc);
221  }
222 
223  return mon;
224 }
225 
226 pva::ChannelArray::shared_pointer
227 GWChannel::createChannelArray(
228  pva::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
229  pvd::PVStructure::shared_pointer const & pvRequest)
230 {
231  return entry->channel->createChannelArray(channelArrayRequester, pvRequest);
232 }
233 
234 
235 void
236 GWChannel::printInfo(std::ostream& out)
237 {
238  out<<"GWChannel for "<<entry->channelName<<"\n";
239 }
240 
241 
242 void registerReadOnly()
243 {
244  epics::iocshVariable<int, &p2pReadOnly>("p2pReadOnly");
245 }
Definition: chancache.h:22