16 #include <epicsTime.h>
17 #include <epicsThread.h>
18 #include <epicsVersion.h>
19 #include <epicsAtomic.h>
21 #include <pv/byteBuffer.h>
22 #include <pv/pvType.h>
26 #include <pv/likely.h>
28 #include <pv/pvaConstants.h>
29 #include <pv/remote.h>
30 #include <pv/security.h>
31 #include <pv/transportRegistry.h>
32 #include <pv/introspectionRegistry.h>
33 #include <pv/inetAddressUtil.h>
// C++11-or-later feature gate. What this first check defines is not visible
// in this excerpt.
46 # if __cplusplus>=201103L
// When compiling as C++11 or later, OVERRIDE expands to the `override`
// keyword, so methods annotated with it are checked by the compiler against
// the base-class virtuals.
53 # if __cplusplus>=201103L
54 # define OVERRIDE override
// Default constructor: the stored value starts at 0.
73 AtomicValue() :val(0) {}
// Replaces the stored value with newval. The visible loop tail keeps calling
// epics::atomic::compareAndSwap(val, oldval, newval) until a call returns the
// same oldval that was passed in.
// NOTE(review): presumably compareAndSwap returns the value it found in val,
// so equality means the swap was applied -- confirm against epicsAtomic docs.
// The loop body and the return statement are not visible in this excerpt.
74 inline T getAndSet(T newval)
80 }
while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
// bool specialization: it keeps its state in an AtomicValue<int> and
// translates between bool and int at the interface.
89 class AtomicValue<bool>
// Backing storage for the flag; getAndSet below only ever writes 1 or 0.
91 AtomicValue<int> realval;
// Stores newval as 1 (true) or 0 (false) and returns true when the value
// handed back by the underlying getAndSet was non-zero.
93 inline bool getAndSet(
bool newval)
95 return this->realval.getAndSet(newval?1:0)!=0;
// Reads the backing int and collapses it to bool with a double negation.
// (The enclosing function's signature is not visible in this excerpt.)
98 return !!this->realval.get();
// Constructor declared `explicit`, so its argument is never implicitly
// converted into this exception type. The parameter list and body are not
// visible in this excerpt.
111 explicit invalid_data_stream_exception(
// Read-mode selector with three states: NORMAL, SPLIT and SEGMENTED.
// NOTE(review): what each state means for the receive path is not shown in
// this excerpt -- confirm against the codec implementation.
122 enum ReadMode { NORMAL, SPLIT, SEGMENTED };
// Write-mode selector with two states. AbstractCodec stores one of these in
// its _writeMode member.
// NOTE(review): the names suggest "drain the send queue" versus "block until
// signalled ready", but the behaviour is not shown here -- confirm.
124 enum WriteMode { PROCESS_SEND_QUEUE, WAIT_FOR_READY_SIGNAL };
// Abstract codec base class. It derives from TransportSendControl (any
// further bases are not visible in this excerpt), declares the message and
// send-queue operations, and leaves socket I/O and message handling to
// subclasses through the pure virtual hooks below.
127 class epicsShareClass AbstractCodec :
128 public TransportSendControl,
// Size limits. They are only declared here; the values are defined outside
// this excerpt.
133 static const std::size_t MAX_MESSAGE_PROCESS;
134 static const std::size_t MAX_MESSAGE_SEND;
135 static const std::size_t MAX_ENSURE_SIZE;
136 static const std::size_t MAX_ENSURE_DATA_SIZE;
137 static const std::size_t MAX_ENSURE_BUFFER_SIZE;
138 static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
// Tail of the constructor's parameter list (its beginning is not visible).
142 size_t sendBufferSize,
143 size_t receiveBufferSize,
144 int32_t socketSendBufferSize,
145 bool blockingProcessQueue);
// Pure virtual hooks: every concrete codec has to implement all of these.
147 virtual void processControlMessage() = 0;
148 virtual void processApplicationMessage() = 0;
149 virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
150 virtual void invalidDataStreamHandler() = 0;
151 virtual void readPollOne()=0;
152 virtual void writePollOne() = 0;
153 virtual void scheduleSend() = 0;
154 virtual void sendCompleted() = 0;
155 virtual bool terminated() = 0;
156 virtual int write(epics::pvData::ByteBuffer* src) = 0;
157 virtual int read(epics::pvData::ByteBuffer* dst) = 0;
158 virtual bool isOpen() = 0;
// Virtual destructor (body not visible in this excerpt).
161 virtual ~AbstractCodec()
// The operations below are marked OVERRIDE FINAL: they override a base-class
// virtual and are sealed at this level.
// NOTE(review): FINAL is presumably a macro for the C++11 `final` keyword,
// analogous to OVERRIDE; its definition is not visible here.
165 virtual void ensureData(std::size_t
size) OVERRIDE FINAL;
// Both ensureCapacity and payloadSize default to 0 when omitted.
166 virtual void startMessage(
167 epics::pvData::int8 command,
168 std::size_t ensureCapacity = 0,
169 epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
// Non-virtual: takes a command byte and a 32-bit data word.
170 void putControlMessage(
171 epics::pvData::int8 command,
172 epics::pvData::int32 data);
173 virtual
void endMessage() OVERRIDE FINAL;
174 virtual
void ensureBuffer(std::
size_t size) OVERRIDE FINAL;
175 virtual
void flushSerializeBuffer() OVERRIDE FINAL;
176 virtual
void flush(
bool lastMessageCompleted) OVERRIDE FINAL;
179 void processSendQueue();
// Two overloads: the sealed virtual one takes only the sender, the
// non-virtual one additionally takes a required buffer size.
180 virtual
void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
181 void enqueueSendRequest(TransportSender::shared_pointer const & sender,
182 std::
size_t requiredBufferSize);
183 void setSenderThread();
184 virtual
void setRecipient(osiSockAddr const & sendTo) OVERRIDE FINAL;
185 virtual
void setByteOrder(
int byteOrder) OVERRIDE FINAL;
// Static helper taking a value and an alignment.
// NOTE(review): presumably rounds value up to a multiple of alignment --
// the definition is not visible here, confirm.
187 static std::
size_t alignedValue(std::
size_t value, std::
size_t alignment);
// directSerialize / directDeserialize are OVERRIDE only (not FINAL), so
// subclasses may still replace them. Parameter names are omitted in the
// declarations.
189 virtual
bool directSerialize(
190 epics::pvData::ByteBuffer * ,
192 std::
size_t , std::
size_t ) OVERRIDE;
195 virtual
bool directDeserialize(epics::pvData::ByteBuffer * ,
197 std::
size_t , std::
size_t ) OVERRIDE;
// Returns whether _sendQueue currently reports itself empty.
199 bool sendQueueEmpty()
const {
200 return _sendQueue.empty();
// Returns the protocol revision to use: the smaller of this side's own
// revision and _version. The own revision is PVA_SERVER_PROTOCOL_REVISION
// when _clientServerFlag is non-zero, otherwise PVA_CLIENT_PROTOCOL_REVISION.
// _mutex is held while reading; it is declared mutable so that this const
// method can lock it.
203 epics::pvData::int8 getRevision()
const {
204 epicsGuard<epicsMutex> G(_mutex);
205 int8_t myver = _clientServerFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
206 return myver < _version ? myver : _version;
// Pure virtual: subclasses decide what to do when the send buffer is full.
// NOTE(review): `tries` is presumably the retry count so far -- confirm.
211 virtual void sendBufferFull(
int tries) = 0;
212 void send(epics::pvData::ByteBuffer *buffer);
213 void flushSendBuffer();
// Default implementation does nothing; subclasses may override.
215 virtual void setRxTimeout(
bool ena) {}
// State members (access specifiers are not visible in this excerpt).
221 int32_t _payloadSize;
222 epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
225 epicsThreadId _senderThread;
226 WriteMode _writeMode;
// Two byte buffers: one named for the socket side, one for sending.
229 epics::pvData::ByteBuffer _socketBuffer;
230 epics::pvData::ByteBuffer _sendBuffer;
// Queue of pending TransportSender objects.
232 fair_queue<TransportSender> _sendQueue;
// Internal helpers for header parsing, reading and segmented messages.
236 void processHeader();
237 void processReadNormal();
238 void postProcessApplicationMessage();
239 void processReadSegmented();
240 bool readToBuffer(std::size_t requiredBytes,
bool persistent);
// Overload of endMessage that also takes a hasMoreSegments flag.
241 void endMessage(
bool hasMoreSegments);
// Tail of a declaration taking a TransportSender; its name is not visible.
243 epics::pvAccess::TransportSender::shared_pointer
const & sender);
// Bookkeeping for stored buffer state and message positions.
245 std::size_t _storedPayloadSize;
246 std::size_t _storedPosition;
247 std::size_t _storedLimit;
248 std::size_t _startPosition;
// const: fixed at construction.
250 const std::size_t _maxSendPayloadSize;
251 std::size_t _lastMessageStartPosition;
252 std::size_t _lastSegmentedMessageType;
253 int8_t _lastSegmentedMessageCommand;
254 std::size_t _nextMessagePayloadOffset;
256 epics::pvData::int8 _byteOrderFlag;
// const: fixed at construction. getRevision() treats non-zero as "server".
258 const epics::pvData::int8 _clientServerFlag;
// mutable so that const methods such as getRevision() can take the lock.
262 mutable epics::pvData::Mutex _mutex;
// Codec over a SOCKET channel. It combines AbstractCodec with
// AuthenticationPluginControl and enable_shared_from_this, so instances are
// expected to be owned by a shared pointer (shared_from_this() is used below).
266 class BlockingTCPTransportCodec:
267 public AbstractCodec,
268 public AuthenticationPluginControl,
269 public std::tr1::enable_shared_from_this<BlockingTCPTransportCodec>
274 POINTER_DEFINITIONS(BlockingTCPTransportCodec);
// Class-wide counter.
// NOTE(review): presumably counts live instances -- where it is updated is
// not visible here.
276 static size_t num_instances;
// Constructor (part of the parameter list is not visible in this excerpt).
278 BlockingTCPTransportCodec(
280 Context::shared_pointer
const & context,
282 ResponseHandler::shared_pointer
const & responseHandler,
283 size_t sendBufferSize,
284 size_t receiveBufferSize,
285 epics::pvData::int16 priority);
286 virtual ~BlockingTCPTransportCodec();
// Implementations of the AbstractCodec hooks, sealed with FINAL.
288 virtual void readPollOne() OVERRIDE FINAL;
289 virtual
void writePollOne() OVERRIDE FINAL;
// scheduleSend and sendCompleted are intentionally empty in this codec.
290 virtual
void scheduleSend() OVERRIDE FINAL {}
291 virtual void sendCompleted() OVERRIDE FINAL {}
292 virtual void close() OVERRIDE FINAL;
293 virtual
void waitJoin() OVERRIDE FINAL;
294 virtual
bool terminated() OVERRIDE FINAL;
295 virtual
bool isOpen() OVERRIDE FINAL;
// Plain virtual (neither OVERRIDE nor FINAL): BlockingClientTCPTransportCodec
// overrides it.
296 virtual
void start();
298 virtual
int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
299 virtual
int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
// Always returns the address of the _socketAddress member.
300 virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
301 return &_socketAddress;
303 virtual void invalidDataStreamHandler() OVERRIDE FINAL;
// Body not visible in this excerpt.
305 virtual std::
string getType() const OVERRIDE FINAL {
// Handles CMD_SET_ENDIANESS: selects big-endian when _flags is negative and
// little-endian otherwise.
// NOTE(review): presumably _flags is a signed 8-bit value so that `< 0`
// tests its top bit -- its declaration is not visible here, confirm.
309 virtual void processControlMessage() OVERRIDE FINAL {
310 if (_command == CMD_SET_ENDIANESS)
313 setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
// Forwards the current message (peer address, this transport, version,
// command, payload size and the socket buffer) to the response handler.
318 virtual void processApplicationMessage() OVERRIDE FINAL {
319 _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
320 _version, _command, _payloadSize, &_socketBuffer);
// Returns a reference to the _socketAddress member.
324 virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
325 return _socketAddress;
// Body not visible in this excerpt.
328 virtual const std::string& getRemoteName() const OVERRIDE FINAL {
// Reports the size of _socketBuffer.
333 virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
334 return _socketBuffer.getSize();
// Body not visible in this excerpt.
338 virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
// Stores the peer's receive buffer size in _remoteTransportReceiveBufferSize.
343 virtual void setRemoteTransportReceiveBufferSize(
344 std::size_t remoteTransportReceiveBufferSize) OVERRIDE FINAL {
345 _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
// Stores the peer's socket receive buffer size.
// NOTE(review): the parameter is std::size_t but the destination member is
// declared as epics::pvData::int32 in AbstractCodec, so this assignment
// narrows implicitly.
349 virtual void setRemoteTransportSocketReceiveBufferSize(
350 std::size_t socketReceiveBufferSize) OVERRIDE FINAL {
351 _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
// Deserializes a Field through the incoming introspection registry, passing
// this transport as the control argument.
355 std::tr1::shared_ptr<const epics::pvData::Field>
356 virtual cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
358 return _incomingIR.deserialize(buffer,
this);
// Serializes a Field through the outgoing introspection registry, passing
// this transport as the control argument.
362 virtual void cachedSerialize(
363 const std::tr1::shared_ptr<const epics::pvData::Field>& field,
364 epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
366 _outgoingIR.serialize(field, buffer,
this);
// Intentionally empty in this codec.
370 virtual void flushSendQueue() OVERRIDE FINAL { }
// Body not visible in this excerpt.
373 virtual bool isClosed() OVERRIDE FINAL {
// Registers this transport (as a Transport::shared_pointer obtained from
// shared_from_this()) with the context's transport registry.
// NOTE(review): the enclosing function is not visible; presumably this is
// the activate() that the create() factories call -- confirm.
379 Transport::shared_pointer thisSharedPtr = shared_from_this();
380 _context->getTransportRegistry()->install(thisSharedPtr);
// verify/verified are OVERRIDE but not FINAL: the server and client codecs
// in this file provide their own versions.
385 virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE;
387 virtual void verified(epics::pvData::Status
const & status) OVERRIDE;
389 virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer
const & data) OVERRIDE FINAL;
391 virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer
const & data) OVERRIDE FINAL;
394 void receiveThread();
// Replaces AbstractCodec's empty default implementation.
398 virtual void setRxTimeout(
bool ena) OVERRIDE FINAL;
400 virtual void sendBufferFull(
int tries) OVERRIDE FINAL;
// Virtual close hook; both subclasses in this file override it.
406 virtual void internalClose();
// Open/closed flag stored in the AtomicValue<bool> specialization.
409 AtomicValue<bool> _isOpen;
// Two Thread objects, named for reading and sending.
410 epics::pvData::Thread _readThread, _sendThread;
// Socket handle; const, so fixed at construction.
411 const SOCKET _channel;
// Address returned by getRemoteAddress() and
// getLastReadBufferSocketAddress().
413 osiSockAddr _socketAddress;
416 Context::shared_pointer _context;
// Separate introspection registries for incoming and outgoing Field
// descriptions, used by cachedDeserialize / cachedSerialize.
418 IntrospectionRegistry _incomingIR;
419 IntrospectionRegistry _outgoingIR;
423 AuthenticationSession::shared_pointer _authSession;
426 PeerInfo::const_shared_pointer _peerInfo;
// Receives every application message via processApplicationMessage().
430 ResponseHandler::shared_pointer _responseHandler;
431 size_t _remoteTransportReceiveBufferSize;
432 epics::pvData::int16 _priority;
// NOTE(review): presumably signalled by verified() and awaited by verify();
// neither body is visible here -- confirm.
436 epics::pvData::Event _verifiedEvent;
// Server-side TCP codec. It is also a TransportSender, which lets it enqueue
// itself on its own send queue (see verify()).
439 class BlockingServerTCPTransportCodec :
440 public BlockingTCPTransportCodec,
441 public TransportSender {
444 POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
// Constructor (part of the parameter list is not visible in this excerpt).
// Instances are created through create() below.
447 BlockingServerTCPTransportCodec(
448 Context::shared_pointer
const & context,
450 ResponseHandler::shared_pointer
const & responseHandler,
451 int32_t sendBufferSize,
452 int32_t receiveBufferSize );
// Factory: allocates the codec, wraps it in a shared_pointer and calls
// activate() on it. The return statement and some parameters are not
// visible in this excerpt.
455 static shared_pointer create(
456 Context::shared_pointer
const & context,
458 ResponseHandler::shared_pointer
const & responseHandler,
460 int receiveBufferSize)
462 shared_pointer thisPointer(
463 new BlockingServerTCPTransportCodec(
464 context, channel, responseHandler,
465 sendBufferSize, receiveBufferSize)
467 thisPointer->activate();
// acquire: the parameter is unnamed; the body is not visible here.
473 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl>
const & ) OVERRIDE FINAL
// release: empty body, the argument is ignored.
478 virtual void release(pvAccessID ) OVERRIDE FINAL {}
// Channel SID management.
480 pvAccessID preallocateChannelSID();
// Empty body, the argument is ignored.
482 void depreallocateChannelSID(pvAccessID ) {}
484 void registerChannel(
486 std::tr1::shared_ptr<ServerChannel>
const & channel);
488 void unregisterChannel(pvAccessID sid);
490 std::tr1::shared_ptr<ServerChannel> getChannel(pvAccessID sid);
// Fills the caller-supplied vector; const method.
492 void getChannels(
std::vector<std::tr1::shared_ptr<ServerChannel> >& channels)
const;
494 size_t getChannelCount()
const;
// Enqueues this object as a TransportSender, runs the base-class verify(),
// enqueues it a second time, and returns the base-class result.
// NOTE(review): presumably send() emits a request on the first pass and a
// response on the second, keyed on _verifyOrVerified -- send() is not
// visible here, confirm.
496 virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE FINAL {
498 TransportSender::shared_pointer transportSender =
499 std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
500 enqueueSendRequest(transportSender);
502 bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
504 enqueueSendRequest(transportSender);
506 return verifiedStatus;
// Records the status in _verificationStatus after taking a guard on _mutex,
// then delegates to the base-class verified(). The extent of the guard's
// scope is not visible in this excerpt.
509 virtual void verified(epics::pvData::Status
const & status) OVERRIDE FINAL {
511 epicsGuard<epicsMutex> G(_mutex);
512 _verificationStatus = status;
514 BlockingTCPTransportCodec::verified(status);
// Takes a security plugin name and its data.
517 void authNZInitialize(
const std::string& securityPluginName,
518 const epics::pvData::PVStructure::shared_pointer& data);
520 virtual void authenticationCompleted(epics::pvData::Status
const & status,
521 const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
// send() override taking a buffer and a send-control object.
// NOTE(review): presumably the TransportSender callback -- confirm.
523 virtual void send(epics::pvData::ByteBuffer* buffer,
524 TransportSendControl* control) OVERRIDE FINAL;
526 virtual ~BlockingServerTCPTransportCodec() OVERRIDE FINAL;
530 void destroyAllChannels();
// Overrides the base-class close hook.
531 virtual
void internalClose() OVERRIDE FINAL;
// NOTE(review): presumably the most recently handed-out SID, used by
// preallocateChannelSID() -- confirm.
538 pvAccessID _lastChannelSID;
// Map from channel SID to the owning shared_ptr<ServerChannel>.
540 typedef std::map<pvAccessID, std::tr1::shared_ptr<ServerChannel> > _channels_t;
544 _channels_t _channels;
// mutable so that the const accessors (getChannels, getChannelCount) can
// lock it.
// NOTE(review): presumably guards _channels -- confirm in the .cpp.
546 mutable epics::pvData::Mutex _channelsMutex;
// Written by verified().
548 epics::pvData::Status _verificationStatus;
550 bool _verifyOrVerified;
// List of plugin names.
// NOTE(review): presumably the authentication plugins offered to the peer --
// confirm.
552 std::vector<std::
string> advertisedAuthPlugins;
// Client-side TCP codec. Besides being a TransportSender it is also a
// TimerCallback, which is where timerStopped() and callback() come from.
556 class BlockingClientTCPTransportCodec :
557 public BlockingTCPTransportCodec,
558 public TransportSender,
559 public epics::pvData::TimerCallback {
562 POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
// Constructor (part of the parameter list is not visible in this excerpt).
// Instances are created through create() below.
565 BlockingClientTCPTransportCodec(
566 Context::shared_pointer
const & context,
568 ResponseHandler::shared_pointer
const & responseHandler,
569 int32_t sendBufferSize,
570 int32_t receiveBufferSize,
571 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
572 epics::pvData::int8 remoteTransportRevision,
573 float heartbeatInterval,
// Factory: allocates the codec with the same arguments, wraps it in a
// shared_pointer and calls activate() on it. The return statement and some
// parameters are not visible in this excerpt.
577 static shared_pointer create(
578 Context::shared_pointer
const & context,
580 ResponseHandler::shared_pointer
const & responseHandler,
581 int32_t sendBufferSize,
582 int32_t receiveBufferSize,
583 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
584 int8_t remoteTransportRevision,
585 float heartbeatInterval,
588 shared_pointer thisPointer(
589 new BlockingClientTCPTransportCodec(
590 context, channel, responseHandler,
591 sendBufferSize, receiveBufferSize,
592 client, remoteTransportRevision,
593 heartbeatInterval, priority)
595 thisPointer->activate();
// Overrides the base-class start(), which is declared as a plain virtual.
601 virtual void start() OVERRIDE FINAL;
603 virtual ~BlockingClientTCPTransportCodec() OVERRIDE FINAL;
// Timer hooks. The body of timerStopped() is not visible in this excerpt.
605 virtual
void timerStopped() OVERRIDE FINAL {
609 virtual void callback() OVERRIDE FINAL;
// acquire/release take a client channel and a client id respectively.
// NOTE(review): presumably they add to / remove from _owners -- the
// definitions are not visible here, confirm.
611 virtual
bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & client) OVERRIDE FINAL;
613 virtual
void release(pvAccessID clientId) OVERRIDE FINAL;
// send() override taking a buffer and a send-control object.
615 virtual
void send(epics::pvData::ByteBuffer* buffer,
616 TransportSendControl* control) OVERRIDE FINAL;
// Client variant: takes the list of security plugins on offer.
618 void authNZInitialize(const std::vector<std::
string>& offeredSecurityPlugins);
620 virtual
void authenticationCompleted(epics::pvData::Status const & status,
621 const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
623 virtual
void verified(epics::pvData::Status const & status) OVERRIDE FINAL;
// Overrides the base-class close hook.
626 virtual
void internalClose() OVERRIDE FINAL;
// Map from id to client channel. The values are weak_ptr, so this map does
// not keep the channels alive.
634 typedef std::map<pvAccessID, std::tr1::weak_ptr<ClientChannelImpl> > TransportClientMap_t;
635 TransportClientMap_t _owners;
// const: fixed at construction.
// NOTE(review): presumably derived from the heartbeatInterval constructor
// argument -- confirm.
640 const
double _connectionTimeout;
650 void closedNotifyClients();
Read access is allowed but write access is not allowed.
constexpr __tuple_element_t< __i, tuple< _Elements... > > & get(tuple< _Elements... > &__t) noexcept
runtime_error(const string &__arg) _GLIBCXX_TXN_SAFE
basic_string< char > string