17#include <epicsThread.h>
18#include <epicsVersion.h>
19#include <epicsAtomic.h>
21#include <pv/byteBuffer.h>
28#include <pv/pvaConstants.h>
30#include <pv/security.h>
31#include <pv/transportRegistry.h>
32#include <pv/introspectionRegistry.h>
33#include <pv/inetAddressUtil.h>
46# if __cplusplus>=201103L
53# if __cplusplus>=201103L
54# define OVERRIDE override
// Fragment of the generic AtomicValue template. The class header and some
// statements of the member functions are not visible in this excerpt.
// Default constructor: the wrapped value val starts at 0.
73 AtomicValue() :val(0) {}
// Stores newval and works with the previous value (oldval). The visible part
// reads val atomically and keeps retrying compareAndSwap(val, oldval, newval)
// until the call returns the value that was just read.
74 inline T getAndSet(T newval)
79 oldval = epics::atomic::get(val);
80 }
while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
// Atomic read of val (the enclosing accessor header is not visible here).
84 return epics::atomic::get(val);
// AtomicValue specialisation for bool. The flag is kept in an int-based
// AtomicValue (realval) and converted at the interface.
89class AtomicValue<bool>
91 AtomicValue<int> realval;
// Passes 1 for true and 0 for false to realval.getAndSet() and returns
// whether the value it hands back is non-zero.
93 inline bool getAndSet(
bool newval)
95 return this->realval.getAndSet(newval?1:0)!=0;
// Converts the stored int to bool with a double negation (the enclosing
// accessor header is not visible in this excerpt).
98 return !!this->realval.get();
// Explicit constructor of invalid_data_stream_exception; its parameter list
// and initialiser are not visible in this excerpt.
111 explicit invalid_data_stream_exception(
// Explicit constructor of connection_closed_exception: takes the message s
// and forwards it to the std::runtime_error base.
118 explicit connection_closed_exception(
const std::string &s):
std::
runtime_error(s) {}
// Read modes. The enumerators keep their implicit values:
// NORMAL = 0, SPLIT = 1, SEGMENTED = 2.
// NOTE(review): the meaning of each mode is defined by the code that uses
// it, which is not visible in this excerpt.
enum ReadMode {
    NORMAL,
    SPLIT,
    SEGMENTED
};
// Write modes, as held in the codec member _writeMode. The enumerators keep
// their implicit values: PROCESS_SEND_QUEUE = 0, WAIT_FOR_READY_SIGNAL = 1.
// NOTE(review): the meaning of each mode is defined by the code that uses
// it, which is not visible in this excerpt.
enum WriteMode {
    PROCESS_SEND_QUEUE,
    WAIT_FOR_READY_SIGNAL
};
// Codec base class, exported with epicsShareClass. It publicly inherits
// TransportSendControl; any further base classes are not visible in this
// excerpt.
127class epicsShareClass AbstractCodec :
128 public TransportSendControl,
// Class-wide size limits. They are only declared here, so their values are
// not visible in this excerpt.
133 static const std::size_t MAX_MESSAGE_PROCESS;
134 static const std::size_t MAX_MESSAGE_SEND;
135 static const std::size_t MAX_ENSURE_SIZE;
136 static const std::size_t MAX_ENSURE_DATA_SIZE;
137 static const std::size_t MAX_ENSURE_BUFFER_SIZE;
138 static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
// Tail of a parameter list whose declaration name is not visible here
// (presumably the constructor - confirm): send and receive buffer sizes,
// a socket send-buffer size and the blockingProcessQueue flag.
142 size_t sendBufferSize,
143 size_t receiveBufferSize,
144 int32_t socketSendBufferSize,
145 bool blockingProcessQueue);
// Hooks that a concrete codec has to implement; every one of them is pure
// virtual in this class.
// NOTE(review): the names suggest message handling, polling, send
// scheduling and connection state queries - confirm against the
// implementations.
147 virtual void processControlMessage() = 0;
148 virtual void processApplicationMessage() = 0;
149 virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
150 virtual void invalidDataStreamHandler() = 0;
151 virtual void readPollOne()=0;
152 virtual void writePollOne() = 0;
153 virtual void scheduleSend() = 0;
154 virtual void sendCompleted() = 0;
155 virtual bool terminated() = 0;
158 virtual bool isOpen() = 0;
// Virtual destructor; its body is not visible in this excerpt.
161 virtual ~AbstractCodec()
// Public send and serialisation interface. Members marked OVERRIDE FINAL
// override a base-class virtual and are declared final here.
165 virtual void ensureData(std::size_t
size) OVERRIDE FINAL;
// startMessage: only part of the parameter list is visible in this excerpt;
// ensureCapacity defaults to 0.
166 virtual void startMessage(
168 std::size_t ensureCapacity = 0,
// Non-virtual helper taking an 8-bit command and a 32-bit data word.
170 void putControlMessage(
171 epics::pvData::int8 command,
172 epics::pvData::int32 data);
173 virtual
void endMessage() OVERRIDE FINAL;
174 virtual
void ensureBuffer(
std::
size_t size) OVERRIDE FINAL;
175 virtual
void flushSerializeBuffer() OVERRIDE FINAL;
176 virtual
void flush(
bool lastMessageCompleted) OVERRIDE FINAL;
179 void processSendQueue();
// Two enqueueSendRequest overloads: the virtual one overrides the base
// interface, the non-virtual one additionally takes requiredBufferSize.
180 virtual
void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
181 void enqueueSendRequest(TransportSender::shared_pointer const & sender,
182 std::
size_t requiredBufferSize);
183 void setSenderThread();
184 virtual
void setRecipient(osiSockAddr const & sendTo) OVERRIDE FINAL;
185 virtual
void setByteOrder(
int byteOrder) OVERRIDE FINAL;
// Static helper taking a value and an alignment (definition not visible
// here; presumably rounds value up to the alignment - confirm).
187 static
std::
size_t alignedValue(
std::
size_t value,
std::
size_t alignment);
// directSerialize and directDeserialize are OVERRIDE but not FINAL, so
// derived classes may still override them. Parameter names are omitted in
// the declarations and part of the list is not visible in this excerpt.
189 virtual
bool directSerialize(
190 epics::pvData::ByteBuffer * ,
192 std::
size_t ,
std::
size_t ) OVERRIDE;
195 virtual
bool directDeserialize(
epics::pvData::ByteBuffer * ,
197 std::
size_t ,
std::
size_t ) OVERRIDE;
// Returns whether _sendQueue is empty.
199 bool sendQueueEmpty()
const {
200 return _sendQueue.empty();
// Body of a function whose header is not visible in this excerpt. While
// holding _mutex it selects the server or client protocol revision constant
// according to _clientServerFlag and returns the smaller of that value and
// _version.
204 epicsGuard<epicsMutex> G(_mutex);
205 int8_t myver = _clientServerFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
206 return myver < _version ? myver : _version;
// Pure virtual hook taking a tries counter.
// NOTE(review): presumably invoked when the send buffer cannot accept more
// data - confirm against the callers.
211 virtual void sendBufferFull(
int tries) = 0;
213 void flushSendBuffer();
// The implementation in this class is empty; ena is not used.
215 virtual void setRxTimeout(
bool ena) {}
// Codec state. Access specifiers between these members are not visible in
// this excerpt.
221 int32_t _payloadSize;
225 epicsThreadId _senderThread;
226 WriteMode _writeMode;
// Queue of TransportSender entries; sendQueueEmpty() reports on it.
232 fair_queue<TransportSender> _sendQueue;
// Internal helpers for the receive path (definitions not visible here).
236 void processHeader();
237 void processReadNormal();
238 void postProcessApplicationMessage();
239 void processReadSegmented();
240 bool readToBuffer(std::size_t requiredBytes,
bool persistent);
// Overload of endMessage taking a hasMoreSegments flag.
241 void endMessage(
bool hasMoreSegments);
// Tail of a declaration whose name is not visible in this excerpt; it takes
// a TransportSender shared pointer by const reference.
243 epics::pvAccess::TransportSender::shared_pointer
const & sender);
// Position and size bookkeeping.
// NOTE(review): exact roles are defined by the implementation file - confirm
// there before relying on the names.
245 std::size_t _storedPayloadSize;
246 std::size_t _storedPosition;
247 std::size_t _storedLimit;
248 std::size_t _startPosition;
// Declared const, so it cannot change after construction.
250 const std::size_t _maxSendPayloadSize;
251 std::size_t _lastMessageStartPosition;
252 std::size_t _lastSegmentedMessageType;
253 int8_t _lastSegmentedMessageCommand;
254 std::size_t _nextMessagePayloadOffset;
// Declared mutable so that const member functions can lock it.
262 mutable epics::pvData::Mutex _mutex;
// TCP codec built on AbstractCodec. It also inherits
// AuthenticationPluginControl and enable_shared_from_this, which is what
// makes the shared_from_this() calls in this class possible.
266class BlockingTCPTransportCodec:
267 public AbstractCodec,
268 public AuthenticationPluginControl,
269 public std::tr1::enable_shared_from_this<BlockingTCPTransportCodec>
274 POINTER_DEFINITIONS(BlockingTCPTransportCodec);
// Static counter declared here only.
// NOTE(review): presumably counts live instances - confirm in the
// implementation file.
276 static size_t num_instances;
// Constructor; only part of its parameter list is visible in this excerpt
// (context, response handler, send and receive buffer sizes).
278 BlockingTCPTransportCodec(
280 Context::shared_pointer
const & context,
282 ResponseHandler::shared_pointer
const & responseHandler,
283 size_t sendBufferSize,
284 size_t receiveBufferSize,
286 virtual ~BlockingTCPTransportCodec();
// Overrides of the codec hooks; those marked OVERRIDE FINAL cannot be
// overridden again by derived classes.
288 virtual void readPollOne() OVERRIDE FINAL;
289 virtual
void writePollOne() OVERRIDE FINAL;
// scheduleSend and sendCompleted have empty bodies in this class.
290 virtual
void scheduleSend() OVERRIDE FINAL {}
291 virtual void sendCompleted() OVERRIDE FINAL {}
292 virtual void close() OVERRIDE FINAL;
293 virtual
void waitJoin() OVERRIDE FINAL;
294 virtual
bool terminated() OVERRIDE FINAL;
295 virtual
bool isOpen() OVERRIDE FINAL;
// start is virtual but not final, so derived codecs may override it.
296 virtual
void start();
// Byte-level read and write taking a ByteBuffer; both return int.
298 virtual
int read(
epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
299 virtual
int write(
epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
// Returns the address of the member _socketAddress.
300 virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
301 return &_socketAddress;
303 virtual void invalidDataStreamHandler() OVERRIDE FINAL;
// Reports the transport type as the string tcp.
305 virtual
std::
string getType() const OVERRIDE FINAL {
306 return std::string(
"tcp");
// Control message handling. For CMD_SET_ENDIANESS the byte order is set to
// big endian when _flags is negative and to little endian otherwise.
// NOTE(review): _flags is presumably a signed 8-bit header field, so a
// negative value means its top bit is set - confirm.
309 virtual void processControlMessage() OVERRIDE FINAL {
310 if (_command == CMD_SET_ENDIANESS)
313 setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
// Application message handling: passes the peer address, this transport (as
// a shared pointer), the version, command and payload size and the socket
// buffer to the response handler.
318 virtual void processApplicationMessage() OVERRIDE FINAL {
319 _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
320 _version, _command, _payloadSize, &_socketBuffer);
// Returns the member _socketAddress by const reference.
324 virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
325 return _socketAddress;
// Body not visible in this excerpt.
328 virtual const std::string& getRemoteName() const OVERRIDE FINAL {
// Size of the receive buffer, as reported by _socketBuffer.getSize().
333 virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
334 return _socketBuffer.
getSize();
// Stores the given value in _remoteTransportReceiveBufferSize.
343 virtual void setRemoteTransportReceiveBufferSize(
344 std::size_t remoteTransportReceiveBufferSize) OVERRIDE FINAL {
345 _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
// Stores the given value in _remoteTransportSocketReceiveBufferSize.
349 virtual void setRemoteTransportSocketReceiveBufferSize(
350 std::size_t socketReceiveBufferSize) OVERRIDE FINAL {
351 _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
// Return type of a declaration whose name and body are not visible here.
355 std::tr1::shared_ptr<const epics::pvData::Field>
// Serialises field into buffer through the outgoing introspection registry,
// passing this object as the control argument. Part of the parameter list is
// not visible in this excerpt.
362 virtual void cachedSerialize(
363 const std::tr1::shared_ptr<const epics::pvData::Field>& field,
366 _outgoingIR.
serialize(field, buffer,
this);
// Empty body in this class.
370 virtual void flushSendQueue() OVERRIDE FINAL { }
// Body not visible in this excerpt.
373 virtual bool isClosed() OVERRIDE FINAL {
// The two statements below belong to a function whose header is not visible
// here: they obtain a shared pointer to this transport and install it in the
// transport registry of the context.
379 Transport::shared_pointer thisSharedPtr = shared_from_this();
380 _context->getTransportRegistry()->install(thisSharedPtr);
// Security plugin message hooks; both take a PVStructure shared pointer and
// are final overrides.
389 virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer
const & data) OVERRIDE FINAL;
391 virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer
const & data) OVERRIDE FINAL;
394 void receiveThread();
// Final overrides of the AbstractCodec hooks of the same names.
398 virtual void setRxTimeout(
bool ena) OVERRIDE FINAL;
400 virtual void sendBufferFull(
int tries) OVERRIDE FINAL;
// Virtual and not final; both derived codecs in this header override it.
406 virtual void internalClose();
// Open flag held in the atomic bool wrapper.
409 AtomicValue<bool> _isOpen;
// Socket handle, const for the lifetime of the object.
411 const SOCKET _channel;
// Address returned by getRemoteAddress() and
// getLastReadBufferSocketAddress().
413 osiSockAddr _socketAddress;
414 std::string _socketName;
416 Context::shared_pointer _context;
// Separate introspection registries for the two directions; _outgoingIR is
// the one used by cachedSerialize().
418 IntrospectionRegistry _incomingIR;
419 IntrospectionRegistry _outgoingIR;
422 std::string _authSessionName;
423 AuthenticationSession::shared_pointer _authSession;
426 PeerInfo::const_shared_pointer _peerInfo;
// Handler that processApplicationMessage() forwards messages to.
430 ResponseHandler::shared_pointer _responseHandler;
// Written by setRemoteTransportReceiveBufferSize().
431 size_t _remoteTransportReceiveBufferSize;
// Server-side TCP codec: extends BlockingTCPTransportCodec and also
// implements TransportSender.
439class BlockingServerTCPTransportCodec :
440 public BlockingTCPTransportCodec,
441 public TransportSender {
444 POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
// Constructor; only part of its parameter list is visible in this excerpt.
447 BlockingServerTCPTransportCodec(
448 Context::shared_pointer
const & context,
450 ResponseHandler::shared_pointer
const & responseHandler,
451 int32_t sendBufferSize,
452 int32_t receiveBufferSize );
// Factory: builds the codec with new, wraps it in shared_pointer and then
// calls activate() on it. Part of the parameter list and the return
// statement are not visible in this excerpt.
// NOTE(review): receiveBufferSize is int here while the constructor
// takes int32_t.
455 static shared_pointer create(
456 Context::shared_pointer
const & context,
458 ResponseHandler::shared_pointer
const & responseHandler,
460 int receiveBufferSize)
462 shared_pointer thisPointer(
463 new BlockingServerTCPTransportCodec(
464 context, channel, responseHandler,
465 sendBufferSize, receiveBufferSize)
467 thisPointer->activate();
// acquire ignores its argument (the parameter is unnamed); its body is not
// visible in this excerpt.
473 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl>
const & ) OVERRIDE FINAL
// release has an empty body on the server side.
478 virtual void release(pvAccessID ) OVERRIDE FINAL {}
480 pvAccessID preallocateChannelSID();
// Empty body: nothing is done for the given id.
482 void depreallocateChannelSID(pvAccessID ) {}
// Channel bookkeeping. registerChannel is only partly visible in this
// excerpt.
484 void registerChannel(
486 std::tr1::shared_ptr<ServerChannel>
const & channel);
488 void unregisterChannel(pvAccessID sid);
490 std::tr1::shared_ptr<ServerChannel> getChannel(pvAccessID sid);
// Const member that fills the caller-supplied vector.
492 void getChannels(
std::vector<std::tr1::shared_ptr<ServerChannel> >& channels)
const;
494 size_t getChannelCount()
const;
// Body fragment of a verification routine whose header is not visible in
// this excerpt. It casts this object to TransportSender, queues it for
// sending, runs the base class verify() with timeoutMs, queues the sender a
// second time and returns the result of verify().
498 TransportSender::shared_pointer transportSender =
499 std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
500 enqueueSendRequest(transportSender);
502 bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
504 enqueueSendRequest(transportSender);
506 return verifiedStatus;
// Body fragment of a second routine (header not visible): records status in
// _verificationStatus while holding _mutex, then forwards it to the base
// class verified().
// NOTE(review): whether the guard is released before the base call depends
// on braces that are not visible in this excerpt.
511 epicsGuard<epicsMutex> G(_mutex);
512 _verificationStatus = status;
514 BlockingTCPTransportCodec::verified(status);
// Takes a security plugin name and a PVStructure with its data.
517 void authNZInitialize(
const std::string& securityPluginName,
518 const epics::pvData::PVStructure::shared_pointer& data);
// Tails of two final overrides whose names are not visible in this excerpt.
521 const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
524 TransportSendControl* control) OVERRIDE FINAL;
526 virtual ~BlockingServerTCPTransportCodec() OVERRIDE FINAL;
530 void destroyAllChannels();
// Final override of the base class internalClose().
531 virtual
void internalClose() OVERRIDE FINAL;
538 pvAccessID _lastChannelSID;
// Channel table keyed by pvAccessID, holding shared pointers to
// ServerChannel.
540 typedef
std::map<pvAccessID,
std::tr1::shared_ptr<ServerChannel> > _channels_t;
544 _channels_t _channels;
// Declared mutable so that const member functions can lock it.
546 mutable
epics::pvData::Mutex _channelsMutex;
// Written by the verified() fragment under _mutex.
548 epics::pvData::Status _verificationStatus;
550 bool _verifyOrVerified;
552 std::vector<
std::
string> advertisedAuthPlugins;
// Client-side TCP codec: extends BlockingTCPTransportCodec and also
// implements TransportSender and the pvData TimerCallback interface.
556class BlockingClientTCPTransportCodec :
557 public BlockingTCPTransportCodec,
558 public TransportSender,
559 public
epics::pvData::TimerCallback {
562 POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
// Constructor; only part of its parameter list is visible in this excerpt.
565 BlockingClientTCPTransportCodec(
566 Context::shared_pointer
const & context,
568 ResponseHandler::shared_pointer
const & responseHandler,
569 int32_t sendBufferSize,
570 int32_t receiveBufferSize,
571 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
573 float heartbeatInterval,
// Factory: builds the codec with new, wraps it in shared_pointer and then
// calls activate() on it. Part of the parameter list and the return
// statement are not visible in this excerpt.
577 static shared_pointer create(
578 Context::shared_pointer
const & context,
580 ResponseHandler::shared_pointer
const & responseHandler,
581 int32_t sendBufferSize,
582 int32_t receiveBufferSize,
583 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
584 int8_t remoteTransportRevision,
585 float heartbeatInterval,
588 shared_pointer thisPointer(
589 new BlockingClientTCPTransportCodec(
590 context, channel, responseHandler,
591 sendBufferSize, receiveBufferSize,
592 client, remoteTransportRevision,
593 heartbeatInterval, priority)
595 thisPointer->activate();
// Final override of the base class start().
601 virtual void start() OVERRIDE FINAL;
603 virtual ~BlockingClientTCPTransportCodec() OVERRIDE FINAL;
// Timer interface overrides; the body of timerStopped is not visible in
// this excerpt.
605 virtual
void timerStopped() OVERRIDE FINAL {
609 virtual void callback() OVERRIDE FINAL;
// Client ownership tracking: acquire takes the client channel, release
// takes its pvAccessID.
611 virtual
bool acquire(
std::tr1::shared_ptr<ClientChannelImpl> const & client) OVERRIDE FINAL;
613 virtual
void release(pvAccessID clientId) OVERRIDE FINAL;
// Send hook taking the buffer to fill and the send control.
615 virtual
void send(
epics::pvData::ByteBuffer* buffer,
616 TransportSendControl* control) OVERRIDE FINAL;
// Takes the list of security plugin names offered to this client.
618 void authNZInitialize(const
std::vector<
std::
string>& offeredSecurityPlugins);
620 virtual
void authenticationCompleted(
epics::pvData::Status const & status,
621 const
std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
623 virtual
void verified(
epics::pvData::Status const & status) OVERRIDE FINAL;
626 virtual
void internalClose() OVERRIDE FINAL;
// Owner table keyed by pvAccessID. Entries are weak pointers, so the table
// does not keep client channels alive.
634 typedef
std::map<pvAccessID,
std::tr1::weak_ptr<ClientChannelImpl> > TransportClientMap_t;
635 TransportClientMap_t _owners;
// Declared const, so it cannot change after construction.
640 const
double _connectionTimeout;
650 void closedNotifyClients();
// NOTE(review): the lines below look like tooltip and footer text captured
// from a documentation viewer together with the listing, not like part of
// the header itself. They are kept verbatim as comments.
// valarray< size_t > size() const
// runtime_error(const string &__arg) _GLIBCXX_TXN_SAFE
// epics::pvData::FieldConstPtr deserialize(epics::pvData::ByteBuffer *buffer, epics::pvData::DeserializableControl *control)
// Deserializes introspection interface.
// void serialize(epics::pvData::FieldConstPtr const &field, epics::pvData::ByteBuffer *buffer, epics::pvData::SerializableControl *control)
// Serializes introspection interface.
// std::size_t getSize() const
// @ read
// Read access is allowed but write access is not allowed.
// Copyright - See the COPYRIGHT that is included with this distribution.