pvAccessCPP 7.1.8
Loading...
Searching...
No Matches
codec.h
1
7#ifndef CODEC_H_
8#define CODEC_H_
9
10#include <set>
11#include <map>
12#include <deque>
13
14#include <shareLib.h>
15#include <osiSock.h>
16#include <epicsTime.h>
17#include <epicsThread.h>
18#include <epicsVersion.h>
19#include <epicsAtomic.h>
20
21#include <pv/byteBuffer.h>
22#include <pv/pvType.h>
23#include <pv/lock.h>
24#include <pv/timer.h>
25#include <pv/event.h>
26#include <pv/likely.h>
27
28#include <pv/pvaConstants.h>
29#include <pv/remote.h>
30#include <pv/security.h>
31#include <pv/transportRegistry.h>
32#include <pv/introspectionRegistry.h>
33#include <pv/inetAddressUtil.h>
34
35/* C++11 keywords
36 @code
37 struct Base {
38 virtual void foo();
39 };
40 struct Class : public Base {
41 virtual void foo() OVERRIDE FINAL;
42 };
43 @endcode
44 */
// FINAL: expands to the C++11 `final` specifier when __cplusplus reports
// C++11 or later, and to nothing otherwise. Left alone if already defined.
45#ifndef FINAL
46# if __cplusplus>=201103L
47# define FINAL final
48# else
49# define FINAL
50# endif
51#endif
// OVERRIDE: same scheme for the C++11 `override` specifier.
52#ifndef OVERRIDE
53# if __cplusplus>=201103L
54# define OVERRIDE override
55# else
56# define OVERRIDE
57# endif
58#endif
59
60
61namespace epics {
62namespace pvAccess {
63
64class ServerChannel;
65
66namespace detail {
67
/**
 * Wrapper around a single value of type T whose reads and exchanges go
 * through epics::atomic::get() and epics::atomic::compareAndSwap().
 *
 * The value is initialised from 0 by the default constructor.
 * NOTE(review): T must be a type for which the epics::atomic overloads
 * exist -- confirm against epicsAtomic.h before instantiating with new types.
 */
68template<typename T>
69class AtomicValue
70{
71 T val;
72public:
73 AtomicValue() :val(0) {}
    /**
     * Replace the stored value with newval.
     * @param newval value to store
     * @return the value that was read immediately before the successful swap
     */
74 inline T getAndSet(T newval)
75 {
    // Declared as T (not int) so the previous value is neither truncated nor
    // compared across mismatched types when T is wider than int.
76 T oldval;
77 // epicsAtomic doesn't have unconditional swap
    // Retry until compareAndSwap() returns the same value we just read.
78 do {
79 oldval = epics::atomic::get(val);
80 } while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
81 return oldval;
82 }
    /** @return the current value as read by epics::atomic::get(). */
83 inline T get() {
84 return epics::atomic::get(val);
85 }
86};
87// treat bool as int
/**
 * Specialisation for bool that stores the flag in an AtomicValue<int>
 * (1 for true, 0 for false) and converts on the way in and out.
 */
88template<>
89class AtomicValue<bool>
90{
91 AtomicValue<int> realval;
92public:
    /** Store newval (as 1 or 0) and return the previous flag. */
93 inline bool getAndSet(bool newval)
94 {
95 return this->realval.getAndSet(newval?1:0)!=0;
96 }
    /** @return true when the underlying int is non-zero. */
97 inline bool get() {
98 return !!this->realval.get();
99 }
100};
101
102
/**
 * Exception type derived from std::runtime_error; the message string is
 * forwarded unchanged to the base class.
 * NOTE(review): presumably signals an I/O failure in the codec -- the throw
 * sites are not in this header.
 */
103class io_exception: public std::runtime_error {
104public:
105 explicit io_exception(const std::string &s): std::runtime_error(s) {}
106};
107
108
/**
 * Exception type derived from std::runtime_error; the message string is
 * forwarded unchanged to the base class.
 * NOTE(review): presumably signals malformed incoming data -- the throw
 * sites are not in this header.
 */
109class invalid_data_stream_exception: public std::runtime_error {
110public:
111 explicit invalid_data_stream_exception(
112 const std::string &s): std::runtime_error(s) {}
113};
114
115
/**
 * Exception type derived from std::runtime_error; the message string is
 * forwarded unchanged to the base class.
 * NOTE(review): presumably signals that the peer connection was closed --
 * the throw sites are not in this header.
 */
116class connection_closed_exception: public std::runtime_error {
117public:
118 explicit connection_closed_exception(const std::string &s): std::runtime_error(s) {}
119};
120
121
// Read state selector; AbstractCodec keeps the current one in _readMode.
// NOTE(review): exact meaning of each mode is defined by the implementation file.
122enum ReadMode { NORMAL, SPLIT, SEGMENTED };
123
// Write state selector; AbstractCodec keeps the current one in _writeMode.
// NOTE(review): exact meaning of each mode is defined by the implementation file.
124enum WriteMode { PROCESS_SEND_QUEUE, WAIT_FOR_READY_SIGNAL };
125
126
/**
 * Abstract codec base class implementing both TransportSendControl and
 * Transport.
 *
 * The pure virtual members declared first (read(), write(), isOpen(),
 * terminated(), readPollOne(), ...) must be supplied by a concrete transport.
 * The members marked OVERRIDE FINAL are implemented by this class and cannot
 * be overridden further. Only sendQueueEmpty() and getRevision() are defined
 * inline here; everything else is defined out of line.
 */
127class epicsShareClass AbstractCodec :
128 public TransportSendControl,
129 public Transport
130{
131public:
132
    // Size/count limits; values are defined out of line, not in this header.
133 static const std::size_t MAX_MESSAGE_PROCESS;
134 static const std::size_t MAX_MESSAGE_SEND;
135 static const std::size_t MAX_ENSURE_SIZE;
136 static const std::size_t MAX_ENSURE_DATA_SIZE;
137 static const std::size_t MAX_ENSURE_BUFFER_SIZE;
138 static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
139
140 AbstractCodec(
141 bool serverFlag,
142 size_t sendBufferSize,
143 size_t receiveBufferSize,
144 int32_t socketSendBufferSize,
145 bool blockingProcessQueue);
146
    // --- hooks every concrete codec must implement ---
147 virtual void processControlMessage() = 0;
148 virtual void processApplicationMessage() = 0;
149 virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
150 virtual void invalidDataStreamHandler() = 0;
151 virtual void readPollOne()=0;
152 virtual void writePollOne() = 0;
153 virtual void scheduleSend() = 0;
154 virtual void sendCompleted() = 0;
155 virtual bool terminated() = 0;
156 virtual int write(epics::pvData::ByteBuffer* src) = 0;
157 virtual int read(epics::pvData::ByteBuffer* dst) = 0;
158 virtual bool isOpen() = 0;
159
160
161 virtual ~AbstractCodec()
162 {
163 }
164
    // --- inherited interface implemented (and sealed) by this class ---
165 virtual void ensureData(std::size_t size) OVERRIDE FINAL;
166 virtual void startMessage(
167 epics::pvData::int8 command,
168 std::size_t ensureCapacity = 0,
169 epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
170 void putControlMessage(
171 epics::pvData::int8 command,
172 epics::pvData::int32 data);
173 virtual void endMessage() OVERRIDE FINAL;
174 virtual void ensureBuffer(std::size_t size) OVERRIDE FINAL;
175 virtual void flushSerializeBuffer() OVERRIDE FINAL;
176 virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL;
177 void processWrite();
178 void processRead();
179 void processSendQueue();
180 virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
181 void enqueueSendRequest(TransportSender::shared_pointer const & sender,
182 std::size_t requiredBufferSize);
183 void setSenderThread();
184 virtual void setRecipient(osiSockAddr const & sendTo) OVERRIDE FINAL;
185 virtual void setByteOrder(int byteOrder) OVERRIDE FINAL;
186
187 static std::size_t alignedValue(std::size_t value, std::size_t alignment);
188
    // Overridable (not FINAL) direct (de)serialization hooks.
189 virtual bool directSerialize(
190 epics::pvData::ByteBuffer * /*existingBuffer*/,
191 const char* /*toSerialize*/,
192 std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE;
193
194
195 virtual bool directDeserialize(epics::pvData::ByteBuffer * /*existingBuffer*/,
196 char* /*deserializeTo*/,
197 std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE;
198
    /** @return true when _sendQueue reports empty(). */
199 bool sendQueueEmpty() const {
200 return _sendQueue.empty();
201 }
202
    /**
     * @return the lower of this side's own protocol revision and _version.
     * The own revision is PVA_SERVER_PROTOCOL_REVISION when _clientServerFlag
     * is non-zero, otherwise PVA_CLIENT_PROTOCOL_REVISION. Runs with _mutex held.
     */
203 epics::pvData::int8 getRevision() const {
204 epicsGuard<epicsMutex> G(_mutex);
205 int8_t myver = _clientServerFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
206 return myver < _version ? myver : _version;
207 }
208
209protected:
210
211 virtual void sendBufferFull(int tries) = 0;
212 void send(epics::pvData::ByteBuffer *buffer);
213 void flushSendBuffer();
214
    // Default implementation does nothing; subclasses may override.
215 virtual void setRxTimeout(bool ena) {}
216
217 ReadMode _readMode;
218 int8_t _version;
219 int8_t _flags;
220 int8_t _command;
221 int32_t _payloadSize; // TODO why not size_t?
222 epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
223 //TODO initialize union
224 osiSockAddr _sendTo;
225 epicsThreadId _senderThread;
226 WriteMode _writeMode;
227 bool _writeOpReady;
228
229 epics::pvData::ByteBuffer _socketBuffer;
230 epics::pvData::ByteBuffer _sendBuffer;
231
232 fair_queue<TransportSender> _sendQueue;
233
234private:
235
236 void processHeader();
237 void processReadNormal();
238 void postProcessApplicationMessage();
239 void processReadSegmented();
240 bool readToBuffer(std::size_t requiredBytes, bool persistent);
241 void endMessage(bool hasMoreSegments);
242 void processSender(
243 epics::pvAccess::TransportSender::shared_pointer const & sender);
244
245 std::size_t _storedPayloadSize;
246 std::size_t _storedPosition;
247 std::size_t _storedLimit;
248 std::size_t _startPosition;
249
250 const std::size_t _maxSendPayloadSize;
251 std::size_t _lastMessageStartPosition;
252 std::size_t _lastSegmentedMessageType;
253 int8_t _lastSegmentedMessageCommand;
254 std::size_t _nextMessagePayloadOffset;
255
256 epics::pvData::int8 _byteOrderFlag;
257protected:
    // Read by getRevision() to choose the server or client protocol revision.
258 const epics::pvData::int8 _clientServerFlag;
259private:
260
261public:
    // Public and mutable so it can be locked from const members (see
    // getRevision()) and from derived classes.
262 mutable epics::pvData::Mutex _mutex;
263};
264
265
/**
 * TCP codec built on AbstractCodec that also implements
 * AuthenticationPluginControl and supports shared_from_this().
 *
 * Most AbstractCodec hooks are sealed here with OVERRIDE FINAL. start(),
 * verify(), verified() and internalClose() stay overridable for the
 * server/client subclasses. Small accessors are defined inline below; the
 * rest is defined out of line.
 */
266class BlockingTCPTransportCodec:
267 public AbstractCodec,
268 public AuthenticationPluginControl,
269 public std::tr1::enable_shared_from_this<BlockingTCPTransportCodec>
270{
271
272public:
273
274 POINTER_DEFINITIONS(BlockingTCPTransportCodec);
275
276 static size_t num_instances;
277
278 BlockingTCPTransportCodec(
279 bool serverFlag,
280 Context::shared_pointer const & context,
281 SOCKET channel,
282 ResponseHandler::shared_pointer const & responseHandler,
283 size_t sendBufferSize,
284 size_t receiveBufferSize,
285 epics::pvData::int16 priority);
286 virtual ~BlockingTCPTransportCodec();
287
288 virtual void readPollOne() OVERRIDE FINAL;
289 virtual void writePollOne() OVERRIDE FINAL;
    // scheduleSend() and sendCompleted() are intentionally empty here.
290 virtual void scheduleSend() OVERRIDE FINAL {}
291 virtual void sendCompleted() OVERRIDE FINAL {}
292 virtual void close() OVERRIDE FINAL;
293 virtual void waitJoin() OVERRIDE FINAL;
294 virtual bool terminated() OVERRIDE FINAL;
295 virtual bool isOpen() OVERRIDE FINAL;
296 virtual void start();
297
298 virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
299 virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
    /** @return pointer to the stored peer address (_socketAddress). */
300 virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
301 return &_socketAddress;
302 }
303 virtual void invalidDataStreamHandler() OVERRIDE FINAL;
304
    /** @return the literal transport type name "tcp". */
305 virtual std::string getType() const OVERRIDE FINAL {
306 return std::string("tcp");
307 }
308
    /**
     * Handles CMD_SET_ENDIANESS only: selects big endian when _flags is
     * negative (_flags is a signed 8-bit value, so this tests its top bit),
     * little endian otherwise. Other control commands are ignored here.
     */
309 virtual void processControlMessage() OVERRIDE FINAL {
310 if (_command == CMD_SET_ENDIANESS)
311 {
312 // check 7-th bit
313 setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
314 }
315 }
316
317
    /**
     * Forwards the current message (_version, _command, _payloadSize and the
     * receive buffer) to _responseHandler, passing this transport as a
     * shared pointer.
     */
318 virtual void processApplicationMessage() OVERRIDE FINAL {
319 _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
320 _version, _command, _payloadSize, &_socketBuffer);
321 }
322
323
    /** @return reference to the stored peer address. */
324 virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
325 return _socketAddress;
326 }
327
    /** @return reference to the stored peer name string. */
328 virtual const std::string& getRemoteName() const OVERRIDE FINAL {
329 return _socketName;
330 }
331
332
    /** @return size of the receive buffer (_socketBuffer.getSize()). */
333 virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
334 return _socketBuffer.getSize();
335 }
336
337
    /** @return the priority given at construction time. */
338 virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
339 return _priority;
340 }
341
342
    /** Records the peer's receive buffer size. */
343 virtual void setRemoteTransportReceiveBufferSize(
344 std::size_t remoteTransportReceiveBufferSize) OVERRIDE FINAL {
345 _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
346 }
347
348
    /**
     * Records the peer's socket receive buffer size. The destination member
     * is a 32-bit signed int, so the size_t argument is implicitly narrowed.
     */
349 virtual void setRemoteTransportSocketReceiveBufferSize(
350 std::size_t socketReceiveBufferSize) OVERRIDE FINAL {
351 _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
352 }
353
354
    /** Deserializes a Field via the incoming introspection registry. */
355 std::tr1::shared_ptr<const epics::pvData::Field>
356 virtual cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
357 {
358 return _incomingIR.deserialize(buffer, this);
359 }
360
361
    /** Serializes a Field via the outgoing introspection registry. */
362 virtual void cachedSerialize(
363 const std::tr1::shared_ptr<const epics::pvData::Field>& field,
364 epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
365 {
366 _outgoingIR.serialize(field, buffer, this);
367 }
368
369
    // Intentionally a no-op for this transport.
370 virtual void flushSendQueue() OVERRIDE FINAL { }
371
372
    /** @return the negation of isOpen(). */
373 virtual bool isClosed() OVERRIDE FINAL {
374 return !isOpen();
375 }
376
377
    /**
     * Installs this transport in the context's transport registry, then
     * calls start(). Uses shared_from_this(), so the object must already be
     * owned by a shared pointer when this is called.
     */
378 void activate() {
379 Transport::shared_pointer thisSharedPtr = shared_from_this();
380 _context->getTransportRegistry()->install(thisSharedPtr);
381
382 start();
383 }
384
385 virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE;
386
387 virtual void verified(epics::pvData::Status const & status) OVERRIDE;
388
389 virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL;
390
391 virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer const & data) OVERRIDE FINAL;
392
393private:
394 void receiveThread();
395 void sendThread();
396
397protected:
398 virtual void setRxTimeout(bool ena) OVERRIDE FINAL;
399
400 virtual void sendBufferFull(int tries) OVERRIDE FINAL;
401
406 virtual void internalClose();
407
408private:
409 AtomicValue<bool> _isOpen;
410 epics::pvData::Thread _readThread, _sendThread;
411 const SOCKET _channel;
412protected:
413 osiSockAddr _socketAddress;
414 std::string _socketName;
415protected:
416 Context::shared_pointer _context;
417
418 IntrospectionRegistry _incomingIR;
419 IntrospectionRegistry _outgoingIR;
420
421 // active authentication exchange, if any
422 std::string _authSessionName;
423 AuthenticationSession::shared_pointer _authSession;
424public:
425 // final info, after authentication complete.
426 PeerInfo::const_shared_pointer _peerInfo;
427
428private:
429
430 ResponseHandler::shared_pointer _responseHandler;
431 size_t _remoteTransportReceiveBufferSize;
432 epics::pvData::int16 _priority;
433
434protected:
435 bool _verified;
436 epics::pvData::Event _verifiedEvent;
437};
438
/**
 * Server-side TCP codec: a BlockingTCPTransportCodec that is also a
 * TransportSender and keeps a map from channel SID to ServerChannel.
 *
 * The constructor is protected; instances are obtained through create(),
 * which also activates the transport.
 */
439class BlockingServerTCPTransportCodec :
440 public BlockingTCPTransportCodec,
441 public TransportSender {
442
443public:
444 POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
445
446protected:
447 BlockingServerTCPTransportCodec(
448 Context::shared_pointer const & context,
449 SOCKET channel,
450 ResponseHandler::shared_pointer const & responseHandler,
451 int32_t sendBufferSize,
452 int32_t receiveBufferSize );
453
454public:
    /**
     * Factory: constructs the codec under shared ownership, calls activate()
     * on it and returns the shared pointer.
     */
455 static shared_pointer create(
456 Context::shared_pointer const & context,
457 SOCKET channel,
458 ResponseHandler::shared_pointer const & responseHandler,
459 int sendBufferSize,
460 int receiveBufferSize)
461 {
462 shared_pointer thisPointer(
463 new BlockingServerTCPTransportCodec(
464 context, channel, responseHandler,
465 sendBufferSize, receiveBufferSize)
466 );
467 thisPointer->activate();
468 return thisPointer;
469 }
470
471public:
472
    /** Always returns false; the argument is unused on the server side. */
473 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
474 {
475 return false;
476 }
477
    // No-op on the server side.
478 virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
479
480 pvAccessID preallocateChannelSID();
481
    // No-op; the SID argument is ignored.
482 void depreallocateChannelSID(pvAccessID /*sid*/) {}
483
484 void registerChannel(
485 pvAccessID sid,
486 std::tr1::shared_ptr<ServerChannel> const & channel);
487
488 void unregisterChannel(pvAccessID sid);
489
490 std::tr1::shared_ptr<ServerChannel> getChannel(pvAccessID sid);
491
492 void getChannels(std::vector<std::tr1::shared_ptr<ServerChannel> >& channels) const;
493
494 size_t getChannelCount() const;
495
    /**
     * Enqueues this object as a TransportSender, runs the base class
     * verify(timeoutMs), then enqueues this object a second time.
     * @return the result of BlockingTCPTransportCodec::verify().
     * NOTE(review): what each enqueued send emits is decided by send(),
     * which is defined out of line.
     */
496 virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE FINAL {
497
498 TransportSender::shared_pointer transportSender =
499 std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
500 enqueueSendRequest(transportSender);
501
502 bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
503
504 enqueueSendRequest(transportSender);
505
506 return verifiedStatus;
507 }
508
    /**
     * Stores status in _verificationStatus while holding _mutex, then
     * releases the lock and delegates to the base class verified().
     */
509 virtual void verified(epics::pvData::Status const & status) OVERRIDE FINAL {
510 {
511 epicsGuard<epicsMutex> G(_mutex);
512 _verificationStatus = status;
513 }
514 BlockingTCPTransportCodec::verified(status);
515 }
516
517 void authNZInitialize(const std::string& securityPluginName,
518 const epics::pvData::PVStructure::shared_pointer& data);
519
520 virtual void authenticationCompleted(epics::pvData::Status const & status,
521 const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
522
523 virtual void send(epics::pvData::ByteBuffer* buffer,
524 TransportSendControl* control) OVERRIDE FINAL;
525
526 virtual ~BlockingServerTCPTransportCodec() OVERRIDE FINAL;
527
528protected:
529
530 void destroyAllChannels();
531 virtual void internalClose() OVERRIDE FINAL;
532
533private:
534
538 pvAccessID _lastChannelSID;
539
540 typedef std::map<pvAccessID, std::tr1::shared_ptr<ServerChannel> > _channels_t;
544 _channels_t _channels;
545
546 mutable epics::pvData::Mutex _channelsMutex;
547
    // Written by verified() under _mutex.
548 epics::pvData::Status _verificationStatus;
549
550 bool _verifyOrVerified;
551
552 std::vector<std::string> advertisedAuthPlugins;
553
554};
555
/**
 * Client-side TCP codec: a BlockingTCPTransportCodec that is also a
 * TransportSender and a TimerCallback, and keeps a map from pvAccessID to
 * weak references of the ClientChannelImpl objects using it.
 *
 * The constructor is protected; instances are obtained through create(),
 * which also activates the transport.
 */
556class BlockingClientTCPTransportCodec :
557 public BlockingTCPTransportCodec,
558 public TransportSender,
559 public epics::pvData::TimerCallback {
560
561public:
562 POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
563
564protected:
565 BlockingClientTCPTransportCodec(
566 Context::shared_pointer const & context,
567 SOCKET channel,
568 ResponseHandler::shared_pointer const & responseHandler,
569 int32_t sendBufferSize,
570 int32_t receiveBufferSize,
571 std::tr1::shared_ptr<ClientChannelImpl> const & client,
572 epics::pvData::int8 remoteTransportRevision,
573 float heartbeatInterval,
574 int16_t priority);
575
576public:
    /**
     * Factory: constructs the codec under shared ownership with the given
     * arguments, calls activate() on it and returns the shared pointer.
     */
577 static shared_pointer create(
578 Context::shared_pointer const & context,
579 SOCKET channel,
580 ResponseHandler::shared_pointer const & responseHandler,
581 int32_t sendBufferSize,
582 int32_t receiveBufferSize,
583 std::tr1::shared_ptr<ClientChannelImpl> const & client,
584 int8_t remoteTransportRevision,
585 float heartbeatInterval,
586 int16_t priority )
587 {
588 shared_pointer thisPointer(
589 new BlockingClientTCPTransportCodec(
590 context, channel, responseHandler,
591 sendBufferSize, receiveBufferSize,
592 client, remoteTransportRevision,
593 heartbeatInterval, priority)
594 );
595 thisPointer->activate();
596 return thisPointer;
597 }
598
599public:
600
601 virtual void start() OVERRIDE FINAL;
602
603 virtual ~BlockingClientTCPTransportCodec() OVERRIDE FINAL;
604
    // TimerCallback hook; nothing to do when the timer stops.
605 virtual void timerStopped() OVERRIDE FINAL {
606 // noop
607 }
608
    // TimerCallback hook; defined out of line.
609 virtual void callback() OVERRIDE FINAL;
610
611 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & client) OVERRIDE FINAL;
612
613 virtual void release(pvAccessID clientId) OVERRIDE FINAL;
614
615 virtual void send(epics::pvData::ByteBuffer* buffer,
616 TransportSendControl* control) OVERRIDE FINAL;
617
618 void authNZInitialize(const std::vector<std::string>& offeredSecurityPlugins);
619
620 virtual void authenticationCompleted(epics::pvData::Status const & status,
621 const std::tr1::shared_ptr<PeerInfo>& peer) OVERRIDE FINAL;
622
623 virtual void verified(epics::pvData::Status const & status) OVERRIDE FINAL;
624protected:
625
626 virtual void internalClose() OVERRIDE FINAL;
627
628private:
629
633 // TODO consider using TR1 hash map
634 typedef std::map<pvAccessID, std::tr1::weak_ptr<ClientChannelImpl> > TransportClientMap_t;
    // Weak references, so this map does not keep client channels alive.
635 TransportClientMap_t _owners;
636
640 const double _connectionTimeout;
641
642 bool _verifyOrEcho;
643
644 // are we queued to send verify or echo?
645 bool sendQueued;
646
650 void closedNotifyClients();
651};
652
653}
654}
655}
656
657#endif /* CODEC_H_ */
valarray< size_t > size() const
runtime_error(const string &__arg) _GLIBCXX_TXN_SAFE
epics::pvData::FieldConstPtr deserialize(epics::pvData::ByteBuffer *buffer, epics::pvData::DeserializableControl *control)
Deserializes introspection interface.
void serialize(epics::pvData::FieldConstPtr const &field, epics::pvData::ByteBuffer *buffer, epics::pvData::SerializableControl *control)
Serializes introspection interface.
std::size_t getSize() const
@ read
Read access is allowed but write access is not allowed.
Definition pvAccess.h:79
Copyright - See the COPYRIGHT that is included with this distribution.