From 1885bf69330a78606ab578846835d69c3513616a Mon Sep 17 00:00:00 2001 From: Hoop77 <p.badenhoop@gmx.de> Date: Mon, 15 Jan 2018 13:28:49 +0100 Subject: [PATCH] MAJOR CHANGE - messages can now be of variable size --- NetworkingLib/CMakeLists.txt | 13 +- NetworkingLib/include/Closeable.h | 19 +- NetworkingLib/include/DatagramReceiver.h | 100 +++++---- NetworkingLib/include/DatagramSender.h | 52 ++--- NetworkingLib/include/Message.h | 109 +++++++--- NetworkingLib/include/Resolver.h | 102 +++++----- NetworkingLib/include/ServiceClient.h | 123 +++++------ NetworkingLib/include/ServiceServer.h | 32 +-- NetworkingLib/include/Socket.h | 249 +++++++++++++++++------ NetworkingLib/include/Stream.h | 119 +++++++++-- NetworkingLib/src/Networking.cpp | 7 +- NetworkingLib/src/Socket.cpp | 35 ++++ NetworkingLib/src/Stream.cpp | 33 +++ NetworkingLib/src/Time.cpp | 10 +- NetworkingLib/src/Timer.cpp | 9 +- NetworkingLib/test/PlatoonMessage.h | 83 +++++--- NetworkingLib/test/Test.cpp | 56 +++-- 17 files changed, 761 insertions(+), 390 deletions(-) create mode 100644 NetworkingLib/src/Socket.cpp create mode 100644 NetworkingLib/src/Stream.cpp diff --git a/NetworkingLib/CMakeLists.txt b/NetworkingLib/CMakeLists.txt index fb17fdbd..3f853f5b 100644 --- a/NetworkingLib/CMakeLists.txt +++ b/NetworkingLib/CMakeLists.txt @@ -26,8 +26,8 @@ set(CMAKE_CXX_FLAGS -pthread) # ========================= ########################### set(SOURCE_FILES - src/Networking.cpp include/Networking.h + include/Socket.h include/Stream.h include/Message.h include/ServiceClient.h @@ -36,8 +36,15 @@ set(SOURCE_FILES include/DatagramSender.h include/Time.h include/Timer.h + include/Resolver.h + include/Utils.h + include/Closeable.h + include/Error.h src/Timer.cpp - src/Time.cpp) + src/Time.cpp + src/Networking.cpp + src/Socket.cpp + src/Stream.cpp) add_library(NetworkingLib STATIC ${SOURCE_FILES}) set(PUBLIC_HEADER_FILES @@ -147,7 +154,7 @@ set(TEST_SOURCE_FILES test/PlatoonMessage.h test/PlatoonService.h test/Main.cpp - test/TestUtils.h include/Utils.h include/Error.h include/Socket.h include/Closeable.h include/Resolver.h) + test/TestUtils.h) add_executable(Test ${TEST_SOURCE_FILES}) target_link_libraries(Test ${Boost_LIBRARIES}) diff --git a/NetworkingLib/include/Closeable.h b/NetworkingLib/include/Closeable.h index 691438a8..9ce28502 100644 --- a/NetworkingLib/include/Closeable.h +++ b/NetworkingLib/include/Closeable.h @@ -77,8 +77,13 @@ struct IsOpen<std::shared_ptr<Closeable>> } }; -template<typename AsyncOperation, typename... AsyncOperationArgs, typename Closeable> -void timedOperation(Networking & net, +template< + typename ResultTuple, + typename AsyncOperation, + typename... AsyncOperationArgs, + typename Closeable> +void timedOperation(ResultTuple & result, + Networking & net, AsyncOperation asyncOperation, Closeable & closeable, const time::Duration & timeout, @@ -101,10 +106,12 @@ void timedOperation(Networking & net, // Run asynchronous connect. asyncOperation( std::forward<AsyncOperationArgs>(args)..., - [&closeableError, timer](const boost::system::error_code & error, auto && ... handlerArgs) + [&closeableError, timer, &result](const boost::system::error_code & error, auto && ... remainingHandlerArgs) { timer->stop(); - // update closeableError variable + // Create a tuple to store the results. + result = std::make_tuple(error, remainingHandlerArgs...); + // Update closeableError variable. closeableError = error; }); @@ -146,7 +153,7 @@ void timedAsyncOperation(Networking & net, asyncOperation( std::forward<AsyncOperationArgs>(asyncOperationArgs)..., - [&closeable, timer, handler](const boost::system::error_code & opError, auto && ... handlerArgs) + [&closeable, timer, handler](const boost::system::error_code & opError, auto && ... remainingHandlerArgs) { timer->stop(); @@ -156,7 +163,7 @@ void timedAsyncOperation(Networking & net, else if (opError) errorCode = error::codes::FAILED_OPERATION; - handler(errorCode, std::forward<decltype(handlerArgs)>(handlerArgs)...); + handler(errorCode, opError, std::forward<decltype(remainingHandlerArgs)>(remainingHandlerArgs)...); }); } diff --git a/NetworkingLib/include/DatagramReceiver.h b/NetworkingLib/include/DatagramReceiver.h index 00938a3e..517d9186 100644 --- a/NetworkingLib/include/DatagramReceiver.h +++ b/NetworkingLib/include/DatagramReceiver.h @@ -1,9 +1,9 @@ // -// Created by philipp on 29.12.17. +// Created by philipp on 15.01.18. // -#ifndef NETWORKINGLIB_ASYNCDATAGRAMRECEIVER_H -#define NETWORKINGLIB_ASYNCDATAGRAMRECEIVER_H +#ifndef NETWORKINGLIB_DATAGRAMRECEIVER_H +#define NETWORKINGLIB_DATAGRAMRECEIVER_H #include "Networking.h" #include "Stream.h" @@ -26,60 +26,64 @@ private: public: using Ptr = std::shared_ptr<DatagramReceiver<Message>>; - using Buffer = typename Message::Buffer; - using Udp = boost::asio::ip::udp; using Endpoint = Udp::endpoint; using Socket = Udp::socket; using ReceiveHandler = std::function< void(const error::ErrorCode & error, - const Message & message, - const std::string & senderHost, - std::uint16_t senderPort)>; + Message & message, + const std::string & host, + std::uint16_t port)>; + + static Ptr create(Networking & net, std::uint16_t bindingPort, std::size_t maxMessageSize = 512) + { + return std::make_shared<DatagramReceiver<Message>>(PrivateTag{}, net, bindingPort, maxMessageSize); + } - DatagramReceiver(PrivateTag, Networking & net, std::uint16_t bindingPort) + DatagramReceiver(PrivateTag, Networking & net, std::uint16_t bindingPort, std::size_t maxMessageSize) : net(net) , bindingPort(bindingPort) , socket(net.getIoService(), Udp::v4()) - , buffer(Message::emptyBuffer()) + , buffer(maxMessageSize) {} - static Ptr create(Networking & net, std::uint16_t bindingPort) - { - return std::make_shared<DatagramReceiver<Message>>(PrivateTag{}, net, bindingPort); - } - bool isReceiving() const noexcept { return receiving; }; - void asyncReceive(const time::Duration & timeout, ReceiveHandler receiveHandler) + Message receive(std::string & host, std::uint16_t & port, const time::Duration & timeout) + { + if (receiving) + throw error::Busy{}; + + utils::RAIIObject receivingFlagReset{[this] + { receiving = false; }}; + receiving = true; + newSocket(); + return message::receiveDatagramFrom<Message>(net, socket, buffer, host, port, timeout); + } + + void asyncReceive(const time::Duration & timeout, const ReceiveHandler & handler) { if (receiving) throw error::Busy{}; - try - { - receiving = true; - newSocket(); - - auto self = this->shared_from_this(); - message::asyncReceiveDatagramFrom<Message>( - net, socket, timeout, - [self, receiveHandler](const auto & error, - const auto & message, - const auto & senderHost, - auto senderPort) - { - self->finishReceiving(); - receiveHandler(error, message, senderHost, senderPort); - }); - } - catch (...) - { - finishReceiving(); - throw error::FailedOperation{}; - } + auto self = this->shared_from_this(); + auto state = std::make_shared<AsyncState>(self, handler); + + receiving = true; + newSocket(); + + message::asyncReceiveDatagramFrom<Message>( + net, socket, buffer, timeout, + [state](const error::ErrorCode& error, + protocol::PlatoonMessage & message, + const std::string & senderHost, + std::uint16_t senderPort) + { + state->self->receiving = false; + state->handler(error, message, senderHost, senderPort); + }); } void stop() @@ -94,7 +98,7 @@ private: Networking & net; std::uint16_t bindingPort; Socket socket; - Buffer buffer; + std::vector<char> buffer; std::atomic<bool> receiving{false}; void newSocket() @@ -104,14 +108,22 @@ private: socket.bind(Endpoint(Udp::v4(), bindingPort)); } - void finishReceiving() + struct AsyncState { - receiving = false; - closeable::Closer<Socket>::close(socket); - } + AsyncState(Ptr self, const ReceiveHandler & handler) + : self(self) + , handler(handler) + , receivingFlagReset([self] + { self->receiving = false; }) + {} + + Ptr self; + ReceiveHandler handler; + utils::RAIIObject receivingFlagReset; + }; }; } } -#endif //NETWORKINGLIB_ASYNCDATAGRAMRECEIVER_H +#endif //NETWORKINGLIB_DATAGRAMRECEIVER_H diff --git a/NetworkingLib/include/DatagramSender.h b/NetworkingLib/include/DatagramSender.h index f76dc690..8fc9afe4 100644 --- a/NetworkingLib/include/DatagramSender.h +++ b/NetworkingLib/include/DatagramSender.h @@ -49,11 +49,10 @@ public: if (sending) throw error::Busy{}; - sending = true; - // Use RAII to reset the sending flag when leaving this function. utils::RAIIObject sendingFlagReset{[this] { sending = false; }}; + sending = true; openSocket(); networking::message::sendDatagramTo(net, socket, message, host, port, timeout); } @@ -62,30 +61,24 @@ public: const std::string & host, std::uint16_t port, const time::Duration & timeout, - const SendHandler & sendHandler) + const SendHandler & handler) { if (sending) throw error::Busy{}; - try - { - sending = true; - openSocket(); - - auto self = this->shared_from_this(); - networking::message::asyncSendDatagramTo( - net, socket, message, host, port, timeout, - [self, sendHandler](const auto & error) - { - self->finishSending(); - sendHandler(error); - }); - } - catch (...) - { - finishSending(); - throw error::FailedOperation{}; - } + auto self = this->shared_from_this(); + auto state = std::make_shared<AsyncState>(self, handler); + + sending = true; + openSocket(); + + networking::message::asyncSendDatagramTo( + net, socket, message, host, port, timeout, + [state](const auto & error) + { + state->self->sending = false; + state->handler(error); + }); } bool isSending() const noexcept @@ -115,10 +108,19 @@ private: } } - void finishSending() + struct AsyncState { - sending = false; - } + AsyncState(Ptr self, const SendHandler & handler) + : self(self) + , handler(handler) + , sendingFlagReset([self] + { self->sending = false; }) + {} + + Ptr self; + SendHandler handler; + utils::RAIIObject sendingFlagReset; + }; }; } diff --git a/NetworkingLib/include/Message.h b/NetworkingLib/include/Message.h index 7508e92f..2147f9b6 100644 --- a/NetworkingLib/include/Message.h +++ b/NetworkingLib/include/Message.h @@ -13,6 +13,7 @@ #include "Stream.h" #include "Networking.h" #include "Socket.h" +#include <boost/algorithm/string/replace.hpp> namespace networking { @@ -22,17 +23,64 @@ namespace message using SendHandler = std::function<void(const error::ErrorCode & code)>; template<typename Message> -using ReceiveHandler = std::function<void(const error::ErrorCode & code, const Message & message)>; +using ReceiveHandler = std::function<void(const error::ErrorCode & code, Message & message)>; using SendToHandler = std::function<void(const error::ErrorCode & code)>; template<typename Message> using ReceiveFromHandler = std::function< void(const error::ErrorCode & code, - const Message & message, + Message & message, const std::string & senderHost, std::uint16_t senderPort)>; +template<typename Message> +struct Encoder; + +template<> +struct Encoder<std::string> +{ + std::string operator()(const std::string & message) const + { return message; } +}; + +template<typename Message> +struct Decoder; + +template<> +struct Decoder<std::string> +{ + std::string operator()(const std::string & data) const + { return data; } +}; + +namespace internal +{ + +// Some pre-processing functions before sending the stream over the network. +// Since our message delimiter is '\0' the message itself must not contain such a character. +// Therefore we will replace all occurrences of '\0' with "\0" ("\" is the escape character). +// This implies that we must also change "\" to "\\". + +template<typename Message> +std::string encode(const Message & message) +{ + auto data = Encoder<Message>{}(message); + boost::replace_all(data, "\\", "\\\\"); + boost::replace_all(data, std::string{'\0'}, "\\0"); + return data; +} + +template<typename Message> +Message decode(std::string & data) +{ + boost::replace_all(data, "\\\\", "\\"); + boost::replace_all(data, "\\0", std::string{'\0'}); + return Decoder<Message>{}(data); +} + +} + template<typename Message> void send(Networking & net, boost::asio::ip::tcp::socket & socket, @@ -43,7 +91,7 @@ void send(Networking & net, if (timeout <= 0s) throw error::Aborted(); - networking::stream::write(net, socket, boost::asio::buffer(message.data()), timeout); + networking::stream::write(net, socket, internal::encode(message), timeout); }; template<typename Message> @@ -53,40 +101,40 @@ void asyncSend(Networking & net, const time::Duration & timeout, SendHandler handler) { + auto data = std::make_shared<std::string>(internal::encode(message)); networking::stream::asyncWrite( - net, socket, boost::asio::buffer(message.data()), timeout, - [handler](const auto & errorCode, auto ...) - { - handler(errorCode); - }); + net, socket, *data, timeout, + [handler, data](const auto & errorCode) + { handler(errorCode); }); }; template<typename Message> Message receive(Networking & net, boost::asio::ip::tcp::socket & socket, + boost::asio::streambuf & buffer, const time::Duration & timeout) { using namespace std::chrono_literals; if (timeout <= 0s) throw error::Aborted(); - auto buffer = Message::emptyBuffer(); - networking::stream::read(net, socket, boost::asio::buffer(buffer), timeout); - return Message(buffer); + auto data = networking::stream::read(net, socket, buffer, timeout); + return internal::decode<Message>(data); }; template<typename Message> void asyncReceive(Networking & net, boost::asio::ip::tcp::socket & socket, + boost::asio::streambuf & buffer, const time::Duration & timeout, ReceiveHandler<Message> handler) { - auto buffer = std::make_shared<typename Message::Buffer>(Message::emptyBuffer()); networking::stream::asyncRead( - net, socket, boost::asio::buffer(*buffer), timeout, - [buffer, handler](const auto & errorCode, auto ...) + net, socket, buffer, timeout, + [handler](const auto & errorCode, auto & data) { - handler(errorCode, Message(*buffer)); + auto message = internal::decode<Message>(data); + handler(errorCode, message); }); }; @@ -98,7 +146,7 @@ void sendDatagramTo(Networking & net, std::uint16_t port, const time::Duration & timeout) { - networking::socket::sendTo(net, socket, boost::asio::buffer(message.data()), host, port, timeout); + networking::socket::sendTo(net, socket, internal::encode(message), host, port, timeout); } template<typename Message> @@ -110,43 +158,38 @@ void asyncSendDatagramTo(Networking & net, const time::Duration & timeout, SendToHandler handler) { + auto data = std::make_shared<std::string>(internal::encode(message)); networking::socket::asyncSendTo( - net, socket, boost::asio::buffer(message.data()), host, port, timeout, - [handler](const auto & error, auto ...) - { - handler(error); - }); + net, socket, *data, host, port, timeout, + [handler, data](const auto & error) + { handler(error); }); } template<typename Message> Message receiveDatagramFrom(Networking & net, boost::asio::ip::udp::socket & socket, + std::vector<char> & buffer, std::string & host, std::uint16_t & port, const time::Duration & timeout) { - auto data = Message::emptyBuffer(); - networking::socket::receiveFrom(net, socket, boost::asio::buffer(data), host, port, timeout); - return Message(data); + auto data = networking::socket::receiveFrom(net, socket, buffer, host, port, timeout); + return internal::decode<Message>(data); } template<typename Message> void asyncReceiveDatagramFrom(Networking & net, boost::asio::ip::udp::socket & socket, + std::vector<char> & buffer, const time::Duration & timeout, ReceiveFromHandler<Message> handler) { - auto buffer = std::make_shared<typename Message::Buffer>(Message::emptyBuffer()); networking::socket::asyncReceiveFrom( - net, socket, boost::asio::buffer(*buffer), timeout, - [buffer, handler](auto error, auto bytesTransferred, const auto & senderHost, auto senderPort) + net, socket, buffer, timeout, + [handler](auto error, auto & data, const auto & senderHost, auto senderPort) { - Message msg{*buffer}; - - if (!error && bytesTransferred < buffer->size()) - error = networking::error::codes::FAILED_OPERATION; - - handler(error, msg, senderHost, senderPort); + auto message = internal::decode<Message>(data); + handler(error, message, senderHost, senderPort); }); } diff --git a/NetworkingLib/include/Resolver.h b/NetworkingLib/include/Resolver.h index 97b05fde..34a46f07 100644 --- a/NetworkingLib/include/Resolver.h +++ b/NetworkingLib/include/Resolver.h @@ -20,7 +20,7 @@ namespace internal // This class is used for internal purposes. Please use the 'Resolver' class. // The boost::asio resolvers do not feature a close mechanism which we need in order to perform operations with timeouts. template<typename Protocol> -class CloseableResolver +class CloseableResolver : public Protocol::resolver { public: using Resolver = typename Protocol::resolver; @@ -30,22 +30,9 @@ public: using ResolveHandler = std::function<void(const boost::system::error_code & error)>; CloseableResolver(boost::asio::io_service & ioService) - : resolver(ioService) + : Resolver(ioService) {} - void async_resolve(const Query & query, - Iterator & endpointIterator, - ResolveHandler handler) - { - resolver.async_resolve( - query, - [&endpointIterator, handler](const auto & error, auto iterator) - { - endpointIterator = iterator; - handler(error); - }); - } - void open() { opened = true; @@ -62,12 +49,11 @@ public: return; opened = false; - resolver.cancel(); + Resolver::cancel(); } private: std::atomic<bool> opened{true}; - Resolver resolver; }; } @@ -122,11 +108,23 @@ public: resolver.open(); UnderlyingResolver::Query query{host, service}; - UnderlyingResolver::Iterator endpointIterator; auto resolveOperation = [this](auto && ... args) { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, resolveOperation, resolver, timeout, query, endpointIterator); + + std::tuple< + boost::system::error_code, + UnderlyingResolver::Iterator> result; + + closeable::timedOperation( + result, + net, + resolveOperation, + resolver, + timeout, + query); + + auto endpointIterator = std::get<1>(result); return endpointsFromIterator(endpointIterator); } @@ -134,39 +132,33 @@ public: void asyncResolve(const std::string & host, const std::string & service, const time::Duration & timeout, - ResolveHandler handler) + const ResolveHandler & handler) { if (resolving) throw error::Busy{}; - try - { - resolving = true; - resolver.open(); - - UnderlyingResolver::Query query{host, service}; - auto endpointIterator = std::make_shared<UnderlyingResolver::Iterator>(); - - auto self = shared_from_this(); - - auto resolveOperation = [this](auto && ... args) - { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; - - closeable::timedAsyncOperation( - net, resolveOperation, resolver, timeout, - [self, endpointIterator, handler](const auto & error) - { - self->resolving = false; - auto endpoints = self->endpointsFromIterator(*endpointIterator); - handler(error, endpoints); - }, - query, *endpointIterator); - } - catch (...) - { - resolving = false; - throw error::FailedOperation{}; - } + auto self = shared_from_this(); + auto state = std::make_shared<AsyncState>(self, handler); + + resolving = true; + resolver.open(); + + UnderlyingResolver::Query query{host, service}; + + auto resolveOperation = [this](auto && ... args) + { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; + + closeable::timedAsyncOperation( + net, + resolveOperation, + resolver, + timeout, + [state](const auto & networkingError, const auto & boostError, auto endpointIterator) + { + state->self->resolving = false; + state->handler(networkingError, state->self->endpointsFromIterator(endpointIterator)); + }, + query); } void stop() @@ -200,6 +192,20 @@ private: return endpoints; } + + struct AsyncState + { + AsyncState(Ptr self, const ResolveHandler & handler) + : self(self) + , handler(handler) + , resolvingFlagReset([self] + { self->resolving = false; }) + {} + + Ptr self; + ResolveHandler handler; + utils::RAIIObject resolvingFlagReset; + }; }; } diff --git a/NetworkingLib/include/ServiceClient.h b/NetworkingLib/include/ServiceClient.h index c55af6e2..ddc31cee 100644 --- a/NetworkingLib/include/ServiceClient.h +++ b/NetworkingLib/include/ServiceClient.h @@ -36,7 +36,7 @@ public: using Socket = typename boost::asio::ip::tcp::socket; - using CallHandler = std::function<void(const error::ErrorCode & error, const ResponseMessage & response)>; + using CallHandler = std::function<void(const error::ErrorCode & error, ResponseMessage & response)>; static Ptr create(Networking & net) { @@ -74,7 +74,8 @@ public: message::send(net, socket, request, timeout); updateTimeout(timeout, startTime); // Receive the response. - return message::receive<ResponseMessage>(net, socket, timeout); + boost::asio::streambuf buffer; + return message::receive<ResponseMessage>(net, socket, buffer, timeout); } void asyncCall(const std::string & host, @@ -86,63 +87,54 @@ public: if (calling) throw error::Busy{}; - try - { - newSocket(); - auto self = this->shared_from_this(); + auto self = this->shared_from_this(); - // Container for our variables which are needed for the subsequent asynchronous calls to connect, receive and send. - // When 'state' goes out of scope, it does cleanup and calls the handler and therefore makes use of the RAII concept. - auto state = std::make_shared<AsyncState>( - self, handler, false, ResponseMessage{ResponseMessage::emptyBuffer()}, error::codes::UNSPECIFIED, - timeout, time::now()); + // Container for our variables which are needed for the subsequent asynchronous calls to connect, receive and send. + // When 'state' goes out of scope, it does cleanup. + auto state = std::make_shared<AsyncState>( + self, handler, timeout, time::now()); - calling = true; + calling = true; + newSocket(); - // Connect to server. - networking::socket::asyncConnect( - net, socket, host, port, state->timeout, - [self, state, request](const auto & error, auto && ... rest) + // Connect to server. + networking::socket::asyncConnect( + net, socket, host, port, state->timeout, + [state, request](const auto & error) + { + if (error) { - // Only execute the handler if we're running on the networking thread. - state->shouldCallHandler = true; + ResponseMessage noResponse{}; + state->handler(error, noResponse); + return; + } - if (error) - { - state->error = error; - return; - } + Client<Service>::updateTimeout(state->timeout, state->startTime); - Client<Service>::updateTimeout(state->timeout, state->startTime); - - // Send the request. - networking::message::asyncSend( - self->net, self->socket, request, state->timeout, - [self, state](const auto & error) + // Send the request. + networking::message::asyncSend( + state->self->net, state->self->socket, request, state->timeout, + [state](const auto & error) + { + if (error) { - if (error) + ResponseMessage noResponse{}; + state->handler(error, noResponse); + return; + } + + Client<Service>::updateTimeout(state->timeout, state->startTime); + + // Receive the response. + networking::message::asyncReceive<ResponseMessage>( + state->self->net, state->self->socket, state->buffer, state->timeout, + [state](auto const & error, + auto & response) { - state->error = error; - return; - } - - Client<Service>::updateTimeout(state->timeout, state->startTime); - - // Receive the response. - networking::message::asyncReceive<ResponseMessage>( - self->net, self->socket, state->timeout, - [self, state](auto const & error, const auto & response) - { - state->error = error; - state->responseMessage = response; - }); - }); - }); - } - catch (...) - { - throw error::FailedOperation{}; - } + state->handler(error, response); + }); + }); + }); } bool isCalling() const noexcept @@ -181,41 +173,24 @@ private: struct AsyncState { AsyncState(Ptr self, - CallHandler handler, - bool shouldCallHandler, - ResponseMessage responseMessage, - error::ErrorCode error, + const CallHandler & handler, time::Duration timeout, time::TimePoint startTime) : self(self) , handler(handler) - , shouldCallHandler(shouldCallHandler) - , responseMessage(responseMessage) - , error(error) , timeout(timeout) , startTime(startTime) + , callingFlagReset([self]{ self->calling = false; }) + , closer(self->socket) {} Ptr self; CallHandler handler; - bool shouldCallHandler; - ResponseMessage responseMessage; - error::ErrorCode error; time::Duration timeout; time::TimePoint startTime; - - ~AsyncState() - { - closeable::Closer<Socket>::close(self->socket); - self->calling = false; - if (shouldCallHandler) - { - try - { handler(error, responseMessage); } - catch (...) - {} - } - } + boost::asio::streambuf buffer; + utils::RAIIObject callingFlagReset; + closeable::Closer<Socket> closer; }; }; diff --git a/NetworkingLib/include/ServiceServer.h b/NetworkingLib/include/ServiceServer.h index af3918ea..7c218444 100644 --- a/NetworkingLib/include/ServiceServer.h +++ b/NetworkingLib/include/ServiceServer.h @@ -34,7 +34,7 @@ public: using RequestReceivedHandler = std::function<ResponseMessage(const Endpoint & clientEndpoint, - const RequestMessage & requestMessage)>; + RequestMessage & requestMessage)>; static Ptr create(Networking & net, uint16_t bindingPort) { @@ -57,10 +57,7 @@ public: running = true; - try - { accept(requestReceivedHandler); } - catch (...) - { throw error::FailedOperation{}; } + accept(requestReceivedHandler); } void stop() @@ -81,12 +78,12 @@ private: acceptor = Acceptor(net.getIoService(), Endpoint{Tcp::v4(), bindingPort}); auto self = this->shared_from_this(); - auto state = std::make_shared<AsyncState>(net, handler); + auto state = std::make_shared<AsyncState>(self, handler); acceptor.async_accept( state->socket, - [self, state](const auto & acceptError) + [state](const auto & acceptError) { - if (!self->running) + if (!state->self->running) return; if (!acceptError) @@ -94,8 +91,8 @@ private: using namespace std::chrono_literals; networking::message::asyncReceive<RequestMessage>( - self->net, state->socket, 3s, - [self, state](const auto & errorCode, const auto & request) + state->self->net, state->socket, state->buffer, 3s, + [state](const auto & errorCode, auto & request) { // If a receive has timed out we treat it like we've never // received any message (and therefor we do not call the handler). @@ -105,8 +102,8 @@ private: auto response = state->requestReceivedHandler(state->socket.remote_endpoint(), request); networking::message::asyncSend( - self->net, state->socket, response, 3s, - [self, state](const auto & errorCode) + state->self->net, state->socket, response, 3s, + [state](const auto & errorCode) { // We cannot be sure that the message is going to be received at the other side anyway, // so we don't handle anything sending-wise. @@ -115,20 +112,23 @@ private: } // The next accept event will be put on the event queue. - self->accept(state->requestReceivedHandler); + state->self->accept(state->requestReceivedHandler); }); } private: struct AsyncState { - AsyncState(Networking & net, const RequestReceivedHandler & requestReceivedHandler) - : socket(net.getIoService()), - requestReceivedHandler(requestReceivedHandler) + AsyncState(Ptr self, const RequestReceivedHandler & requestReceivedHandler) + : self(self) + , socket(self->net.getIoService()) + , requestReceivedHandler(requestReceivedHandler) {} + Ptr self; Socket socket; RequestReceivedHandler requestReceivedHandler; + boost::asio::streambuf buffer; }; }; diff --git a/NetworkingLib/include/Socket.h b/NetworkingLib/include/Socket.h index 47aa1a76..d8c3975e 100644 --- a/NetworkingLib/include/Socket.h +++ b/NetworkingLib/include/Socket.h @@ -13,6 +13,26 @@ namespace networking namespace socket { +namespace internal +{ + +boost::asio::const_buffers_1 stringToBuffer(const std::string & str); + +bool isLastCharacterNull(const std::vector<char> & buffer, std::size_t numBytes); + +std::string stringFromBuffer(const std::vector<char> & buffer, std::size_t numBytes); + +} + +using ConnectHandler = std::function<void(const error::ErrorCode & error)>; + +using SendHandler = std::function<void(const error::ErrorCode & error)>; + +using ReceiveHandler = std::function<void(const error::ErrorCode & error, + std::string & data, + const std::string & host, + std::uint16_t port)>; + template<typename SocketService> void connect(Networking & net, SocketService & socket, @@ -20,18 +40,30 @@ void connect(Networking & net, std::uint16_t port, time::Duration timeout) { - using Resolver = internal::CloseableResolver<boost::asio::ip::tcp>; + using Resolver = networking::internal::CloseableResolver<boost::asio::ip::tcp>; auto startTime = time::now(); // Resolve host. Resolver resolver{net.getIoService()}; Resolver::Query query{host, std::to_string(port)}; - Resolver::Iterator endpointIterator; auto resolveOperation = [&resolver](auto && ... args) { resolver.async_resolve(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, resolveOperation, resolver, timeout, query, endpointIterator); + + std::tuple< + boost::system::error_code, + Resolver::Iterator> resolveResult; + + closeable::timedOperation( + resolveResult, + net, + resolveOperation, + resolver, + timeout, + query); + + auto endpointIterator = std::get<1>(resolveResult); // Update timeout. auto timeSpend = time::now() - startTime; @@ -39,10 +71,22 @@ void connect(Networking & net, auto connectOperation = [](auto && ... args) { boost::asio::async_connect(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, connectOperation, socket, timeout, socket, endpointIterator); + + std::tuple< + boost::system::error_code, + Resolver::Iterator> connectResult; + + closeable::timedOperation( + connectResult, + net, + connectOperation, + socket, + timeout, + socket, + endpointIterator); } -template<typename SocketService, typename ConnectHandler> +template<typename SocketService> void asyncConnect(Networking & net, SocketService & socket, const std::string & host, @@ -50,26 +94,28 @@ void asyncConnect(Networking & net, const time::Duration & timeout, const ConnectHandler & handler) { - using Resolver = internal::CloseableResolver<boost::asio::ip::tcp>; + using Resolver = networking::internal::CloseableResolver<boost::asio::ip::tcp>; auto startTime = time::now(); // Resolve host. auto resolver = std::make_shared<Resolver>(net.getIoService()); Resolver::Query query{host, std::to_string(port)}; - auto endpointIterator = std::make_shared<Resolver::Iterator>(); auto resolveOperation = [&resolver](auto && ... args) { resolver->async_resolve(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( - net, resolveOperation, resolver, timeout, - [&net, &socket, host, port, timeout, handler, resolver, endpointIterator, startTime] - (const auto & error) + net, + resolveOperation, + resolver, + timeout, + [&net, &socket, host, port, timeout, handler, resolver, startTime] + (const auto & networkingError, const auto & boostError, auto endpointIterator) { - if (error) + if (networkingError) { - handler(error); + handler(networkingError); return; } @@ -79,86 +125,157 @@ void asyncConnect(Networking & net, auto connectOperation = [](auto && ... args) { boost::asio::async_connect(std::forward<decltype(args)>(args)...); }; - closeable::timedAsyncOperation(net, connectOperation, socket, newTimeout, handler, socket, *endpointIterator); + + closeable::timedAsyncOperation( + net, + connectOperation, + socket, + newTimeout, + [handler](const auto & networkingError, const auto & boostError, auto iterator) + { handler(networkingError); }, + socket, + endpointIterator); }, - query, *endpointIterator); + query); } -template< - typename DatagramSocket, - typename MutableBuffers> -void receiveFrom(Networking & net, +template<typename DatagramSocket> +void sendTo(Networking & net, + DatagramSocket & socket, + const std::string & data, + const std::string & host, + std::uint16_t port, + const time::Duration & timeout) +{ + boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; + + auto asyncOperation = [&socket](auto && ... args) + { socket.async_send_to(std::forward<decltype(args)>(args)...); }; + + std::tuple< + boost::system::error_code, + std::size_t> result; + + closeable::timedOperation( + result, + net, + asyncOperation, + socket, + timeout, + internal::stringToBuffer(data), + endpoint); + + auto bytesTransferred = std::get<1>(result); + if (bytesTransferred < data.size()) + throw error::FailedOperation{}; +}; + +template<typename DatagramSocket> +void asyncSendTo(Networking & net, DatagramSocket & socket, - const MutableBuffers & buffer, - std::string & senderHost, - std::uint16_t & senderPort, - const time::Duration & timeout) + const std::string & data, + const std::string & host, + std::uint16_t port, + const time::Duration & timeout, + const SendHandler & handler) +{ + boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; + + auto asyncOperation = [&socket](auto && ... args) + { socket.async_send_to(std::forward<decltype(args)>(args)...); }; + + closeable::timedAsyncOperation( + net, + asyncOperation, + socket, + timeout, + [&data, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + { + if (bytesTransferred < data.size()) + { + handler(error::codes::FAILED_OPERATION); + return; + } + + handler(networkingError); + }, + internal::stringToBuffer(data), + endpoint); +}; + +template<typename DatagramSocket> +std::string receiveFrom(Networking & net, + DatagramSocket & socket, + std::vector<char> buffer, + std::string & senderHost, + std::uint16_t & senderPort, + const time::Duration & timeout) { boost::asio::ip::udp::endpoint senderEndpoint; + auto asyncOperation = [&socket](auto && ... args) { socket.async_receive_from(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, asyncOperation, socket, timeout, buffer, senderEndpoint); + + std::tuple< + boost::system::error_code, + std::size_t> result; + + closeable::timedOperation( + result, + net, + asyncOperation, + socket, + timeout, + boost::asio::buffer(buffer), + senderEndpoint); + senderHost = senderEndpoint.address().to_string(); senderPort = senderEndpoint.port(); + + auto bytesTransferred = std::get<1>(result); + if (!internal::isLastCharacterNull(buffer, bytesTransferred)) + throw error::FailedOperation{}; + + return internal::stringFromBuffer(buffer, bytesTransferred); } -template< - typename DatagramSocket, - typename MutableBuffers, - typename ReceiveHandler> +template<typename DatagramSocket> void asyncReceiveFrom(Networking & net, DatagramSocket & socket, - const MutableBuffers & buffer, + std::vector<char> & buffer, const time::Duration & timeout, ReceiveHandler handler) { auto senderEndpoint = std::make_shared<boost::asio::ip::udp::endpoint>(); + auto asyncOperation = [&socket](auto && ... args) { socket.async_receive_from(std::forward<decltype(args)>(args)...); }; + closeable::timedAsyncOperation( - net, asyncOperation, socket, timeout, - [handler, senderEndpoint](auto && ... args) + net, + asyncOperation, + socket, + timeout, + [&buffer, handler, senderEndpoint](const auto & networkingError, const auto & boostError, auto bytesTransferred) { + if (!internal::isLastCharacterNull(buffer, bytesTransferred)) + { + std::string data{}; + std::string senderHost{}; + std::uint16_t senderPort{}; + handler(error::codes::FAILED_OPERATION, data, senderHost, senderPort); + return; + } + auto senderHost = senderEndpoint->address().to_string(); auto senderPort = senderEndpoint->port(); - handler(std::forward<decltype(args)>(args)..., senderHost, senderPort); - }, buffer, *senderEndpoint); + auto data = internal::stringFromBuffer(buffer, bytesTransferred); + handler(networkingError, data, senderHost, senderPort); + }, + boost::asio::buffer(buffer), + *senderEndpoint); } -template< - typename DatagramSocket, - typename MutableBuffers> -void sendTo(Networking & net, - DatagramSocket & socket, - const MutableBuffers & buffer, - const std::string & host, - std::uint16_t port, - const time::Duration & timeout) -{ - boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; - auto asyncOperation = [&socket](auto && ... args) - { socket.async_send_to(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, asyncOperation, socket, timeout, buffer, endpoint); -}; - -template< - typename DatagramSocket, - typename MutableBuffers, - typename SendHandler> -void asyncSendTo(Networking & net, - DatagramSocket & socket, - const MutableBuffers & buffer, - const std::string & host, - std::uint16_t port, - const time::Duration & timeout, - SendHandler handler) -{ - boost::asio::ip::udp::endpoint endpoint{boost::asio::ip::address::from_string(host), port}; - auto asyncOperation = [&socket](auto && ... args) - { socket.async_send_to(std::forward<decltype(args)>(args)...); }; - closeable::timedAsyncOperation(net, asyncOperation, socket, timeout, handler, buffer, endpoint); -}; - } } diff --git a/NetworkingLib/include/Stream.h b/NetworkingLib/include/Stream.h index d807854d..4e702643 100644 --- a/NetworkingLib/include/Stream.h +++ b/NetworkingLib/include/Stream.h @@ -17,50 +17,125 @@ namespace networking namespace stream { -template<typename SyncWriteStream, typename ConstBufferSequence> +namespace internal +{ + +boost::asio::const_buffers_1 stringToBuffer(const std::string & str); + +std::string stringFromStreambuf(boost::asio::streambuf & streambuf, std::size_t numBytes); + +} + +using WriteHandler = std::function<void(const error::ErrorCode & error)>; + +using ReadHandler = std::function<void(const error::ErrorCode & error, std::string & data)>; + +template<typename SyncWriteStream> void write(Networking & net, SyncWriteStream & stream, - const ConstBufferSequence & buffer, + const std::string & data, const time::Duration & timeout) { auto asyncOperation = [](auto && ... args) { boost::asio::async_write(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, asyncOperation, stream, timeout, stream, buffer); -}; -template<typename SyncReadStream, typename MutableBufferSequence> -void read(Networking & net, - SyncReadStream & stream, - const MutableBufferSequence & buffer, - const time::Duration & timeout) -{ - auto asyncOperation = [](auto && ... args) - { boost::asio::async_read(std::forward<decltype(args)>(args)...); }; - closeable::timedOperation(net, asyncOperation, stream, timeout, stream, buffer); + std::tuple< + boost::system::error_code, + std::size_t> result; + + closeable::timedOperation( + result, + net, + asyncOperation, + stream, + timeout, + stream, + internal::stringToBuffer(data)); + + auto bytesTransferred = std::get<1>(result); + if (bytesTransferred < data.size()) + throw error::FailedOperation{}; }; -template<typename SyncWriteStream, typename MutableBufferSequence, typename WriteHandler> +template<typename SyncWriteStream> void asyncWrite(Networking & net, SyncWriteStream & stream, - MutableBufferSequence buffer, + const std::string & data, const time::Duration & timeout, - WriteHandler handler) + const WriteHandler & handler) { auto asyncOperation = [](auto && ... args) { boost::asio::async_write(std::forward<decltype(args)>(args)...); }; - closeable::timedAsyncOperation(net, asyncOperation, stream, timeout, handler, stream, buffer); + + closeable::timedAsyncOperation( + net, + asyncOperation, + stream, + timeout, + [&data, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + { + if (bytesTransferred < data.size()) + { + handler(error::codes::FAILED_OPERATION); + return; + } + + handler(networkingError); + }, + stream, + internal::stringToBuffer(data)); } -template<typename SyncReadStream, typename MutableBufferSequence, typename ReadHandler> +template<typename SyncReadStream> +std::string read(Networking & net, + SyncReadStream & stream, + boost::asio::streambuf & buffer, + const time::Duration & timeout) +{ + auto asyncOperation = [](auto && ... args) + { boost::asio::async_read_until(std::forward<decltype(args)>(args)...); }; + + std::tuple< + boost::system::error_code, + std::size_t> result; + + closeable::timedOperation( + result, + net, + asyncOperation, + stream, + timeout, + stream, + buffer, + '\0'); + + auto bytesTransferred = std::get<1>(result); + return internal::stringFromStreambuf(buffer, bytesTransferred); +}; + +template<typename SyncReadStream> void asyncRead(Networking & net, SyncReadStream & stream, - const MutableBufferSequence & buffer, + boost::asio::streambuf & buffer, const time::Duration & timeout, - ReadHandler handler) + const ReadHandler & handler) { auto asyncOperation = [](auto && ... args) - { boost::asio::async_read(std::forward<decltype(args)>(args)...); }; - closeable::timedAsyncOperation(net, asyncOperation, stream, timeout, handler, stream, buffer); + { boost::asio::async_read_until(std::forward<decltype(args)>(args)...); }; + + closeable::timedAsyncOperation( + net, + asyncOperation, + stream, + timeout, + [&buffer, handler](const auto & networkingError, const auto & boostError, auto bytesTransferred) + { + auto data = internal::stringFromStreambuf(buffer, bytesTransferred); + handler(networkingError, data); + }, + stream, + buffer, + '\0'); }; } diff --git a/NetworkingLib/src/Networking.cpp b/NetworkingLib/src/Networking.cpp index e0dbe192..424741b0 100644 --- a/NetworkingLib/src/Networking.cpp +++ b/NetworkingLib/src/Networking.cpp @@ -4,7 +4,8 @@ #include "../include/Networking.h" -using namespace networking; +namespace networking +{ Networking::Networking() { @@ -23,6 +24,8 @@ Networking::Networking() catch (...) { // Ignore exceptions raised by handlers. + // TODO: remove this! + throw; } } }); @@ -63,4 +66,4 @@ void Networking::callLater(const Handler & handler) ioService.post(handler); } - +} \ No newline at end of file diff --git a/NetworkingLib/src/Socket.cpp b/NetworkingLib/src/Socket.cpp new file mode 100644 index 00000000..e4ba8eeb --- /dev/null +++ b/NetworkingLib/src/Socket.cpp @@ -0,0 +1,35 @@ +// +// Created by philipp on 15.01.18. +// + +#include "../include/Socket.h" + +namespace networking +{ +namespace socket +{ +namespace internal +{ + +boost::asio::const_buffers_1 stringToBuffer(const std::string & str) +{ + // Adds '\0' to the buffer. + return boost::asio::buffer(str.c_str(), str.size() + 1); +} + +bool isLastCharacterNull(const std::vector<char> & buffer, std::size_t numBytes) +{ + if (numBytes == 0) + return false; + + return buffer[numBytes - 1] == '\0'; +} + +std::string stringFromBuffer(const std::vector<char> & buffer, std::size_t numBytes) +{ + return std::string{buffer.begin(), buffer.begin() + numBytes}; +} + +} +} +} \ No newline at end of file diff --git a/NetworkingLib/src/Stream.cpp b/NetworkingLib/src/Stream.cpp new file mode 100644 index 00000000..bcc613b3 --- /dev/null +++ b/NetworkingLib/src/Stream.cpp @@ -0,0 +1,33 @@ +// +// Created by philipp on 15.01.18. +// + +#include "../include/Stream.h" + +namespace networking +{ +namespace stream +{ +namespace internal +{ + +boost::asio::const_buffers_1 stringToBuffer(const std::string & str) +{ + // Adds '\0' to the buffer. + return boost::asio::buffer(str.c_str(), str.size() + 1); +} + +std::string stringFromStreambuf(boost::asio::streambuf & streambuf, std::size_t numBytes) +{ + using boost::asio::buffers_begin; + if (numBytes == 0) + return std::string{""}; + auto buffer = streambuf.data(); + auto result = std::string{buffers_begin(buffer), buffers_begin(buffer) + numBytes - 1}; + streambuf.consume(numBytes); + return result; +} + +} +} +} \ No newline at end of file diff --git a/NetworkingLib/src/Time.cpp b/NetworkingLib/src/Time.cpp index 7f1592f6..29b69960 100644 --- a/NetworkingLib/src/Time.cpp +++ b/NetworkingLib/src/Time.cpp @@ -4,9 +4,15 @@ #include "../include/Time.h" -using namespace networking; +namespace networking +{ +namespace time +{ -time::TimePoint time::now() noexcept +time::TimePoint now() noexcept { return std::chrono::steady_clock::now(); +} + +} } \ No newline at end of file diff --git a/NetworkingLib/src/Timer.cpp b/NetworkingLib/src/Timer.cpp index cba2720e..d414ad64 100644 --- a/NetworkingLib/src/Timer.cpp +++ b/NetworkingLib/src/Timer.cpp @@ -5,8 +5,10 @@ #include "../include/Timer.h" #include "../include/Error.h" -using namespace networking; -using namespace time; +namespace networking +{ +namespace time +{ Timer::Timer(PrivateTag, Networking & net) : timer(net.getIoService()) @@ -100,4 +102,7 @@ void Timer::nextPeriod(const time::Duration & interval, Timer::TimeoutHandler ha handler(); self->nextPeriod(interval, handler); }); +} + +} } \ No newline at end of file diff --git a/NetworkingLib/test/PlatoonMessage.h b/NetworkingLib/test/PlatoonMessage.h index 6b7883a4..e052df37 100644 --- a/NetworkingLib/test/PlatoonMessage.h +++ b/NetworkingLib/test/PlatoonMessage.h @@ -7,6 +7,7 @@ #include <cstdint> #include <vector> +#include <../include/Message.h> namespace protocol { @@ -26,7 +27,8 @@ constexpr MessageType REJECT_RESPONSE = 0x04; class PlatoonMessage { public: - using Buffer = std::vector<std::uint8_t>; + PlatoonMessage() + {} PlatoonMessage(VehicleId vehicleId, MessageType messageType, PlatoonId platoonId) : vehicleId(vehicleId) @@ -34,22 +36,6 @@ public: , platoonId(platoonId) {} - PlatoonMessage(Buffer buffer) - : vehicleId(0) - , messageType(0) - , platoonId(0) - { - vehicleId += ((VehicleId) buffer[0]); - vehicleId += ((VehicleId) buffer[1]) << 8; - vehicleId += ((VehicleId) buffer[2]) << 16; - vehicleId += ((VehicleId) buffer[3]) << 24; - messageType += (MessageType) buffer[4]; - platoonId += ((PlatoonId) buffer[5]); - platoonId += ((PlatoonId) buffer[6]) << 8; - platoonId += ((PlatoonId) buffer[7]) << 16; - platoonId += ((PlatoonId) buffer[8]) << 24; - } - static PlatoonMessage followerRequest(VehicleId vehicleId) { return PlatoonMessage(vehicleId, messageTypes::FOLLOWER_REQUEST, 0); } @@ -62,9 +48,6 @@ public: static PlatoonMessage acceptResponse(VehicleId vehicleId, PlatoonId platoonId) { return PlatoonMessage(vehicleId, messageTypes::ACCEPT_RESPONSE, platoonId); } - static Buffer emptyBuffer() - { return Buffer(sizeof(vehicleId) + sizeof(messageType) + sizeof(platoonId)); } - VehicleId getVehicleId() const { return vehicleId; } @@ -74,9 +57,32 @@ public: PlatoonId getPlatoonId() const { return platoonId; } - Buffer data() const +private: + VehicleId vehicleId; + MessageType messageType; + PlatoonId platoonId; +}; + +} + +namespace networking +{ +namespace message +{ + +template<> +struct Encoder<protocol::PlatoonMessage> +{ + std::string operator()(const protocol::PlatoonMessage & message) const { - Buffer data = emptyBuffer(); + using namespace protocol; + + std::string data(9, '\0'); + + auto vehicleId = message.getVehicleId(); + auto messageType = message.getMessageType(); + auto platoonId = message.getPlatoonId(); + data[0] = (std::uint8_t) (vehicleId & 0x000000ff); data[1] = (std::uint8_t) ((vehicleId & 0x0000ff00) >> 8); data[2] = (std::uint8_t) ((vehicleId & 0x00ff0000) >> 16); @@ -86,14 +92,39 @@ public: data[6] = (std::uint8_t) ((platoonId & 0x0000ff00) >> 8); data[7] = (std::uint8_t) ((platoonId & 0x00ff0000) >> 16); data[8] = (std::uint8_t) ((platoonId & 0xff000000) >> 24); + return data; } +}; -private: - VehicleId vehicleId; - MessageType messageType; - PlatoonId platoonId; +template<> +struct Decoder<protocol::PlatoonMessage> +{ + protocol::PlatoonMessage operator()(const std::string & data) const + { + using namespace protocol; + + auto size = data.size(); + + VehicleId vehicleId{0}; + MessageType messageType{0}; + PlatoonId platoonId{0}; + + vehicleId += ((VehicleId) data[0]); + vehicleId += ((VehicleId) data[1]) << 8; + vehicleId += ((VehicleId) data[2]) << 16; + vehicleId += ((VehicleId) data[3]) << 24; + messageType += (MessageType) data[4]; + platoonId += ((PlatoonId) data[5]); + platoonId += ((PlatoonId) data[6]) << 8; + platoonId += ((PlatoonId) data[7]) << 16; + platoonId += ((PlatoonId) data[8]) << 24; + + return PlatoonMessage(vehicleId, messageType, platoonId); + } }; + +} } #endif //PROTOCOL_PLATOONCREATEMESSAGE_H diff --git a/NetworkingLib/test/Test.cpp b/NetworkingLib/test/Test.cpp index 53c57135..c5b95f0a 100644 --- a/NetworkingLib/test/Test.cpp +++ b/NetworkingLib/test/Test.cpp @@ -14,10 +14,14 @@ using boost::asio::ip::tcp; -using namespace networking; using namespace std::chrono_literals; -void test::testServices() +namespace networking +{ +namespace test +{ + +void testServices() { using namespace protocol; Networking net1; @@ -27,7 +31,7 @@ void test::testServices() server->advertiseService( [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { std::cout << "Request from " << (int) requestMessage.getVehicleId() @@ -49,7 +53,7 @@ void test::testServices() } } -void test::testTcpClientTimeout() +void testTcpClientTimeout() { using namespace protocol; Networking net1; @@ -61,7 +65,7 @@ void test::testTcpClientTimeout() server->advertiseService( [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { // Just sleep for 5 seconds. sleep(5); @@ -87,7 +91,7 @@ void test::testTcpClientTimeout() } } -void test::testMultipleConnections() +void testMultipleConnections() { using namespace protocol; Networking net1; @@ -99,13 +103,13 @@ void test::testMultipleConnections() server1->advertiseService( [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { return PlatoonMessage::acceptResponse(1, 42); }); server2->advertiseService( [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { return PlatoonMessage::acceptResponse(2, 43); }); @@ -121,7 +125,7 @@ void test::testMultipleConnections() std::cout << "Response from " << response.getVehicleId() << std::endl; } -void test::testStoppingServiceServer() +void testStoppingServiceServer() { using namespace protocol; Networking net1; @@ -130,7 +134,7 @@ void test::testStoppingServiceServer() auto server = service::Server<PlatoonService>::create(net1, 10001); auto handler = [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { // Just sleep for 3 seconds. sleep(2); @@ -156,7 +160,7 @@ void test::testStoppingServiceServer() } } -void test::testAsyncDatagramReceiver() +void testAsyncDatagramReceiver() { using namespace protocol; Networking net1; @@ -165,22 +169,29 @@ void test::testAsyncDatagramReceiver() auto receiver = message::DatagramReceiver<PlatoonMessage>::create(net1, 10000); auto sender = message::DatagramSender<PlatoonMessage>::create(net2); + std::atomic<bool> running{true}; + receiver->asyncReceive( 3s, - [](const auto & error, - const auto & message, - const auto & senderHost, - auto senderPort) + [&running](const auto & error, + auto & message, + const auto & senderHost, + auto senderPort) { if (!error) std::cout << "SUCCESS! Received message from: " << message.getVehicleId() << "\n"; + + running = false; }); sleep(1); + sender->send(PlatoonMessage::followerRequest(42), "127.0.0.1", 10000, 5s); + + while (running); } -void test::testPeriodicTimer() +void testPeriodicTimer() { Networking net; @@ -212,7 +223,7 @@ void test::testPeriodicTimer() while (running); } -void test::testServiceClientAsyncCallTimeout() +void testServiceClientAsyncCallTimeout() { using namespace protocol; Networking net1; @@ -222,7 +233,7 @@ void test::testServiceClientAsyncCallTimeout() server->advertiseService( [](const auto & clientEndpoint, - const auto & requestMessage) -> PlatoonMessage + auto & requestMessage) -> PlatoonMessage { sleep(3); return PlatoonMessage::acceptResponse(1, 42); @@ -247,7 +258,7 @@ void test::testServiceClientAsyncCallTimeout() while (running); } -void test::testDatagramSenderAsyncSend() +void testDatagramSenderAsyncSend() { using namespace protocol; Networking net; @@ -260,7 +271,7 @@ void test::testDatagramSenderAsyncSend() receiver->asyncReceive( 3s, [&running](const auto & error, - const auto & message, + auto & message, const std::string & senderHost, auto senderPort) { @@ -285,7 +296,7 @@ void test::testDatagramSenderAsyncSend() while (running); } -void test::testResolver() +void testResolver() { Networking net; @@ -323,3 +334,6 @@ void test::testResolver() std::cout << "SUCCESS!\n"; } +} +} + -- GitLab