pvAccessCPP  7.1.7
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
codec.h
1 
7 #ifndef CODEC_H_
8 #define CODEC_H_
9 
10 #include <set>
11 #include <map>
12 #include <deque>
13 
14 #include <shareLib.h>
15 #include <osiSock.h>
16 #include <epicsTime.h>
17 #include <epicsThread.h>
18 #include <epicsVersion.h>
19 #include <epicsAtomic.h>
20 
21 #include <pv/byteBuffer.h>
22 #include <pv/pvType.h>
23 #include <pv/lock.h>
24 #include <pv/timer.h>
25 #include <pv/event.h>
26 #include <pv/likely.h>
27 
28 #include <pv/pvaConstants.h>
29 #include <pv/remote.h>
30 #include <pv/security.h>
31 #include <pv/transportRegistry.h>
32 #include <pv/introspectionRegistry.h>
33 #include <pv/inetAddressUtil.h>
34 
35 /* C++11 keywords
36  @code
37  struct Base {
38  virtual void foo();
39  };
40  struct Class : public Base {
41  virtual void foo() OVERRIDE FINAL;
42  };
43  @endcode
44  */
/* FINAL expands to the C++11 `final` specifier when the compiler supports it
 * and to nothing otherwise.
 *
 * MSVC reports __cplusplus as 199711L unless /Zc:__cplusplus is given, so the
 * compiler version is tested as well (the specifier is available since VS2012).
 * A definition supplied before this header is left untouched.
 */
#ifndef FINAL
#  if __cplusplus>=201103L || (defined(_MSC_VER) && _MSC_VER>=1700)
#    define FINAL final
#  else
#    define FINAL
#  endif
#endif
/* OVERRIDE expands to the C++11 `override` specifier when the compiler
 * supports it and to nothing otherwise.
 *
 * MSVC reports __cplusplus as 199711L unless /Zc:__cplusplus is given, so the
 * compiler version is tested as well (the specifier is available since VS2012).
 * A definition supplied before this header is left untouched.
 */
#ifndef OVERRIDE
#  if __cplusplus>=201103L || (defined(_MSC_VER) && _MSC_VER>=1700)
#    define OVERRIDE override
#  else
#    define OVERRIDE
#  endif
#endif
59 
60 
61 namespace epics {
62 namespace pvAccess {
63 
64 class ServerChannel;
65 
66 namespace detail {
67 
68 template<typename T>
69 class AtomicValue
70 {
71  T val;
72 public:
73  AtomicValue() :val(0) {}
74  inline T getAndSet(T newval)
75  {
76  int oldval;
77  // epicsAtomic doesn't have unconditional swap
78  do {
79  oldval = epics::atomic::get(val);
80  } while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
81  return oldval;
82  }
83  inline T get() {
84  return epics::atomic::get(val);
85  }
86 };
87 // treat bool as int
88 template<>
89 class AtomicValue<bool>
90 {
91  AtomicValue<int> realval;
92 public:
93  inline bool getAndSet(bool newval)
94  {
95  return this->realval.getAndSet(newval?1:0)!=0;
96  }
97  inline bool get() {
98  return !!this->realval.get();
99  }
100 };
101 
102 
/** std::runtime_error subtype used to signal I/O failures (per its name);
 *  it only forwards the message to the base class.
 */
class io_exception
    : public std::runtime_error
{
public:
    explicit io_exception(const std::string &message)
        : std::runtime_error(message)
    {}
};
107 
108 
/** std::runtime_error subtype used to signal an invalid data stream (per its
 *  name); it only forwards the message to the base class.
 */
class invalid_data_stream_exception
    : public std::runtime_error
{
public:
    explicit invalid_data_stream_exception(const std::string &message)
        : std::runtime_error(message)
    {}
};
114 
115 
/** std::runtime_error subtype used to signal a closed connection (per its
 *  name); it only forwards the message to the base class.
 */
class connection_closed_exception
    : public std::runtime_error
{
public:
    explicit connection_closed_exception(const std::string &message)
        : std::runtime_error(message)
    {}
};
120 
121 
/** Read-side mode of a codec (stored in AbstractCodec::_readMode). */
enum ReadMode {
    NORMAL,
    SPLIT,
    SEGMENTED
};
123 
/** Write-side mode of a codec (stored in AbstractCodec::_writeMode). */
enum WriteMode {
    PROCESS_SEND_QUEUE,
    WAIT_FOR_READY_SIGNAL
};
125 
126 
127 class epicsShareClass AbstractCodec :
128  public TransportSendControl,
129  public Transport
130 {
131 public:
132 
133  static const std::size_t MAX_MESSAGE_PROCESS;
134  static const std::size_t MAX_MESSAGE_SEND;
135  static const std::size_t MAX_ENSURE_SIZE;
136  static const std::size_t MAX_ENSURE_DATA_SIZE;
137  static const std::size_t MAX_ENSURE_BUFFER_SIZE;
138  static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
139 
140  AbstractCodec(
141  bool serverFlag,
142  size_t sendBufferSize,
143  size_t receiveBufferSize,
144  int32_t socketSendBufferSize,
145  bool blockingProcessQueue);
146 
147  virtual void processControlMessage() = 0;
148  virtual void processApplicationMessage() = 0;
149  virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
150  virtual void invalidDataStreamHandler() = 0;
151  virtual void readPollOne()=0;
152  virtual void writePollOne() = 0;
153  virtual void scheduleSend() = 0;
154  virtual void sendCompleted() = 0;
155  virtual bool terminated() = 0;
156  virtual int write(epics::pvData::ByteBuffer* src) = 0;
157  virtual int read(epics::pvData::ByteBuffer* dst) = 0;
158  virtual bool isOpen() = 0;
159 
160 
161  virtual ~AbstractCodec()
162  {
163  }
164 
165  virtual void ensureData(std::size_t size) OVERRIDE FINAL;
166  virtual void startMessage(
167  epics::pvData::int8 command,
168  std::size_t ensureCapacity = 0,
169  epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
170  void putControlMessage(
171  epics::pvData::int8 command,
172  epics::pvData::int32 data);
173  virtual void endMessage() OVERRIDE FINAL;
174  virtual void ensureBuffer(std::size_t size) OVERRIDE FINAL;
175  virtual void flushSerializeBuffer() OVERRIDE FINAL;
176  virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL;
177  void processWrite();
178  void processRead();
179  void processSendQueue();
180  virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
181  void enqueueSendRequest(TransportSender::shared_pointer const & sender,
182  std::size_t requiredBufferSize);
183  void setSenderThread();
184  virtual void setRecipient(osiSockAddr const & sendTo) OVERRIDE FINAL;
185  virtual void setByteOrder(int byteOrder) OVERRIDE FINAL;
186 
187  static std::size_t alignedValue(std::size_t value, std::size_t alignment);
188 
189  virtual bool directSerialize(
190  epics::pvData::ByteBuffer * /*existingBuffer*/,
191  const char* /*toSerialize*/,
192  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE;
193 
194 
195  virtual bool directDeserialize(epics::pvData::ByteBuffer * /*existingBuffer*/,
196  char* /*deserializeTo*/,
197  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE;
198 
199  bool sendQueueEmpty() const {
200  return _sendQueue.empty();
201  }
202 
203  epics::pvData::int8 getRevision() const {
204  epicsGuard<epicsMutex> G(_mutex);
205  int8_t myver = _clientServerFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
206  return myver < _version ? myver : _version;
207  }
208 
209 protected:
210 
211  virtual void sendBufferFull(int tries) = 0;
212  void send(epics::pvData::ByteBuffer *buffer);
213  void flushSendBuffer();
214 
215  virtual void setRxTimeout(bool ena) {}
216 
217  ReadMode _readMode;
218  int8_t _version;
219  int8_t _flags;
220  int8_t _command;
221  int32_t _payloadSize; // TODO why not size_t?
222  epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
223  //TODO initialize union
224  osiSockAddr _sendTo;
225  epicsThreadId _senderThread;
226  WriteMode _writeMode;
227  bool _writeOpReady;
228 
229  epics::pvData::ByteBuffer _socketBuffer;
230  epics::pvData::ByteBuffer _sendBuffer;
231 
232  fair_queue<TransportSender> _sendQueue;
233 
234 private:
235 
236  void processHeader();
237  void processReadNormal();
238  void postProcessApplicationMessage();
239  void processReadSegmented();
240  bool readToBuffer(std::size_t requiredBytes, bool persistent);
241  void endMessage(bool hasMoreSegments);
242  void processSender(
243  epics::pvAccess::TransportSender::shared_pointer const & sender);
244 
245  std::size_t _storedPayloadSize;
246  std::size_t _storedPosition;
247  std::size_t _storedLimit;
248  std::size_t _startPosition;
249 
250  const std::size_t _maxSendPayloadSize;
251  std::size_t _lastMessageStartPosition;
252  std::size_t _lastSegmentedMessageType;
253  int8_t _lastSegmentedMessageCommand;
254  std::size_t _nextMessagePayloadOffset;
255 
256  epics::pvData::int8 _byteOrderFlag;
257 protected:
258  const epics::pvData::int8 _clientServerFlag;
259 private:
260 
261 public:
262  mutable epics::pvData::Mutex _mutex;
263 };
264 
265 
266 class BlockingTCPTransportCodec:
267  public AbstractCodec,
268  public AuthenticationPluginControl,
269  public std::tr1::enable_shared_from_this<BlockingTCPTransportCodec>
270 {
271 
272 public:
273 
274  POINTER_DEFINITIONS(BlockingTCPTransportCodec);
275 
276  static size_t num_instances;
277 
278  BlockingTCPTransportCodec(
279  bool serverFlag,
280  Context::shared_pointer const & context,
281  SOCKET channel,
282  ResponseHandler::shared_pointer const & responseHandler,
283  size_t sendBufferSize,
284  size_t receiveBufferSize,
285  epics::pvData::int16 priority);
286  virtual ~BlockingTCPTransportCodec();
287 
288  virtual void readPollOne() OVERRIDE FINAL;
289  virtual void writePollOne() OVERRIDE FINAL;
290  virtual void scheduleSend() OVERRIDE FINAL {}
291  virtual void sendCompleted() OVERRIDE FINAL {}
292  virtual void close() OVERRIDE FINAL;
293  virtual void waitJoin() OVERRIDE FINAL;
294  virtual bool terminated() OVERRIDE FINAL;
295  virtual bool isOpen() OVERRIDE FINAL;
296  virtual void start();
297 
298  virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
299  virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
300  virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
301  return &_socketAddress;
302  }
303  virtual void invalidDataStreamHandler() OVERRIDE FINAL;
304 
305  virtual std::string getType() const OVERRIDE FINAL {
306  return std::string("tcp");
307  }
308 
309  virtual void processControlMessage() OVERRIDE FINAL {
310  if (_command == CMD_SET_ENDIANESS)
311  {
312  // check 7-th bit
313  setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
314  }
315  }
316 
317 
318  virtual void processApplicationMessage() OVERRIDE FINAL {
319  _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
320  _version, _command, _payloadSize, &_socketBuffer);
321  }
322 
323 
324  virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
325  return _socketAddress;
326  }
327 
328  virtual const std::string& getRemoteName() const OVERRIDE FINAL {
329  return _socketName;
330  }
331 
332 
333  virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
334  return _socketBuffer.getSize();
335  }
336 
337 
338  virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
339  return _priority;
340  }
341 
342 
343  virtual void setRemoteTransportReceiveBufferSize(
344  std::size_t remoteTransportReceiveBufferSize) OVERRIDE FINAL {
345  _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
346  }
347 
348 
349  virtual void setRemoteTransportSocketReceiveBufferSize(
350  std::size_t socketReceiveBufferSize) OVERRIDE FINAL {
351  _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
352  }
353 
354 
355  std::tr1::shared_ptr<const epics::pvData::Field>
356  virtual cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
357  {
358  return _incomingIR.deserialize(buffer, this);
359  }
360 
361 
362  virtual void cachedSerialize(
363  const std::tr1::shared_ptr<const epics::pvData::Field>& field,
364  epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
365  {
366  _outgoingIR.serialize(field, buffer, this);
367  }
368 
369 
370  virtual void flushSendQueue() OVERRIDE FINAL { }
371 
372 
373  virtual bool isClosed() OVERRIDE FINAL {
374  return !isOpen();
375  }
376 
377 
378  void activate() {
379  Transport::shared_pointer thisSharedPtr = shared_from_this();
380  _context->getTransportRegistry()->install(thisSharedPtr);
381 
382  start();
383  }
384 
385  virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE;
386 
387  virtual void verified(epics::pvData::Status const & status) OVERRIDE;
388 
389  virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL;
390 
391  virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer const & data) OVERRIDE FINAL;
392 
393 private:
394  void receiveThread();
395  void sendThread();
396 
397 protected:
398  virtual void setRxTimeout(bool ena) OVERRIDE FINAL;
399 
400  virtual void sendBufferFull(int tries) OVERRIDE FINAL;
401 
406  virtual void internalClose();
407 
408 private:
409  AtomicValue<bool> _isOpen;
410  epics::pvData::Thread _readThread, _sendThread;
411  const SOCKET _channel;
412 protected:
413  osiSockAddr _socketAddress;
414  std::string _socketName;
415 protected:
416  Context::shared_pointer _context;
417 
418  IntrospectionRegistry _incomingIR;
419  IntrospectionRegistry _outgoingIR;
420 
421  // active authentication exchange, if any
422  std::string _authSessionName;
423  AuthenticationSession::shared_pointer _authSession;
424 public:
425  // final info, after authentication complete.
426  PeerInfo::const_shared_pointer _peerInfo;
427 
428 private:
429 
430  ResponseHandler::shared_pointer _responseHandler;
431  size_t _remoteTransportReceiveBufferSize;
432  epics::pvData::int16 _priority;
433 
434 protected:
435  bool _verified;
436  epics::pvData::Event _verifiedEvent;
437 };
438 
439 class BlockingServerTCPTransportCodec :
440  public BlockingTCPTransportCodec,
441  public TransportSender {
442 
443 public:
444  POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
445 
446 protected:
447  BlockingServerTCPTransportCodec(
448  Context::shared_pointer const & context,
449  SOCKET channel,
450  ResponseHandler::shared_pointer const & responseHandler,
451  int32_t sendBufferSize,
452  int32_t receiveBufferSize );
453 
454 public:
455  static shared_pointer create(
456  Context::shared_pointer const & context,
457  SOCKET channel,
458  ResponseHandler::shared_pointer const & responseHandler,
459  int sendBufferSize,
460  int receiveBufferSize)
461  {
462  shared_pointer thisPointer(
463  new BlockingServerTCPTransportCodec(
464  context, channel, responseHandler,
465  sendBufferSize, receiveBufferSize)
466  );
467  thisPointer->activate();
468  return thisPointer;
469  }
470 
471 public:
472 
473  virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
474  {
475  return false;
476  }
477 
478  virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
479 
480  pvAccessID preallocateChannelSID();
481 
482  void depreallocateChannelSID(pvAccessID /*sid*/) {}
483 
484  void registerChannel(
485  pvAccessID sid,
486  std::tr1::shared_ptr<ServerChannel> const & channel);
487 
488  void unregisterChannel(pvAccessID sid);
489 
490  std::tr1::shared_ptr<ServerChannel> getChannel(pvAccessID sid);
491 
492  void getChannels(std::vector<std::tr1::shared_ptr<ServerChannel> >& channels) const;
493 
494  size_t getChannelCount() const;
495 
496  virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE FINAL {
497 
498  TransportSender::shared_pointer transportSender =
499  std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
500  enqueueSendRequest(transportSender);
501 
502  bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
503 
504  enqueueSendRequest(transportSender);
505 
506  return verifiedStatus;
507  }
508 
509  virtual void verified(epics::pvData::Status const & status) OVERRIDE FINAL {
510  {
511  epicsGuard<epicsMutex> G(_mutex);
512  _verificationStatus = status;
513  }
514  BlockingTCPTransportCodec::verified(status);
515  }
516 
517  void authNZInitialize(const std::string& securityPluginName,
518  const epics::pvData::PVStructure::shared_pointer& data);
519 
520  virtual void authenticationCompleted(epics::pvData::Status const & status,
521  const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
522 
523  virtual void send(epics::pvData::ByteBuffer* buffer,
524  TransportSendControl* control) OVERRIDE FINAL;
525 
526  virtual ~BlockingServerTCPTransportCodec() OVERRIDE FINAL;
527 
528 protected:
529 
530  void destroyAllChannels();
531  virtual void internalClose() OVERRIDE FINAL;
532 
533 private:
534 
538  pvAccessID _lastChannelSID;
539 
540  typedef std::map<pvAccessID, std::tr1::shared_ptr<ServerChannel> > _channels_t;
544  _channels_t _channels;
545 
546  mutable epics::pvData::Mutex _channelsMutex;
547 
548  epics::pvData::Status _verificationStatus;
549 
550  bool _verifyOrVerified;
551 
552  std::vector<std::string> advertisedAuthPlugins;
553 
554 };
555 
556 class BlockingClientTCPTransportCodec :
557  public BlockingTCPTransportCodec,
558  public TransportSender,
559  public epics::pvData::TimerCallback {
560 
561 public:
562  POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
563 
564 protected:
565  BlockingClientTCPTransportCodec(
566  Context::shared_pointer const & context,
567  SOCKET channel,
568  ResponseHandler::shared_pointer const & responseHandler,
569  int32_t sendBufferSize,
570  int32_t receiveBufferSize,
571  std::tr1::shared_ptr<ClientChannelImpl> const & client,
572  epics::pvData::int8 remoteTransportRevision,
573  float heartbeatInterval,
574  int16_t priority);
575 
576 public:
577  static shared_pointer create(
578  Context::shared_pointer const & context,
579  SOCKET channel,
580  ResponseHandler::shared_pointer const & responseHandler,
581  int32_t sendBufferSize,
582  int32_t receiveBufferSize,
583  std::tr1::shared_ptr<ClientChannelImpl> const & client,
584  int8_t remoteTransportRevision,
585  float heartbeatInterval,
586  int16_t priority )
587  {
588  shared_pointer thisPointer(
589  new BlockingClientTCPTransportCodec(
590  context, channel, responseHandler,
591  sendBufferSize, receiveBufferSize,
592  client, remoteTransportRevision,
593  heartbeatInterval, priority)
594  );
595  thisPointer->activate();
596  return thisPointer;
597  }
598 
599 public:
600 
601  virtual void start() OVERRIDE FINAL;
602 
603  virtual ~BlockingClientTCPTransportCodec() OVERRIDE FINAL;
604 
605  virtual void timerStopped() OVERRIDE FINAL {
606  // noop
607  }
608 
609  virtual void callback() OVERRIDE FINAL;
610 
611  virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & client) OVERRIDE FINAL;
612 
613  virtual void release(pvAccessID clientId) OVERRIDE FINAL;
614 
615  virtual void send(epics::pvData::ByteBuffer* buffer,
616  TransportSendControl* control) OVERRIDE FINAL;
617 
618  void authNZInitialize(const std::vector<std::string>& offeredSecurityPlugins);
619 
620  virtual void authenticationCompleted(epics::pvData::Status const & status,
621  const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
622 
623  virtual void verified(epics::pvData::Status const & status) OVERRIDE FINAL;
624 protected:
625 
626  virtual void internalClose() OVERRIDE FINAL;
627 
628 private:
629 
633  // TODO consider using TR1 hash map
634  typedef std::map<pvAccessID, std::tr1::weak_ptr<ClientChannelImpl> > TransportClientMap_t;
635  TransportClientMap_t _owners;
636 
640  const double _connectionTimeout;
641 
642  bool _verifyOrEcho;
643 
644  // are we queued to send verify or echo?
645  bool sendQueued;
646 
650  void closedNotifyClients();
651 };
652 
653 }
654 }
655 }
656 
657 #endif /* CODEC_H_ */
Read access is allowed but write access is not allowed.
Definition: pvAccess.h:79
size_t start() const
size_t size() const
constexpr __tuple_element_t< __i, tuple< _Elements... > > & get(tuple< _Elements... > &__t) noexcept
runtime_error(const string &__arg) _GLIBCXX_TXN_SAFE
basic_string< char > string