diff --git a/NetworkingLib/CMakeLists.txt b/NetworkingLib/CMakeLists.txt index 3f853f5ba06f43fbdf00599a6115614b9f96ea62..690ffb9cd841f009b44f1b14b00c79a74f814adc 100644 --- a/NetworkingLib/CMakeLists.txt +++ b/NetworkingLib/CMakeLists.txt @@ -40,6 +40,8 @@ set(SOURCE_FILES include/Utils.h include/Closeable.h include/Error.h + include/Frame.h + include/Busyable.h src/Timer.cpp src/Time.cpp src/Networking.cpp @@ -62,6 +64,8 @@ set(PUBLIC_HEADER_FILES include/Utils.h include/Error.h include/Resolver.h + include/Busyable.h + include/Frame.h ${CMAKE_CURRENT_BINARY_DIR}/Config.h) foreach(HEADER ${PUBLIC_HEADER_FILES}) diff --git a/NetworkingLib/include/Busyable.h b/NetworkingLib/include/Busyable.h new file mode 100644 index 0000000000000000000000000000000000000000..043e499e0d55f99216218c0a59af6991b4e4cb75 --- /dev/null +++ b/NetworkingLib/include/Busyable.h @@ -0,0 +1,74 @@ +// +// Created by philipp on 18.01.18. +// + +#ifndef NETWORKINGLIB_BUSYABLE_H +#define NETWORKINGLIB_BUSYABLE_H + + +#include <mutex> +#include <atomic> +#include "Error.h" + +namespace networking +{ + +class Busyable +{ +public: + friend class BusyLock; + + /** + * @return true if busy, false if not busy + * @attention Returning false does not guarantee that a subsequent + * BusyLock instantiation with this object won't throw! + */ + bool isBusy() const noexcept + { return busy; } + +private: + std::mutex busyMutex; + std::atomic<bool> busy{false}; +}; + +class BusyLock +{ +public: + BusyLock(Busyable & busyable) + : busyable(busyable) + { + if (!busyable.busyMutex.try_lock()) + throw error::Busy{}; + + owns = true; + busyable.busy = true; + } + + ~BusyLock() + { + unlock(); + } + + BusyLock(const BusyLock &) = delete; + + BusyLock & operator=(const BusyLock &) = delete; + + void unlock() + { + if (owns) + { + busyable.busy = false; + busyable.busyMutex.unlock(); + owns = false; + } + } + +private: + Busyable & busyable; + std::atomic<bool> owns{false}; +}; + +} + + +#endif //NETWORKINGLIB_BUSYABLE_H diff --git a/NetworkingLib/include/DatagramReceiver.h b/NetworkingLib/include/DatagramReceiver.h index 5dc173d2d12b89ee0a174c0eac952bb79db9c04a..43c79d9da187a2d8494f5ab0763ce828ef6fa1c2 100644 --- a/NetworkingLib/include/DatagramReceiver.h +++ b/NetworkingLib/include/DatagramReceiver.h @@ -16,7 +16,9 @@ namespace message { template<typename Message> -class DatagramReceiver : public std::enable_shared_from_this<DatagramReceiver<Message>> +class DatagramReceiver + : public std::enable_shared_from_this<DatagramReceiver<Message>> + , private Busyable { private: struct PrivateTag @@ -48,30 +50,18 @@ public: , buffer(maxMessageSize) {} - bool isReceiving() const noexcept - { return receiving; }; - Message receive(std::string & host, std::uint16_t & port, const time::Duration & timeout) { - if (receiving) - throw error::Busy{}; - - utils::RAIIObject receivingFlagReset{[this] - { receiving = false; }}; - receiving = true; + BusyLock busyLock{*this}; newSocket(); return message::receiveDatagramFrom<Message>(net, socket, buffer, host, port, timeout); } void asyncReceive(const time::Duration & timeout, const ReceiveHandler & handler) { - if (receiving) - throw error::Busy{}; - auto self = this->shared_from_this(); auto state = std::make_shared<AsyncState>(self, handler); - receiving = true; newSocket(); message::asyncReceiveDatagramFrom<Message>( @@ -81,16 +71,18 @@ public: const std::string & senderHost, std::uint16_t senderPort) { - state->self->receiving = false; + state->busyLock.unlock(); state->handler(error, message, senderHost, senderPort); }); } - void stop() + bool isReceiving() const noexcept { - if (!receiving) - return; + return isBusy(); + }; + void stop() + { closeable::Closer<Socket>::close(socket); } @@ -99,7 +91,6 @@ private: std::uint16_t bindingPort; Socket socket; std::vector<char> buffer; - std::atomic<bool> receiving{false}; void newSocket() { @@ -111,15 +102,14 @@ private: struct AsyncState { AsyncState(Ptr self, const ReceiveHandler & handler) - : self(self) + : busyLock(*self) + , self(self) , handler(handler) - , receivingFlagReset([self] - { self->receiving = false; }) {} + BusyLock busyLock; Ptr self; ReceiveHandler handler; - utils::RAIIObject receivingFlagReset; }; }; diff --git a/NetworkingLib/include/DatagramSender.h b/NetworkingLib/include/DatagramSender.h index 33756267be06f05c726841bb349e27a1e3fe9a70..942e1f35e1203c969ec663969be884e4360fd4cd 100644 --- a/NetworkingLib/include/DatagramSender.h +++ b/NetworkingLib/include/DatagramSender.h @@ -16,7 +16,9 @@ namespace message { template<typename Message> -class DatagramSender : public std::enable_shared_from_this<DatagramSender<Message>> +class DatagramSender + : public std::enable_shared_from_this<DatagramSender<Message>> + , private Busyable { private: struct PrivateTag @@ -26,7 +28,9 @@ private: public: using Ptr = std::shared_ptr<DatagramSender>; - using SendHandler = std::function<void(const error::ErrorCode & error)>; + using SendHandler = std::function<void( const + error::ErrorCode & error + )>; using Udp = boost::asio::ip::udp; using Socket = Udp::socket; @@ -46,13 +50,7 @@ public: std::uint16_t port, const time::Duration & timeout) { - if (sending) - throw error::Busy{}; - - // Use RAII to reset the sending flag when leaving this function. - utils::RAIIObject sendingFlagReset{[this] - { sending = false; }}; - sending = true; + BusyLock busyLock{*this}; openSocket(); networking::message::sendDatagramTo(net, socket, message, ip, port, timeout); } @@ -63,41 +61,32 @@ public: const time::Duration & timeout, const SendHandler & handler) { - if (sending) - throw error::Busy{}; - auto self = this->shared_from_this(); auto state = std::make_shared<AsyncState>(self, handler); - - sending = true; openSocket(); networking::message::asyncSendDatagramTo( net, socket, message, ip, port, timeout, [state](const auto & error) { - state->self->sending = false; + state->busyLock.unlock(); state->handler(error); }); } bool isSending() const noexcept { - return sending; + return isBusy(); } void stop() { - if (!sending) - return; - closeable::Closer<Socket>::close(socket); } private: Networking & net; Socket socket; - std::atomic<bool> sending{false}; void openSocket() { @@ -111,15 +100,14 @@ private: struct AsyncState { AsyncState(Ptr self, const SendHandler & handler) - : self(self) + : busyLock(*self) + , self(self) , handler(handler) - , sendingFlagReset([self] - { self->sending = false; }) {} + BusyLock busyLock; Ptr self; SendHandler handler; - utils::RAIIObject sendingFlagReset; }; }; diff --git a/NetworkingLib/include/Frame.h b/NetworkingLib/include/Frame.h new file mode 100644 index 0000000000000000000000000000000000000000..18bca9d2cca36f146d5ce8257a3833dd96072565 --- /dev/null +++ b/NetworkingLib/include/Frame.h @@ -0,0 +1,58 @@ +// +// Created by philipp on 17.01.18. +// + +#ifndef NETWORKINGLIB_FRAME_H +#define NETWORKINGLIB_FRAME_H + +#include <cstdint> +#include <boost/asio/buffer.hpp> +#include "Utils.h" + +namespace networking +{ +namespace internal +{ + +class Frame +{ +public: + Frame(const std::uint8_t * data, std::uint32_t numDataBytes) + : data(data), numDataBytes(numDataBytes) + { + header[0] = utils::byte<0>(numDataBytes); + header[1] = utils::byte<1>(numDataBytes); + header[2] = utils::byte<2>(numDataBytes); + header[3] = utils::byte<3>(numDataBytes); + } + + Frame(const Frame &) = delete; + + Frame & operator=(const Frame &) = delete; + + Frame(Frame &&) = delete; + + Frame & operator=(Frame &&) = delete; + + auto getBuffers() const + { + return std::vector<boost::asio::const_buffer>{ + boost::asio::buffer((const void *) header, sizeof(header)), + boost::asio::buffer((const void *) data, numDataBytes)}; + } + + std::size_t getSize() const + { + return sizeof(header) + numDataBytes; + } + +private: + std::uint32_t numDataBytes; + std::uint8_t header[4]; + const std::uint8_t * data; +}; + +} +} + +#endif //NETWORKINGLIB_FRAME_H diff --git a/NetworkingLib/include/Message.h b/NetworkingLib/include/Message.h index 2147f9b6a22df1aa4b013ac44c5113ac98632440..2887fd628cfab2dfd07bebc2356b633c3df53f1a 100644 --- a/NetworkingLib/include/Message.h +++ b/NetworkingLib/include/Message.h @@ -54,33 +54,6 @@ struct Decoder<std::string> { return data; } }; -namespace internal -{ - -// Some pre-processing functions before sending the stream over the network. -// Since our message delimiter is '\0' the message itself must not contain such a character. -// Therefore we will replace all occurrences of '\0' with "\0" ("\" is the escape character). -// This implies that we must also change "\" to "\\". - -template<typename Message> -std::string encode(const Message & message) -{ - auto data = Encoder<Message>{}(message); - boost::replace_all(data, "\\", "\\\\"); - boost::replace_all(data, std::string{'\0'}, "\\0"); - return data; -} - -template<typename Message> -Message decode(std::string & data) -{ - boost::replace_all(data, "\\\\", "\\"); - boost::replace_all(data, "\\0", std::string{'\0'}); - return Decoder<Message>{}(data); -} - -} - template<typename Message> void send(Networking & net, boost::asio::ip::tcp::socket & socket, @@ -91,7 +64,7 @@ void send(Networking & net, if (timeout <= 0s) throw error::Aborted(); - networking::stream::write(net, socket, internal::encode(message), timeout); + networking::stream::write(net, socket, Encoder<Message>{}(message), timeout); }; template<typename Message> @@ -101,7 +74,7 @@ void asyncSend(Networking & net, const time::Duration & timeout, SendHandler handler) { - auto data = std::make_shared<std::string>(internal::encode(message)); + auto data = std::make_shared<std::string>(Encoder<Message>{}(message)); networking::stream::asyncWrite( net, socket, *data, timeout, [handler, data](const auto & errorCode) @@ -119,7 +92,7 @@ Message receive(Networking & net, throw error::Aborted(); auto data = networking::stream::read(net, socket, buffer, timeout); - return internal::decode<Message>(data); + return Decoder<Message>{}(data); }; template<typename Message> @@ -133,7 +106,7 @@ void asyncReceive(Networking & net, net, socket, buffer, timeout, [handler](const auto & errorCode, auto & data) { - auto message = internal::decode<Message>(data); + auto message = Decoder<Message>{}(data); handler(errorCode, message); }); }; @@ -146,7 +119,7 @@ void sendDatagramTo(Networking & net, std::uint16_t port, const time::Duration & timeout) { - networking::socket::sendTo(net, socket, internal::encode(message), host, port, timeout); + networking::socket::sendTo(net, socket, Encoder<Message>{}(message), host, port, timeout); } template<typename Message> @@ -158,7 +131,7 @@ void asyncSendDatagramTo(Networking & net, const time::Duration & timeout, SendToHandler handler) { - auto data = std::make_shared<std::string>(internal::encode(message)); + auto data = std::make_shared<std::string>(Encoder<Message>{}(message)); networking::socket::asyncSendTo( net, socket, *data, host, port, timeout, [handler, data](const auto & error) @@ -174,7 +147,7 @@ Message receiveDatagramFrom(Networking & net, const time::Duration & timeout) { auto data = networking::socket::receiveFrom(net, socket, buffer, host, port, timeout); - return internal::decode<Message>(data); + return Decoder<Message>{}(data); } template<typename Message> @@ -188,7 +161,7 @@ void asyncReceiveDatagramFrom(Networking & net, net, socket, buffer, timeout, [handler](auto error, auto & data, const auto & senderHost, auto senderPort) { - auto message = internal::decode<Message>(data); + auto message = Decoder<Message>{}(data); handler(error, message, senderHost, senderPort); }); } diff --git a/NetworkingLib/include/Resolver.h b/NetworkingLib/include/Resolver.h index 34a46f0719f369492354f9c63116e864395e0918..f43c8b675f2b708beffd07ad5a1949497a18f869 100644 --- a/NetworkingLib/include/Resolver.h +++ b/NetworkingLib/include/Resolver.h @@ -10,6 +10,7 @@ #include "Networking.h" #include "Error.h" #include "Utils.h" +#include "Busyable.h" namespace networking { @@ -58,7 +59,9 @@ private: } -class Resolver : public std::enable_shared_from_this<Resolver> +class Resolver + : public std::enable_shared_from_this<Resolver> + , private Busyable { private: struct PrivateTag @@ -97,14 +100,7 @@ public: const std::string & service, const time::Duration & timeout) { - if (resolving) - throw error::Busy{}; - - // Use RAII to reset the resolving flag when leaving this function. - utils::RAIIObject resolvingFlagReset{[this] - { resolving = false; }}; - - resolving = true; + BusyLock busyLock{*this}; resolver.open(); UnderlyingResolver::Query query{host, service}; @@ -112,9 +108,7 @@ public: auto resolveOperation = [this](auto && ... args) { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - UnderlyingResolver::Iterator> result; + std::tuple<boost::system::error_code, UnderlyingResolver::Iterator> result; closeable::timedOperation( result, @@ -134,13 +128,9 @@ public: const time::Duration & timeout, const ResolveHandler & handler) { - if (resolving) - throw error::Busy{}; - auto self = shared_from_this(); auto state = std::make_shared<AsyncState>(self, handler); - resolving = true; resolver.open(); UnderlyingResolver::Query query{host, service}; @@ -155,7 +145,7 @@ public: timeout, [state](const auto & networkingError, const auto & boostError, auto endpointIterator) { - state->self->resolving = false; + state->busyLock.unlock(); state->handler(networkingError, state->self->endpointsFromIterator(endpointIterator)); }, query); @@ -163,21 +153,17 @@ public: void stop() { - if (!resolving) - return; - closeable::Closer<UnderlyingResolver>::close(resolver); } bool isResolving() const noexcept { - return resolving; + return isBusy(); } private: Networking & net; UnderlyingResolver resolver; - std::atomic<bool> resolving; std::vector<Endpoint> endpointsFromIterator(UnderlyingResolver::Iterator iterator) { @@ -196,15 +182,14 @@ private: struct AsyncState { AsyncState(Ptr self, const ResolveHandler & handler) - : self(self) + : busyLock(*self) + , self(self) , handler(handler) - , resolvingFlagReset([self] - { self->resolving = false; }) {} + BusyLock busyLock; Ptr self; ResolveHandler handler; - utils::RAIIObject resolvingFlagReset; }; }; diff --git a/NetworkingLib/include/ServiceClient.h b/NetworkingLib/include/ServiceClient.h index ddc31cee4dbb41ec798ea640a063803d0b15a1a9..8bb8b9538e0931d637eb6da782b9b35c9793a94d 100644 --- a/NetworkingLib/include/ServiceClient.h +++ b/NetworkingLib/include/ServiceClient.h @@ -14,6 +14,7 @@ #include "Networking.h" #include "Utils.h" #include "Error.h" +#include "Busyable.h" namespace networking { @@ -21,7 +22,9 @@ namespace service { template<typename Service> -class Client : public std::enable_shared_from_this<Client<Service>> +class Client + : public std::enable_shared_from_this<Client<Service>> + , private Busyable { private: struct PrivateTag @@ -53,13 +56,7 @@ public: const RequestMessage & request, time::Duration timeout) { - if (calling) - throw error::Busy{}; - - calling = true; - // Reset the calling flag when we leave this function using RAII. - auto callingFlagReset = utils::RAIIObject{[this] - { calling = false; }}; + BusyLock busyLock{*this}; // Close the socket on leaving. closeable::Closer<Socket> socketCloser{socket}; @@ -84,17 +81,12 @@ public: const time::Duration & timeout, const CallHandler & handler) { - if (calling) - throw error::Busy{}; - auto self = this->shared_from_this(); - // Container for our variables which are needed for the subsequent asynchronous calls to connect, receive and send. // When 'state' goes out of scope, it does cleanup. auto state = std::make_shared<AsyncState>( self, handler, timeout, time::now()); - calling = true; newSocket(); // Connect to server. @@ -129,7 +121,7 @@ public: networking::message::asyncReceive<ResponseMessage>( state->self->net, state->self->socket, state->buffer, state->timeout, [state](auto const & error, - auto & response) + auto & response) { state->handler(error, response); }); @@ -139,21 +131,17 @@ public: bool isCalling() const noexcept { - return calling; + return isBusy(); } void stop() { - if (!calling) - return; - closeable::Closer<Socket>::close(socket); } private: networking::Networking & net; Socket socket; - std::atomic<bool> calling{false}; static void updateTimeout(time::Duration & timeout, time::TimePoint & startTime) { @@ -176,20 +164,20 @@ private: const CallHandler & handler, time::Duration timeout, time::TimePoint startTime) - : self(self) + : busyLock(*self) + , self(self) , handler(handler) , timeout(timeout) , startTime(startTime) - , callingFlagReset([self]{ self->calling = false; }) , closer(self->socket) {} + BusyLock busyLock; Ptr self; CallHandler handler; time::Duration timeout; time::TimePoint startTime; boost::asio::streambuf buffer; - utils::RAIIObject callingFlagReset; closeable::Closer<Socket> closer; }; }; diff --git a/NetworkingLib/include/Socket.h b/NetworkingLib/include/Socket.h index 1680fd68eb4eb6e77d4aeea14c3f912c4a89c8f0..c61e01821f180377cb76ada52700836e9580297e 100644 --- a/NetworkingLib/include/Socket.h +++ b/NetworkingLib/include/Socket.h @@ -7,6 +7,7 @@ #include "Stream.h" #include "Resolver.h" +#include "Frame.h" namespace networking { @@ -16,11 +17,7 @@ namespace socket namespace internal { -boost::asio::const_buffers_1 stringToBuffer(const std::string & str); - -bool isLastCharacterNull(const std::vector<char> & buffer, std::size_t numBytes); - -std::string stringFromBuffer(const std::vector<char> & buffer, std::size_t numBytes); +bool stringFromBuffer(std::string & data, std::vector<char> & buffer, std::size_t numBytesTransferred); } @@ -40,7 +37,8 @@ void connect(Networking & net, std::uint16_t port, time::Duration timeout) { - using Resolver = networking::internal::CloseableResolver<boost::asio::ip::tcp>; + using namespace networking::internal; + using Resolver = CloseableResolver<boost::asio::ip::tcp>; auto startTime = time::now(); @@ -51,17 +49,9 @@ void connect(Networking & net, auto resolveOperation = [&resolver](auto && ... args) { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - Resolver::Iterator> resolveResult; - + std::tuple<boost::system::error_code, Resolver::Iterator> resolveResult; closeable::timedOperation( - resolveResult, - net, - resolveOperation, - resolver, - timeout, - query); + resolveResult, net, resolveOperation, resolver, timeout, query); auto endpointIterator = std::get<1>(resolveResult); @@ -72,18 +62,9 @@ void connect(Networking & net, auto connectOperation = [](auto && ... args) { boost::asio::async_connect(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - Resolver::Iterator> connectResult; - + std::tuple<boost::system::error_code, Resolver::Iterator> connectResult; closeable::timedOperation( - connectResult, - net, - connectOperation, - socket, - timeout, - socket, - endpointIterator); + connectResult, net, connectOperation, socket, timeout, socket, endpointIterator); } template<typename SocketService> @@ -94,7 +75,8 @@ void asyncConnect(Networking & net, const time::Duration & timeout, const ConnectHandler & handler) { - using Resolver = networking::internal::CloseableResolver<boost::asio::ip::tcp>; + using namespace networking::internal; + using Resolver = CloseableResolver<boost::asio::ip::tcp>; auto startTime = time::now(); @@ -106,10 +88,7 @@ void asyncConnect(Networking & net, { resolver->async_resolve(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - resolveOperation, - *resolver, - timeout, + net, resolveOperation, *resolver, timeout, [&net, &socket, host, port, timeout, handler, resolver, startTime] (const auto & networkingError, const auto & boostError, auto endpointIterator) { @@ -127,14 +106,12 @@ void asyncConnect(Networking & net, { boost::asio::async_connect(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - connectOperation, - socket, - newTimeout, + net, connectOperation, socket, newTimeout, [handler](const auto & networkingError, const auto & boostError, auto iterator) - { handler(networkingError); }, - socket, - endpointIterator); + { + handler(networkingError); + }, + socket, endpointIterator); }, query); } @@ -142,56 +119,54 @@ void asyncConnect(Networking & net, template<typename DatagramSocket> void sendTo(Networking & net, DatagramSocket & socket, - const std::string & data, + const std::string & sendData, const std::string & host, std::uint16_t port, const time::Duration & timeout) { - boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; + using namespace boost::asio::ip; + udp::endpoint endpoint{address::from_string(host), port}; + + using namespace networking::internal; + Frame buffer{(const std::uint8_t *) sendData.c_str(), sendData.size()}; auto asyncOperation = [&socket](auto && ... args) { socket.async_send_to(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - std::size_t> result; - + std::tuple<boost::system::error_code, std::size_t> result; closeable::timedOperation( - result, - net, - asyncOperation, - socket, - timeout, - internal::stringToBuffer(data), - endpoint); - - auto bytesTransferred = std::get<1>(result); - if (bytesTransferred < data.size()) + result, net, asyncOperation, socket, timeout, buffer.getBuffers(), endpoint); + + auto numBytesTransferred = std::get<1>(result); + if (numBytesTransferred < buffer.getSize()) throw error::FailedOperation{}; }; template<typename DatagramSocket> void asyncSendTo(Networking & net, DatagramSocket & socket, - const std::string & data, + const std::string & sendData, const std::string & host, std::uint16_t port, const time::Duration & timeout, const SendHandler & handler) { - boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; + using namespace boost::asio::ip; + udp::endpoint endpoint{address::from_string(host), port}; + + using namespace networking::internal; + auto buffer = std::make_shared<Frame>((const std::uint8_t *) sendData.c_str(), sendData.size()); auto asyncOperation = [&socket](auto && ... args) { socket.async_send_to(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - asyncOperation, - socket, - timeout, - [&data, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + net, asyncOperation, socket, timeout, + [handler, buffer](const auto & networkingError, + const auto & boostError, + auto numBytesTransferred) { - if (bytesTransferred < data.size()) + if (numBytesTransferred < buffer->getSize()) { handler(error::codes::FAILED_OPERATION); return; @@ -199,8 +174,7 @@ void asyncSendTo(Networking & net, handler(networkingError); }, - internal::stringToBuffer(data), - endpoint); + buffer->getBuffers(), endpoint); }; template<typename DatagramSocket> @@ -211,32 +185,24 @@ std::string receiveFrom(Networking & net, std::uint16_t & senderPort, const time::Duration & timeout) { - boost::asio::ip::udp::endpoint senderEndpoint; + using namespace boost::asio::ip; + udp::endpoint senderEndpoint; auto asyncOperation = [&socket](auto && ... args) { socket.async_receive_from(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - std::size_t> result; - + std::tuple<boost::system::error_code, std::size_t> result; closeable::timedOperation( - result, - net, - asyncOperation, - socket, - timeout, - boost::asio::buffer(buffer), - senderEndpoint); + result, net, asyncOperation, socket, timeout, boost::asio::buffer(buffer), senderEndpoint); senderHost = senderEndpoint.address().to_string(); senderPort = senderEndpoint.port(); - auto bytesTransferred = std::get<1>(result); - if (!internal::isLastCharacterNull(buffer, bytesTransferred)) + std::string receiveData{}; + auto numBytesTransferred = std::get<1>(result); + if (!internal::stringFromBuffer(receiveData, buffer, numBytesTransferred)) throw error::FailedOperation{}; - - return internal::stringFromBuffer(buffer, bytesTransferred); + return receiveData; } template<typename DatagramSocket> @@ -246,31 +212,27 @@ void asyncReceiveFrom(Networking & net, const time::Duration & timeout, const ReceiveHandler & handler) { - auto senderEndpoint = std::make_shared<boost::asio::ip::udp::endpoint>(); + using namespace boost::asio::ip; + auto senderEndpoint = std::make_shared<udp::endpoint>(); auto asyncOperation = [&socket](auto && ... args) { socket.async_receive_from(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - asyncOperation, - socket, - timeout, - [&buffer, handler, senderEndpoint](const auto & networkingError, const auto & boostError, auto bytesTransferred) + net, asyncOperation, socket, timeout, + [&buffer, handler, senderEndpoint](const auto & networkingError, const auto & boostError, auto numBytesTransferred) { - if (!internal::isLastCharacterNull(buffer, bytesTransferred)) + std::string receiveData{}; + auto senderHost = senderEndpoint->address().to_string(); + auto senderPort = senderEndpoint->port(); + + if (!internal::stringFromBuffer(receiveData, buffer, numBytesTransferred)) { - std::string data{}; - std::string senderHost{}; - std::uint16_t senderPort{}; - handler(error::codes::FAILED_OPERATION, data, senderHost, senderPort); + handler(error::codes::FAILED_OPERATION, receiveData, senderHost, senderPort); return; } - auto senderHost = senderEndpoint->address().to_string(); - auto senderPort = senderEndpoint->port(); - auto data = internal::stringFromBuffer(buffer, bytesTransferred); - handler(networkingError, data, senderHost, senderPort); + handler(networkingError, receiveData, senderHost, senderPort); }, boost::asio::buffer(buffer), *senderEndpoint); diff --git a/NetworkingLib/include/Stream.h b/NetworkingLib/include/Stream.h index 4e7026435b032fa08ddd28025254ad966e1ce0d4..bf8a61df8a286cbf62154498bd7a1e428c72efbc 100644 --- a/NetworkingLib/include/Stream.h +++ b/NetworkingLib/include/Stream.h @@ -11,6 +11,8 @@ #include "Timer.h" #include "Error.h" #include "Closeable.h" +#include "Frame.h" +#include "Utils.h" namespace networking { @@ -20,10 +22,13 @@ namespace stream namespace internal { -boost::asio::const_buffers_1 stringToBuffer(const std::string & str); - std::string stringFromStreambuf(boost::asio::streambuf & streambuf, std::size_t numBytes); +void numDataBytesFromBuffer(boost::asio::streambuf & streambuf, + std::size_t numBytesTransferred, + std::uint32_t & numDataBytes, + std::uint32_t & numRemainingBytes); + } using WriteHandler = std::function<void(const error::ErrorCode & error)>; @@ -33,48 +38,41 @@ using ReadHandler = std::function<void(const error::ErrorCode & error, std::stri template<typename SyncWriteStream> void write(Networking & net, SyncWriteStream & stream, - const std::string & data, + const std::string & writeData, const time::Duration & timeout) { + using namespace networking::internal; + Frame buffer{(const std::uint8_t *) writeData.c_str(), writeData.size()}; + auto asyncOperation = [](auto && ... args) { boost::asio::async_write(std::forward<decltype(args)>(args)...); }; - std::tuple< - boost::system::error_code, - std::size_t> result; + std::tuple<boost::system::error_code, std::size_t> result; + closeable::timedOperation(result, net, asyncOperation, stream, timeout, stream, buffer.getBuffers()); - closeable::timedOperation( - result, - net, - asyncOperation, - stream, - timeout, - stream, - internal::stringToBuffer(data)); - - auto bytesTransferred = std::get<1>(result); - if (bytesTransferred < data.size()) + auto numBytesTransferred = std::get<1>(result); + if (numBytesTransferred < buffer.getSize()) throw error::FailedOperation{}; }; template<typename SyncWriteStream> void asyncWrite(Networking & net, SyncWriteStream & stream, - const std::string & data, + const std::string & writeData, const time::Duration & timeout, const WriteHandler & handler) { + using namespace networking::internal; + auto buffer = std::make_shared<Frame>((const std::uint8_t *) writeData.c_str(), writeData.size()); + auto asyncOperation = [](auto && ... args) { boost::asio::async_write(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - asyncOperation, - stream, - timeout, - [&data, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + net, asyncOperation, stream, timeout, + [handler, buffer](const auto & networkingError, const auto & boostError, auto numBytesTransferred) { - if (bytesTransferred < data.size()) + if (numBytesTransferred < buffer->getSize()) { handler(error::codes::FAILED_OPERATION); return; @@ -82,35 +80,41 @@ void asyncWrite(Networking & net, handler(networkingError); }, - stream, - internal::stringToBuffer(data)); + stream, buffer->getBuffers()); } template<typename SyncReadStream> std::string read(Networking & net, - SyncReadStream & stream, - boost::asio::streambuf & buffer, - const time::Duration & timeout) + SyncReadStream & stream, + boost::asio::streambuf & buffer, + time::Duration timeout) { + using namespace internal; + + auto startTime = time::now(); + auto asyncOperation = [](auto && ... args) - { boost::asio::async_read_until(std::forward<decltype(args)>(args)...); }; + { boost::asio::async_read(std::forward<decltype(args)>(args)...); }; + + std::tuple<boost::system::error_code, std::size_t> result; + + // We receive 4 bytes first, giving us the number of data bytes. + closeable::timedOperation( + result, net, asyncOperation, stream, timeout, stream, buffer, boost::asio::transfer_at_least(4)); + + auto numBytesTransferred = std::get<1>(result); + std::uint32_t numDataBytes, numRemainingBytes; + numDataBytesFromBuffer(buffer, numBytesTransferred, numDataBytes, numRemainingBytes); + if (numRemainingBytes == 0) + return stringFromStreambuf(buffer, numDataBytes); - std::tuple< - boost::system::error_code, - std::size_t> result; + timeout -= time::now() - startTime; + // Receive the actual data. closeable::timedOperation( - result, - net, - asyncOperation, - stream, - timeout, - stream, - buffer, - '\0'); - - auto bytesTransferred = std::get<1>(result); - return internal::stringFromStreambuf(buffer, bytesTransferred); + result, net, asyncOperation, stream, timeout, stream, buffer, boost::asio::transfer_at_least(numRemainingBytes)); + + return stringFromStreambuf(buffer, numDataBytes); }; template<typename SyncReadStream> @@ -120,22 +124,47 @@ void asyncRead(Networking & net, const time::Duration & timeout, const ReadHandler & handler) { + using namespace internal; + + auto startTime = time::now(); + auto asyncOperation = [](auto && ... args) - { boost::asio::async_read_until(std::forward<decltype(args)>(args)...); }; + { boost::asio::async_read(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, - asyncOperation, - stream, - timeout, - [&buffer, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + net, asyncOperation, stream, timeout, + [&net, &stream, &buffer, timeout, handler, startTime, asyncOperation] + (const auto & networkingError, const auto & boostError, auto numBytesTransferred) { - auto data = internal::stringFromStreambuf(buffer, bytesTransferred); - handler(networkingError, data); + if (networkingError) + { + std::string noData{}; + handler(networkingError, noData); + return; + } + + std::uint32_t numDataBytes, numRemainingBytes; + numDataBytesFromBuffer(buffer, numBytesTransferred, numDataBytes, numRemainingBytes); + if (numRemainingBytes == 0) + { + auto receiveData = stringFromStreambuf(buffer, numDataBytes); + handler(networkingError, receiveData); + return; + } + + auto timeSpend = time::now() - startTime; + auto newTimeout = timeout - timeSpend; + + closeable::timedAsyncOperation( + net, asyncOperation, stream, newTimeout, + [&buffer, handler, numDataBytes](const auto & networkingError, const auto & boostError, auto numBytesTransferred) + { + auto receiveData = stringFromStreambuf(buffer, numDataBytes); + handler(networkingError, receiveData); + }, + stream, buffer, boost::asio::transfer_at_least(numRemainingBytes)); }, - stream, - buffer, - '\0'); + stream, buffer, boost::asio::transfer_at_least(4)); }; } diff --git a/NetworkingLib/include/Utils.h b/NetworkingLib/include/Utils.h index 3f492ce565be8d07326c7d2f9cc74f6ddf69df6d..b4bb54cd351c20c69ca6f035b31a2a945e7055b3 100644 --- a/NetworkingLib/include/Utils.h +++ b/NetworkingLib/include/Utils.h @@ -12,6 +12,19 @@ namespace networking namespace utils { +template<std::size_t index, typename Int> +inline std::uint8_t byte(Int var) +{ return (std::uint8_t) ((var >> 8*index) & 0x000000ff); } + +template<std::size_t numBytes, typename Int> +inline Int bytesToInt(const std::uint8_t * bytes) +{ + Int result = 0; + for (std::size_t i = 0; i < numBytes; i++) + result += ((Int) bytes[i]) << (i*8); + return result; +} + // Generic object which calls a callback-function on destruction. class RAIIObject { @@ -23,7 +36,12 @@ public: {} ~RAIIObject() - { onDestructionCallback(); } + { + try + { onDestructionCallback(); } + catch (...) + {} + } RAIIObject(const RAIIObject & other) = delete; diff --git a/NetworkingLib/src/Socket.cpp b/NetworkingLib/src/Socket.cpp index 64713fd2bb40fea686ea65306fc48dee2509668f..20a21e9614255542d422e29441efcccf3eff5300 100644 --- a/NetworkingLib/src/Socket.cpp +++ b/NetworkingLib/src/Socket.cpp @@ -11,23 +11,18 @@ namespace socket namespace internal { -boost::asio::const_buffers_1 stringToBuffer(const std::string & str) +bool stringFromBuffer(std::string & data, std::vector<char> & buffer, std::size_t bytesTransferred) { - // Adds '\0' to the buffer. - return boost::asio::buffer(str.c_str(), str.size() + 1); -} - -bool isLastCharacterNull(const std::vector<char> & buffer, std::size_t numBytes) -{ - if (numBytes == 0) + if (bytesTransferred < 4) return false; - return buffer[numBytes - 1] == '\0'; -} + auto numDataBytes = utils::bytesToInt<4, std::uint32_t>((const std::uint8_t *) buffer.data()); + if (bytesTransferred < 4 + numDataBytes) + return false; -std::string stringFromBuffer(const std::vector<char> & buffer, std::size_t numBytes) -{ - return std::string{buffer.begin(), buffer.begin() + numBytes - sizeof('\n')}; + data = std::string{buffer.begin() + 4, + buffer.begin() + 4 + numDataBytes}; + return true; } } diff --git a/NetworkingLib/src/Stream.cpp b/NetworkingLib/src/Stream.cpp index ffbad3344307b4d280040e692964990d4c074855..01b769bfe7ec1fafdc520cfc86f1c5e0ef569782 100644 --- a/NetworkingLib/src/Stream.cpp +++ b/NetworkingLib/src/Stream.cpp @@ -11,22 +11,26 @@ namespace stream namespace internal { -boost::asio::const_buffers_1 stringToBuffer(const std::string & str) +std::string stringFromStreambuf(boost::asio::streambuf & streambuf, std::size_t numBytes) { - // Adds '\0' to the buffer. - return boost::asio::buffer(str.c_str(), str.size() + 1); + auto buffers = streambuf.data(); + std::string data{boost::asio::buffers_begin(buffers), + boost::asio::buffers_begin(buffers) + numBytes}; + streambuf.consume(numBytes); + return data; } -std::string stringFromStreambuf(boost::asio::streambuf & streambuf, std::size_t numBytes) +void numDataBytesFromBuffer(boost::asio::streambuf & streambuf, + std::size_t numBytesTransferred, + std::uint32_t & numDataBytes, + std::uint32_t & numRemainingBytes) { - using boost::asio::buffers_begin; - if (numBytes <= 1) - return std::string{""}; - auto buffer = streambuf.data(); - auto result = std::string{buffers_begin(buffer), - buffers_begin(buffer) + numBytes - sizeof('\0')}; - streambuf.consume(numBytes); - return result; + auto numDataBytesStr = stringFromStreambuf(streambuf, 4); + numDataBytes = utils::bytesToInt<4, std::uint32_t>((const std::uint8_t *) numDataBytesStr.c_str()); + + numRemainingBytes = 0; + if (numDataBytes > numBytesTransferred - 4) + numRemainingBytes = numDataBytes - numBytesTransferred - 4; } }