pvAccessCPP 7.1.8
Loading...
Searching...
No Matches
blockingUDP.h
1
7#ifndef BLOCKINGUDP_H_
8#define BLOCKINGUDP_H_
9
10#ifdef epicsExportSharedSymbols
11# define blockingUDPEpicsExportSharedSymbols
12# undef epicsExportSharedSymbols
13#endif
14
15#include <shareLib.h>
16#include <osiSock.h>
17#include <epicsThread.h>
18
19#include <pv/noDefaultMethods.h>
20#include <pv/byteBuffer.h>
21#include <pv/lock.h>
22#include <pv/event.h>
23#include <pv/pvIntrospect.h>
24
25#ifdef blockingUDPEpicsExportSharedSymbols
26# define epicsExportSharedSymbols
27# undef blockingUDPEpicsExportSharedSymbols
28#endif
29
30#include <shareLib.h>
31
32#include <pv/remote.h>
33#include <pv/pvaConstants.h>
34#include <pv/inetAddressUtil.h>
35
36namespace epics {
37namespace pvAccess {
38
39class ClientChannelImpl;
40class BlockingUDPConnector;
41
/**
 * Address-category selector; used as the `target` argument of
 * BlockingUDPTransport::send(ByteBuffer*, InetAddressType).
 *
 * Enumerator values are implicit, i.e. 0, 1 and 2 in declaration order.
 */
enum InetAddressType {
    inetAddressType_all,
    inetAddressType_unicast,
    inetAddressType_broadcast_multicast
};
43
44class BlockingUDPTransport :
45 public Transport,
46 public TransportSendControl,
47 public epicsThreadRunable
48{
49 EPICS_NOT_COPYABLE(BlockingUDPTransport)
50public:
51 POINTER_DEFINITIONS(BlockingUDPTransport);
52
53 static size_t num_instances;
54
55private:
56 std::tr1::weak_ptr<BlockingUDPTransport> internal_this;
57 friend class BlockingUDPConnector;
58 BlockingUDPTransport(bool serverFlag,
59 ResponseHandler::shared_pointer const & responseHandler,
60 SOCKET channel, osiSockAddr &bindAddress,
61 short remoteTransportRevision);
62public:
63
64 virtual ~BlockingUDPTransport();
65
66 virtual bool isClosed() OVERRIDE FINAL {
67 return _closed.get();
68 }
69
70 virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
71 return _remoteAddress;
72 }
73
74 virtual const std::string& getRemoteName() const OVERRIDE FINAL {
75 return _remoteName;
76 }
77
78 virtual std::string getType() const OVERRIDE FINAL {
79 return std::string("udp");
80 }
81
82 virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
83 return _receiveBuffer.getSize();
84 }
85
86 virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
88 }
89
90 virtual void setRemoteTransportReceiveBufferSize(
91 std::size_t /*receiveBufferSize*/) OVERRIDE FINAL {
92 // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
93 }
94
95 virtual void setRemoteTransportSocketReceiveBufferSize(
96 std::size_t /*socketReceiveBufferSize*/) OVERRIDE FINAL {
97 // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
98 }
99
100 virtual bool verify(epics::pvData::int32 /*timeoutMs*/) OVERRIDE FINAL {
101 // noop
102 return true;
103 }
104
105 virtual void verified(epics::pvData::Status const & /*status*/) OVERRIDE FINAL {
106 // noop
107 }
108
109 virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL {
110 // noop
111 }
112
113 // NOTE: this is not yet used for UDP
114 virtual void setByteOrder(int byteOrder) OVERRIDE FINAL {
115 // called from receive thread... or before processing
116 _receiveBuffer.setEndianess(byteOrder);
117
118 // sync?!
119 _sendBuffer.setEndianess(byteOrder);
120 }
121
122 virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
123
124 virtual void flushSendQueue() OVERRIDE FINAL;
125
126 void start();
127
128 virtual void close() OVERRIDE FINAL;
129
130 virtual void ensureData(std::size_t size) OVERRIDE FINAL;
131
132 virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/,
133 std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
134 {
135 return false;
136 }
137
138 virtual bool directDeserialize(epics::pvData::ByteBuffer* /*existingBuffer*/, char* /*deserializeTo*/,
139 std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
140 {
141 return false;
142 }
143
144 virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
145 virtual void endMessage() OVERRIDE FINAL;
146
147 virtual void flush(bool /*lastMessageCompleted*/) OVERRIDE FINAL {
148 // noop since all UDP requests are sent immediately
149 }
150
151 virtual void setRecipient(const osiSockAddr& sendTo) OVERRIDE FINAL {
152 _sendToEnabled = true;
153 _sendTo = sendTo;
154 }
155
156 void setLocalMulticastAddress(const osiSockAddr& sendTo) {
157 _localMulticastAddressEnabled = true;
158 _localMulticastAddress = sendTo;
159 }
160
161 bool hasLocalMulticastAddress() const {
162 return _localMulticastAddressEnabled;
163 }
164
165 const osiSockAddr& getLocalMulticastAddress() const {
166 return _localMulticastAddress;
167 }
168
169 virtual void flushSerializeBuffer() OVERRIDE FINAL {
170 // noop
171 }
172
173 virtual void ensureBuffer(std::size_t /*size*/) OVERRIDE FINAL {
174 // noop
175 }
176
177 virtual void cachedSerialize(
178 const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
179 {
180 // no cache
181 field->serialize(buffer, this);
182 }
183
184 virtual std::tr1::shared_ptr<const epics::pvData::Field>
185 cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
186 {
187 // no cache
188 // TODO
189 return epics::pvData::getFieldCreate()->deserialize(buffer, this);
190 }
191
192 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
193 {
194 return false;
195 }
196
197 virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
198
203 void setIgnoredAddresses(const InetAddrVector& addresses) {
204 _ignoredAddresses = addresses;
205 }
206
211 const InetAddrVector& getIgnoredAddresses() const {
212 return _ignoredAddresses;
213 }
214
219 void setTappedNIF(const InetAddrVector& addresses) {
220 _tappedNIF = addresses;
221 }
222
227 const InetAddrVector& getTappedNIF() const {
228 return _tappedNIF;
229 }
230
231 bool send(const char* buffer, size_t length, const osiSockAddr& address);
232
233 bool send(epics::pvData::ByteBuffer* buffer, const osiSockAddr& address);
234
235 bool send(epics::pvData::ByteBuffer* buffer, InetAddressType target = inetAddressType_all);
236
241 const InetAddrVector& getSendAddresses() {
242 return _sendAddresses;
243 }
244
249 const osiSockAddr* getBindAddress() const {
250 return &_bindAddress;
251 }
252
253 bool isBroadcastAddress(const osiSockAddr* address, const InetAddrVector& broadcastAddresses)
254 {
255 for (size_t i = 0; i < broadcastAddresses.size(); i++)
256 if (broadcastAddresses[i].ia.sin_addr.s_addr == address->ia.sin_addr.s_addr)
257 return true;
258 return false;
259 }
260
261 // consumes arguments
262 void setSendAddresses(InetAddrVector& addresses, std::vector<bool>& address_types) {
263 _sendAddresses.swap(addresses);
264 _isSendAddressUnicast.swap(address_types);
265 }
266
267 void join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr);
268
269 void setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback);
270
271protected:
272 AtomicBoolean _closed;
273
277 ResponseHandler::shared_pointer _responseHandler;
278
279 virtual void run() OVERRIDE FINAL;
280
281private:
282 bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer);
283
284 void close(bool waitForThreadToComplete);
285
286 // Context only used for logging in this class
287
291 SOCKET _channel;
292
293 /* When provided, this transport is used for replies (passed to handler)
294 * instead of *this. This feature is used in the situation where broadcast
295 * traffic is received on one socket, but a different socket must be used
296 * for unicast replies.
297 *
298 Transport::shared_pointer _replyTransport;
299 */
300
304 osiSockAddr _bindAddress;
305
309 osiSockAddr _remoteAddress;
310 std::string _remoteName;
311
315 InetAddrVector _sendAddresses;
316
317 std::vector<bool> _isSendAddressUnicast;
318
322 InetAddrVector _ignoredAddresses;
323
327 InetAddrVector _tappedNIF;
328
332 osiSockAddr _sendTo;
333 bool _sendToEnabled;
334
338 osiSockAddr _localMulticastAddress;
339 bool _localMulticastAddressEnabled;
340
344 epics::pvData::ByteBuffer _receiveBuffer;
345
349 epics::pvData::ByteBuffer _sendBuffer;
350
354 int _lastMessageStartPosition;
355
359 epics::pvData::Mutex _mutex;
360 epics::pvData::Mutex _sendMutex;
361
365 epics::auto_ptr<epicsThread> _thread;
366
367 epics::pvData::int8 _clientServerWithEndianFlag;
368
369};
370
371class BlockingUDPConnector{
372public:
373 POINTER_DEFINITIONS(BlockingUDPConnector);
374
375 BlockingUDPConnector(bool serverFlag) :_serverFlag(serverFlag) {}
376
380 BlockingUDPTransport::shared_pointer connect(
381 ResponseHandler::shared_pointer const & responseHandler,
382 osiSockAddr& bindAddress,
383 epics::pvData::int8 transportRevision);
384
385private:
386
390 bool _serverFlag;
391
392 EPICS_NOT_COPYABLE(BlockingUDPConnector)
393};
394
395typedef std::vector<BlockingUDPTransport::shared_pointer> BlockingUDPTransportVector;
396
397void initializeUDPTransports(
398 bool serverFlag,
399 BlockingUDPTransportVector& udpTransports,
400 const IfaceNodeVector& ifaceList,
401 const ResponseHandler::shared_pointer& responseHandler,
402 BlockingUDPTransport::shared_pointer& sendTransport,
403 epics::pvData::int32& listenPort,
404 bool autoAddressList,
405 const std::string& addressList,
406 const std::string& ignoreAddressList);
407
408
409}
410}
411
412#endif /* BLOCKINGUDP_H_ */
size_t start() const
valarray< size_t > size() const
constexpr void swap(vector &__x) noexcept
void setEndianess(int byteOrder)
std::size_t getSize() const
const epics::pvData::int16 PVA_DEFAULT_PRIORITY
Default priority (corresponds to POSIX SCHED_OTHER)
const FieldCreatePtr & getFieldCreate()
Copyright - See the COPYRIGHT that is included with this distribution.