pvAccessCPP  7.1.7
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
blockingUDP.h
1 
7 #ifndef BLOCKINGUDP_H_
8 #define BLOCKINGUDP_H_
9 
10 #ifdef epicsExportSharedSymbols
11 # define blockingUDPEpicsExportSharedSymbols
12 # undef epicsExportSharedSymbols
13 #endif
14 
15 #include <shareLib.h>
16 #include <osiSock.h>
17 #include <epicsThread.h>
18 
19 #include <pv/noDefaultMethods.h>
20 #include <pv/byteBuffer.h>
21 #include <pv/lock.h>
22 #include <pv/event.h>
23 #include <pv/pvIntrospect.h>
24 
25 #ifdef blockingUDPEpicsExportSharedSymbols
26 # define epicsExportSharedSymbols
27 # undef blockingUDPEpicsExportSharedSymbols
28 #endif
29 
30 #include <shareLib.h>
31 
32 #include <pv/remote.h>
33 #include <pv/pvaConstants.h>
34 #include <pv/inetAddressUtil.h>
35 
36 namespace epics {
37 namespace pvAccess {
38 
39 class ClientChannelImpl;
40 class BlockingUDPConnector;
41 
42 enum InetAddressType { inetAddressType_all, inetAddressType_unicast, inetAddressType_broadcast_multicast };
43 
44 class BlockingUDPTransport :
45  public Transport,
46  public TransportSendControl,
47  public epicsThreadRunable
48 {
49  EPICS_NOT_COPYABLE(BlockingUDPTransport)
50 public:
51  POINTER_DEFINITIONS(BlockingUDPTransport);
52 
53  static size_t num_instances;
54 
55 private:
56  std::tr1::weak_ptr<BlockingUDPTransport> internal_this;
57  friend class BlockingUDPConnector;
58  BlockingUDPTransport(bool serverFlag,
59  ResponseHandler::shared_pointer const & responseHandler,
60  SOCKET channel, osiSockAddr &bindAddress,
61  short remoteTransportRevision);
62 public:
63 
64  virtual ~BlockingUDPTransport();
65 
66  virtual bool isClosed() OVERRIDE FINAL {
67  return _closed.get();
68  }
69 
70  virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
71  return _remoteAddress;
72  }
73 
74  virtual const std::string& getRemoteName() const OVERRIDE FINAL {
75  return _remoteName;
76  }
77 
78  virtual std::string getType() const OVERRIDE FINAL {
79  return std::string("udp");
80  }
81 
82  virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
83  return _receiveBuffer.getSize();
84  }
85 
86  virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
87  return PVA_DEFAULT_PRIORITY;
88  }
89 
90  virtual void setRemoteTransportReceiveBufferSize(
91  std::size_t /*receiveBufferSize*/) OVERRIDE FINAL {
92  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
93  }
94 
95  virtual void setRemoteTransportSocketReceiveBufferSize(
96  std::size_t /*socketReceiveBufferSize*/) OVERRIDE FINAL {
97  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
98  }
99 
100  virtual bool verify(epics::pvData::int32 /*timeoutMs*/) OVERRIDE FINAL {
101  // noop
102  return true;
103  }
104 
105  virtual void verified(epics::pvData::Status const & /*status*/) OVERRIDE FINAL {
106  // noop
107  }
108 
109  virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL {
110  // noop
111  }
112 
113  // NOTE: this is not yet used for UDP
114  virtual void setByteOrder(int byteOrder) OVERRIDE FINAL {
115  // called from receive thread... or before processing
116  _receiveBuffer.setEndianess(byteOrder);
117 
118  // sync?!
119  _sendBuffer.setEndianess(byteOrder);
120  }
121 
122  virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
123 
124  virtual void flushSendQueue() OVERRIDE FINAL;
125 
126  void start();
127 
128  virtual void close() OVERRIDE FINAL;
129 
130  virtual void ensureData(std::size_t size) OVERRIDE FINAL;
131 
132  virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/,
133  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
134  {
135  return false;
136  }
137 
138  virtual bool directDeserialize(epics::pvData::ByteBuffer* /*existingBuffer*/, char* /*deserializeTo*/,
139  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
140  {
141  return false;
142  }
143 
144  virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
145  virtual void endMessage() OVERRIDE FINAL;
146 
147  virtual void flush(bool /*lastMessageCompleted*/) OVERRIDE FINAL {
148  // noop since all UDP requests are sent immediately
149  }
150 
151  virtual void setRecipient(const osiSockAddr& sendTo) OVERRIDE FINAL {
152  _sendToEnabled = true;
153  _sendTo = sendTo;
154  }
155 
156  void setLocalMulticastAddress(const osiSockAddr& sendTo) {
157  _localMulticastAddressEnabled = true;
158  _localMulticastAddress = sendTo;
159  }
160 
161  bool hasLocalMulticastAddress() const {
162  return _localMulticastAddressEnabled;
163  }
164 
165  const osiSockAddr& getLocalMulticastAddress() const {
166  return _localMulticastAddress;
167  }
168 
169  virtual void flushSerializeBuffer() OVERRIDE FINAL {
170  // noop
171  }
172 
173  virtual void ensureBuffer(std::size_t /*size*/) OVERRIDE FINAL {
174  // noop
175  }
176 
177  virtual void cachedSerialize(
178  const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
179  {
180  // no cache
181  field->serialize(buffer, this);
182  }
183 
184  virtual std::tr1::shared_ptr<const epics::pvData::Field>
185  cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
186  {
187  // no cache
188  // TODO
189  return epics::pvData::getFieldCreate()->deserialize(buffer, this);
190  }
191 
192  virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
193  {
194  return false;
195  }
196 
197  virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
198 
203  void setIgnoredAddresses(const InetAddrVector& addresses) {
204  _ignoredAddresses = addresses;
205  }
206 
211  const InetAddrVector& getIgnoredAddresses() const {
212  return _ignoredAddresses;
213  }
214 
219  void setTappedNIF(const InetAddrVector& addresses) {
220  _tappedNIF = addresses;
221  }
222 
227  const InetAddrVector& getTappedNIF() const {
228  return _tappedNIF;
229  }
230 
231  bool send(const char* buffer, size_t length, const osiSockAddr& address);
232 
233  bool send(epics::pvData::ByteBuffer* buffer, const osiSockAddr& address);
234 
235  bool send(epics::pvData::ByteBuffer* buffer, InetAddressType target = inetAddressType_all);
236 
241  const InetAddrVector& getSendAddresses() {
242  return _sendAddresses;
243  }
244 
249  const osiSockAddr* getBindAddress() const {
250  return &_bindAddress;
251  }
252 
253  bool isBroadcastAddress(const osiSockAddr* address, const InetAddrVector& broadcastAddresses)
254  {
255  for (size_t i = 0; i < broadcastAddresses.size(); i++)
256  if (broadcastAddresses[i].ia.sin_addr.s_addr == address->ia.sin_addr.s_addr)
257  return true;
258  return false;
259  }
260 
261  // consumes arguments
262  void setSendAddresses(InetAddrVector& addresses, std::vector<bool>& address_types) {
263  _sendAddresses.swap(addresses);
264  _isSendAddressUnicast.swap(address_types);
265  }
266 
267  void join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr);
268 
269  void setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback);
270 
271 protected:
272  AtomicBoolean _closed;
273 
277  ResponseHandler::shared_pointer _responseHandler;
278 
279  virtual void run() OVERRIDE FINAL;
280 
281 private:
282  bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer);
283 
284  void close(bool waitForThreadToComplete);
285 
286  // Context only used for logging in this class
287 
291  SOCKET _channel;
292 
293  /* When provided, this transport is used for replies (passed to handler)
294  * instead of *this. This feature is used in the situation where broadcast
295  * traffic is received on one socket, but a different socket must be used
296  * for unicast replies.
297  *
298  Transport::shared_pointer _replyTransport;
299  */
300 
304  osiSockAddr _bindAddress;
305 
309  osiSockAddr _remoteAddress;
310  std::string _remoteName;
311 
315  InetAddrVector _sendAddresses;
316 
317  std::vector<bool> _isSendAddressUnicast;
318 
322  InetAddrVector _ignoredAddresses;
323 
327  InetAddrVector _tappedNIF;
328 
332  osiSockAddr _sendTo;
333  bool _sendToEnabled;
334 
338  osiSockAddr _localMulticastAddress;
339  bool _localMulticastAddressEnabled;
340 
344  epics::pvData::ByteBuffer _receiveBuffer;
345 
349  epics::pvData::ByteBuffer _sendBuffer;
350 
354  int _lastMessageStartPosition;
355 
359  epics::pvData::Mutex _mutex;
360  epics::pvData::Mutex _sendMutex;
361 
365  epics::auto_ptr<epicsThread> _thread;
366 
367  epics::pvData::int8 _clientServerWithEndianFlag;
368 
369 };
370 
371 class BlockingUDPConnector{
372 public:
373  POINTER_DEFINITIONS(BlockingUDPConnector);
374 
375  BlockingUDPConnector(bool serverFlag) :_serverFlag(serverFlag) {}
376 
380  BlockingUDPTransport::shared_pointer connect(
381  ResponseHandler::shared_pointer const & responseHandler,
382  osiSockAddr& bindAddress,
383  epics::pvData::int8 transportRevision);
384 
385 private:
386 
390  bool _serverFlag;
391 
392  EPICS_NOT_COPYABLE(BlockingUDPConnector)
393 };
394 
395 typedef std::vector<BlockingUDPTransport::shared_pointer> BlockingUDPTransportVector;
396 
397 void initializeUDPTransports(
398  bool serverFlag,
399  BlockingUDPTransportVector& udpTransports,
400  const IfaceNodeVector& ifaceList,
401  const ResponseHandler::shared_pointer& responseHandler,
402  BlockingUDPTransport::shared_pointer& sendTransport,
403  epics::pvData::int32& listenPort,
404  bool autoAddressList,
405  const std::string& addressList,
406  const std::string& ignoreAddressList);
407 
408 
409 }
410 }
411 
412 #endif /* BLOCKINGUDP_H_ */
size_t start() const
size_t size() const
const epics::pvData::int16 PVA_DEFAULT_PRIORITY
Default priority (corresponds to POSIX SCHED_OTHER)
Definition: pvaConstants.h:70
void swap(vector &__x) noexcept
basic_string< char > string