From d6b7f0fc1afb121541d92979ae7d8968def279fa Mon Sep 17 00:00:00 2001 From: Hoop77 <p.badenhoop@gmx.de> Date: Fri, 15 Dec 2017 12:04:53 +0100 Subject: [PATCH] finally solved the boost asio issue --- Protocol/FollowerVehicle.cpp | 6 +- Protocol/LeaderVehicle.cpp | 2 +- Protocol/Main.cpp | 2 +- Protocol/NetworkMessage.h | 65 ++++--- Protocol/NetworkServiceClient.h | 9 +- Protocol/NetworkServiceServer.h | 67 ++++--- Protocol/Networking.h | 3 + Protocol/NetworkingTest.cpp | 50 +++--- Protocol/NetworkingUtils.h | 305 ++++++++++++++------------------ Protocol/Vehicle.cpp | 16 +- Protocol/Vehicle.h | 15 +- 11 files changed, 267 insertions(+), 273 deletions(-) diff --git a/Protocol/FollowerVehicle.cpp b/Protocol/FollowerVehicle.cpp index 0d56eb84..d588d3c5 100644 --- a/Protocol/FollowerVehicle.cpp +++ b/Protocol/FollowerVehicle.cpp @@ -17,7 +17,7 @@ void FollowerVehicle::doOnCreatePlatoon() log("follower[", std::to_string(vehicleId), "] starts creating platoon\n"); using namespace std::placeholders; - platoonCreateServiceServer.advertiseService( + platoonCreateServiceServer->advertiseService( "127.0.0.1", vehicleId + 10000, // TODO: endpoint information! std::bind(&FollowerVehicle::makePlatoonCreateResponse, this, _1, _2)); @@ -47,7 +47,7 @@ void FollowerVehicle::doCreatePlatoon() void FollowerVehicle::doOnRunningPlatoon() { log("follower[", std::to_string(vehicleId), "] starts running a platoon\n"); - platoonCreateServiceServer.stop(); + platoonCreateServiceServer->stop(); state = State::RUNNING_PLATOON; } @@ -61,7 +61,7 @@ void FollowerVehicle::doRunningPlatoon() void FollowerVehicle::doOnLeavingPlatoon() { - platoonCreateServiceServer.stop(); + platoonCreateServiceServer->stop(); state = State::LEAVING_PLATOON; } diff --git a/Protocol/LeaderVehicle.cpp b/Protocol/LeaderVehicle.cpp index ceeef858..28207163 100644 --- a/Protocol/LeaderVehicle.cpp +++ b/Protocol/LeaderVehicle.cpp @@ -16,7 +16,7 @@ void LeaderVehicle::doOnCreatePlatoon() platoonId = rand(); using namespace std::placeholders; - platoonCreateServiceServer.advertiseService( + platoonCreateServiceServer->advertiseService( "127.0.0.1", vehicleId + 10000, // TODO: endpoint information! std::bind(&LeaderVehicle::makePlatoonCreateResponse, this, _1, _2)); diff --git a/Protocol/Main.cpp b/Protocol/Main.cpp index 85cf8431..d8f7e5f1 100644 --- a/Protocol/Main.cpp +++ b/Protocol/Main.cpp @@ -4,7 +4,7 @@ #include "FollowerVehicle.h" #include "Networking.h" -//#define NETWORKING_TEST +#define NETWORKING_TEST #if defined(NETWORKING_TEST) diff --git a/Protocol/NetworkMessage.h b/Protocol/NetworkMessage.h index 7cadb0d7..b5bafe9e 100644 --- a/Protocol/NetworkMessage.h +++ b/Protocol/NetworkMessage.h @@ -26,12 +26,15 @@ void send(SyncWriteStream & socket, const Message & message) }; template<typename Message, typename SyncWriteStream> -void send(SyncWriteStream & socket, const Message & message, const boost::posix_time::time_duration & timeout) +void send(Networking & net, + SyncWriteStream & socket, + const Message & message, + const boost::posix_time::time_duration & timeout) { if (timeout <= boost::posix_time::seconds(0)) throw std::runtime_error("message::send() given timeout <= 0"); - networking::utils::write(socket, boost::asio::buffer(message.data()), timeout); + networking::utils::write(net, socket, boost::asio::buffer(message.data()), timeout); }; template<typename Message, typename SyncReadStream> @@ -43,16 +46,48 @@ Message receive(SyncReadStream & socket) }; template<typename Message, typename SyncReadStream> -Message receive(SyncReadStream & socket, const boost::posix_time::time_duration & timeout) +Message receive(Networking & net, + SyncReadStream & socket, + const boost::posix_time::time_duration & timeout) { if (timeout <= boost::posix_time::seconds(0)) throw std::runtime_error("message::receive() given timeout <= 0"); auto buf = Message::emptyBuffer(); - networking::utils::read(socket, boost::asio::buffer(buf), timeout); + networking::utils::read(net, socket, boost::asio::buffer(buf), timeout); return Message(buf); }; +template<typename Message, typename DatagramSocket, typename Endpoint> +void sendDatagram(DatagramSocket & socket, const Message & message, const Endpoint & endpoint) +{ + auto buf = Message::emptyBuffer(); + socket.send_to(buf, endpoint); +} + +template<typename Message> +class DatagramSender : private boost::noncopyable +{ +public: + using Socket = boost::asio::ip::udp::socket; + using Endpoint = boost::asio::ip::udp::endpoint; + + DatagramSender(Networking & net) + : net(net) + , socket(net.getIoService()) + {} + + void send(const Message & message, const std::string & host, uint16_t port) + { + Endpoint endpoint{boost::asio::ip::address::from_string(host), port}; + sendDatagram(socket, message, endpoint); + } + +private: + Networking & net; + Socket socket; +}; + template<typename Message, typename DatagramSocket, typename Endpoint, typename ReceiveHandler> void asyncDatagramReceive(DatagramSocket & socket, Endpoint & senderEndpoint, @@ -88,27 +123,13 @@ public: , socket(net.getIoService()) {} - AsyncDatagramReceiver(const AsyncDatagramReceiver & other) = delete; + AsyncDatagramReceiver(const AsyncDatagramReceiver &) = delete; - AsyncDatagramReceiver & operator=(const AsyncDatagramReceiver & other) = delete; + AsyncDatagramReceiver & operator=(const AsyncDatagramReceiver &) = delete; - AsyncDatagramReceiver(AsyncDatagramReceiver && other) - : net(other.net) - , senderEndpoint(std::move(other.senderEndpoint)) - , socket(std::move(other.socket)) - {} + AsyncDatagramReceiver(AsyncDatagramReceiver &&) = delete; - AsyncDatagramReceiver & operator=(AsyncDatagramReceiver && other) - { - net = other.net; - senderEndpoint = other.senderEndpoint; - socket = other.socket; - } - - ~AsyncDatagramReceiver() - { - networking::utils::closeSafely(socket); - } + AsyncDatagramReceiver & operator=(AsyncDatagramReceiver &&) = delete; void receive(const std::string & host, std::uint16_t port, diff --git a/Protocol/NetworkServiceClient.h b/Protocol/NetworkServiceClient.h index 8540ed9f..4a4b71d0 100644 --- a/Protocol/NetworkServiceClient.h +++ b/Protocol/NetworkServiceClient.h @@ -36,7 +36,6 @@ public: const RequestMessage & request) { Socket socket(net.getIoService()); - networking::utils::SafeSocketCloser<Socket> closer(socket); boost::asio::connect(socket, serverEndpointIterator); message::send(socket, request); return message::receive<ResponseMessage>(socket); @@ -56,22 +55,22 @@ public: boost::posix_time::time_duration timeout) { Socket socket(net.getIoService()); - networking::utils::SafeSocketCloser<Socket> closer(socket); // We have three places to lose time: connecting, sending and receiving. // So after each operation we subtract the time spend from our timeout. auto startTime = boost::posix_time::microsec_clock::local_time(); // Connect to server. - networking::utils::connect(socket, serverEndpointIterator, timeout); + networking::utils::connect(net, socket, serverEndpointIterator, timeout); auto nowTime = boost::posix_time::microsec_clock::local_time(); auto timeSpend = nowTime - startTime; + startTime = nowTime; timeout -= timeSpend; // Send the request. - message::send(socket, request, timeout); + message::send(net, socket, request, timeout); nowTime = boost::posix_time::microsec_clock::local_time(); timeSpend = nowTime - startTime; timeout -= timeSpend; // Receive the response. - return message::receive<ResponseMessage>(socket, timeout); + return message::receive<ResponseMessage>(net, socket, timeout); } ResponseMessage call(const std::string & host, diff --git a/Protocol/NetworkServiceServer.h b/Protocol/NetworkServiceServer.h index 001b6677..9f114d25 100644 --- a/Protocol/NetworkServiceServer.h +++ b/Protocol/NetworkServiceServer.h @@ -17,12 +17,19 @@ namespace service { template<typename Service> -class Server +class Server : public std::enable_shared_from_this<Server<Service>> { +private: + struct PrivateTag + { + }; + public: using RequestMessage = typename Service::RequestMessage; using ResponseMessage = typename Service::ResponseMessage; + using Ptr = std::shared_ptr<Server<Service>>; + using Endpoint = typename boost::asio::ip::tcp::endpoint; using Socket = typename boost::asio::ip::tcp::socket; using Acceptor = typename boost::asio::ip::tcp::acceptor; @@ -32,37 +39,30 @@ public: using OnNetworkErrorCallback = std::function<void(const boost::system::system_error & error)>; - Server(Networking & net) - : net(net) - , acceptor(net.getIoService()) - {} - - Server(const Server & other) = delete; - - Server & operator=(const Server & other) = delete; + static Ptr create(Networking & net) + { + return std::make_shared<Server<Service>>(PrivateTag{}, net); + } - Server(Server && other) + // Should not be used outside. + Server(PrivateTag, Networking & net) : net(net) - , onRequestReceivedCallback(std::move(other.onRequestReceivedCallback)) - , onNetworkErrorCallback(std::move(other.onNetworkErrorCallback)) - , acceptor(std::move(other.acceptor)) - , running(other.running) + , acceptor(net.getIoService()) {} - Server & operator=(Server && other) - { - net = other.net; - onRequestReceivedCallback = other.onRequestReceivedCallback; - onNetworkErrorCallback = other.onNetworkErrorCallback; - acceptor = other.acceptor; - running = other.running; - } - ~Server() { stop(); } + Server(const Server &) = delete; + + Server & operator=(const Server &) = delete; + + Server(Server &&) = delete; + + Server & operator=(Server &&) = delete; + void advertiseService(const Endpoint & bindEndpoint, OnRequestReceivedCallback onRequestReceivedCallback, OnNetworkErrorCallback onNetworkErrorCallback = defaultOnNetworkErrorCallback) @@ -100,37 +100,36 @@ private: OnRequestReceivedCallback onRequestReceivedCallback; OnNetworkErrorCallback onNetworkErrorCallback; Acceptor acceptor; - bool running{false}; + std::atomic<bool> running{false}; void accept() { + auto self = this->shared_from_this(); auto socket = std::make_shared<Socket>(net.getIoService()); - acceptor.async_accept(*socket, [this, socket](boost::system::error_code acceptError) + acceptor.async_accept(*socket, [self, socket](boost::system::error_code acceptError) { - networking::utils::SafeSocketCloser<Socket> closer(*socket); - - if (!running) + if (!self->running) return; if (acceptError) { - onNetworkErrorCallback(acceptError); + self->onNetworkErrorCallback(acceptError); return; } try { - auto request = message::receive<RequestMessage>(*socket, boost::posix_time::seconds(3)); - auto response = onRequestReceivedCallback(socket->remote_endpoint(), request); - message::send(*socket, response, boost::posix_time::seconds(3)); + auto request = message::receive<RequestMessage>(self->net, *socket, boost::posix_time::seconds(3)); + auto response = self->onRequestReceivedCallback(socket->remote_endpoint(), request); + message::send(self->net, *socket, response, boost::posix_time::seconds(3)); } catch (const boost::system::system_error & error) { - onNetworkErrorCallback(error); + self->onNetworkErrorCallback(error); } // The next accept event will be put on the event queue. - accept(); + self->accept(); }); } diff --git a/Protocol/Networking.h b/Protocol/Networking.h index c2c6b9d8..900c9f9f 100644 --- a/Protocol/Networking.h +++ b/Protocol/Networking.h @@ -40,6 +40,9 @@ public: boost::asio::io_service & getIoService() { return ioService; } + std::thread::id getNetworkingThreadId() const + { return thread.get_id(); } + private: boost::asio::io_service ioService; std::unique_ptr<boost::asio::io_service::work> work; diff --git a/Protocol/NetworkingTest.cpp b/Protocol/NetworkingTest.cpp index 67ed307d..946cd51c 100644 --- a/Protocol/NetworkingTest.cpp +++ b/Protocol/NetworkingTest.cpp @@ -15,10 +15,11 @@ using boost::asio::ip::tcp; void networking::test::testServices() { using namespace protocol; - Networking net; + Networking net1; + Networking net2; - service::Server<PlatoonCreateNetworkService> server{net}; - server.advertiseService( + auto server = service::Server<PlatoonCreateNetworkService>::create(net1); + server->advertiseService( "127.0.0.1", 10001, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage @@ -28,9 +29,9 @@ void networking::test::testServices() return PlatoonCreateNetworkMessage::acceptResponse(1, 42); }); - sleep(3); + sleep(1); - service::Client<PlatoonCreateNetworkService> client{net}; + service::Client<PlatoonCreateNetworkService> client{net2}; for (int i = 0; i < 5; i++) { auto response = client.call("127.0.0.1", @@ -46,31 +47,33 @@ void networking::test::testServices() void ::networking::test::testTcpClientTimeout() { using namespace protocol; - Networking net; + Networking net1; + Networking net2; const auto timeout = boost::posix_time::seconds(3); - service::Server<PlatoonCreateNetworkService> server{net}; - server.advertiseService( + auto server = service::Server<PlatoonCreateNetworkService>::create(net1); + server->advertiseService( "127.0.0.1", 10001, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage { - // Just sleep for 3 seconds. + // Just sleep for 5 seconds. sleep(5); return PlatoonCreateNetworkMessage::acceptResponse(1, 42); }); sleep(1); - service::Client<PlatoonCreateNetworkService> client{net}; + service::Client<PlatoonCreateNetworkService> client{net2}; auto startTime = boost::posix_time::microsec_clock::local_time(); try { - client.call("127.0.0.1", + auto response = client.call("127.0.0.1", 10001, PlatoonCreateNetworkMessage::followerRequest(2), timeout); + std::cout << "Response: " << response.getPlatoonId() << "\n"; std::cout << "FAILED!"; } catch (const std::exception & e) @@ -85,12 +88,14 @@ void ::networking::test::testTcpClientTimeout() void ::networking::test::testMultipleConnections() { using namespace protocol; - Networking net; + Networking net1; + Networking net2; + Networking net3; - service::Server<PlatoonCreateNetworkService> server1{net}; - service::Server<PlatoonCreateNetworkService> server2{net}; + auto server1 = service::Server<PlatoonCreateNetworkService>::create(net1); + auto server2 = service::Server<PlatoonCreateNetworkService>::create(net2); - server1.advertiseService( + server1->advertiseService( "127.0.0.1", 10001, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage @@ -98,7 +103,7 @@ void ::networking::test::testMultipleConnections() return PlatoonCreateNetworkMessage::acceptResponse(1, 42); }); - server2.advertiseService( + server2->advertiseService( "127.0.0.1", 10002, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage @@ -108,7 +113,7 @@ void ::networking::test::testMultipleConnections() sleep(1); - service::Client<PlatoonCreateNetworkService> client{net}; + service::Client<PlatoonCreateNetworkService> client{net3}; auto response = client.call("127.0.0.1", 10001, @@ -126,12 +131,13 @@ void ::networking::test::testMultipleConnections() void ::networking::test::testTcpServerChangePort() { using namespace protocol; - Networking net; + Networking net1; + Networking net2; - service::Client<PlatoonCreateNetworkService> client{net}; + service::Client<PlatoonCreateNetworkService> client{net1}; - service::Server<PlatoonCreateNetworkService> server{net}; - server.advertiseService( + auto server = service::Server<PlatoonCreateNetworkService>::create(net2); + server->advertiseService( "127.0.0.1", 10001, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage @@ -139,7 +145,7 @@ void ::networking::test::testTcpServerChangePort() return PlatoonCreateNetworkMessage::acceptResponse(1, 42); }); - server.advertiseService( + server->advertiseService( "127.0.0.1", 10002, [](const tcp::endpoint & clientEndpoint, const PlatoonCreateNetworkMessage & requestMessage) -> PlatoonCreateNetworkMessage diff --git a/Protocol/NetworkingUtils.h b/Protocol/NetworkingUtils.h index bef554a5..68b142e4 100644 --- a/Protocol/NetworkingUtils.h +++ b/Protocol/NetworkingUtils.h @@ -7,194 +7,159 @@ #include <boost/asio/ip/tcp.hpp> #include <boost/asio.hpp> +#include "Networking.h" namespace networking { namespace utils { -/** - * Connect a socket with a timeout. - * @tparam Socket - * @tparam EndpointIterator - * @param socket - * @param endpointIterator - * @param timeout - */ -template<typename Socket, typename EndpointIterator> -void connect(Socket & socket, - const EndpointIterator & endpointIterator, - const boost::posix_time::time_duration & timeout) +// Little wrapper for adding an enabled flag to the deadline_timer. +class DeadlineTimer : public std::enable_shared_from_this<DeadlineTimer> { - auto & ioService = socket.get_io_service(); +private: + struct PrivateTag + { + }; - boost::asio::deadline_timer deadline(ioService); - deadline.expires_from_now(timeout); - // This flags makes sure that after the async_connect handler executes, the deadline won't close the socket by accident. - bool deadlineEnabled = true; - // This tells the io_service to call the deadline handler when it either expires or was canceled. - deadline.async_wait( - [&socket, &deadlineEnabled](const boost::system::error_code & error) - { - // Check if our job was canceled. - if (error || !deadlineEnabled) - return; +public: + using Ptr = std::shared_ptr<DeadlineTimer>; + using TimeoutCallback = std::function<void()>; + + DeadlineTimer(PrivateTag, boost::asio::io_service & ioService) + : timer(ioService) + {} + + static Ptr create(boost::asio::io_service & ioService) + { + return std::make_shared<DeadlineTimer>(PrivateTag{}, ioService); + } + + DeadlineTimer(const DeadlineTimer &) = delete; + + DeadlineTimer & operator=(const DeadlineTimer &) = delete; + + DeadlineTimer(DeadlineTimer &&) = delete; - // Timeout expired: close the socket. - boost::system::error_code ignoredError; - socket.close(ignoredError); - }); + DeadlineTimer & operator=(DeadlineTimer &&) = delete; - // A 'would_block' connectError is guaranteed to never occur on an asynchronous operation. - boost::system::error_code connectError = boost::asio::error::would_block; + void asyncWait(const boost::posix_time::time_duration & timeout, TimeoutCallback callback) + { + auto self = shared_from_this(); + timer.expires_from_now(timeout); + timer.async_wait([self, callback](const boost::system::error_code & error) + { + if (error || !self->enabled) + return; + + callback(); + }); + } + + void disable() + { + enabled = false; + timer.cancel(); + } + +private: + boost::asio::deadline_timer timer; + std::atomic<bool> enabled{true}; +}; + +template<typename AsyncOperation, typename Socket, typename... Args> +void timedSocketOperation(Networking & net, + AsyncOperation asyncOperation, + Socket & socket, + const boost::posix_time::time_duration & timeout, + Args && ... args) +{ + auto & ioService = net.getIoService(); + + auto deadline = DeadlineTimer::create(ioService); + deadline->asyncWait(timeout, + [&socket]() + { + // Timeout expired: close the socket. + boost::system::error_code ignoredError; + socket.close(ignoredError); + }); + + // A 'would_block' socketError is guaranteed to never occur on an asynchronous operation. + boost::system::error_code socketError = boost::asio::error::would_block; // Run asynchronous connect. - boost::asio::async_connect(socket, endpointIterator, - [&connectError, &deadline, &deadlineEnabled](const boost::system::error_code & error, - EndpointIterator iter) - { - deadlineEnabled = false; - deadline.cancel(); - // update connectError variable - connectError = error; - }); - - ioService.reset(); - // We wait run the io_service until "something happens" with the socket. - do ioService.run_one(); while (connectError == boost::asio::error::would_block); + asyncOperation(socket, std::forward<Args>(args)..., + [&socketError, deadline](const boost::system::error_code & error, auto ...) + { + deadline->disable(); + // update socketError variable + socketError = error; + }); + + // This one is quite tricky: + // We want to wait until "something happens" with the socket, meaning socketError is assigned to some other + // error code. So during our waiting, we have to run ioService. But there are two cases to consider: + // We were called from an ioService handler and therefor from the ioService-thread: + // In this case we must invoke ioService.run_one() to ensure that further handlers can be invoked. + // Else we were not called from the ioService-thread: + // Since we must not call ioService.run_one() from a different thread (since we assume ioService.run() permanently + // runs already on the ioService-thread, we just wait until the error changed "magically". + if (std::this_thread::get_id() == net.getNetworkingThreadId()) + { + while (socketError == boost::asio::error::would_block && !ioService.stopped()) + { + ioService.run_one(); + } + } + else + { + while (socketError == boost::asio::error::would_block && !ioService.stopped()); + } // Determine whether a connection was successfully established. // Even though our deadline handler might have run to close the socket, the connect operation // might have notionally succeeded! - if (connectError || !socket.is_open()) + if (socketError || !socket.is_open()) { throw boost::system::system_error( - connectError ? connectError : boost::asio::error::operation_aborted); + socketError ? socketError : boost::asio::error::operation_aborted); } } -/** - * Writes data to a socket with timeout. - * @tparam SyncWriteStream - * @tparam MutableBufferSequence - * @param stream - * @param buffer - * @param timeout - */ +template<typename Socket, typename EndpointIterator> +void connect(Networking & net, + Socket & socket, + const EndpointIterator & endpointIterator, + const boost::posix_time::time_duration & timeout) +{ + // This lambda is used because we cannot pass boost::asio::async_connect directly as a function + // since it is a template. + auto asyncOperation = [](auto && ... args) + { boost::asio::async_connect(std::forward<decltype(args)>(args)...); }; + timedSocketOperation(net, asyncOperation, socket, timeout, endpointIterator); +} + template<typename SyncWriteStream, typename ConstBufferSequence> -void write(SyncWriteStream & stream, +void write(Networking & net, + SyncWriteStream & stream, const ConstBufferSequence & buffer, const boost::posix_time::time_duration & timeout) { - auto & ioService = stream.get_io_service(); - - boost::asio::deadline_timer deadline(ioService); - deadline.expires_from_now(timeout); - bool deadlineEnabled = true; - - deadline.async_wait( - [&stream, &deadlineEnabled](const boost::system::error_code & error) - { - if (error || !deadlineEnabled) - return; - - boost::system::error_code ignoredError; - stream.cancel(ignoredError); - }); - - boost::system::error_code writeError = boost::asio::error::would_block; - boost::asio::async_write(stream, buffer, - [&writeError, &deadline, &deadlineEnabled](const boost::system::error_code & error, - size_t bytesWritten) - { - deadlineEnabled = false; - deadline.cancel(); - writeError = error; - }); - - ioService.reset(); - do ioService.run_one(); while (writeError == boost::asio::error::would_block); - - if (writeError) - throw boost::system::system_error(writeError); + auto asyncOperation = [](auto && ... args) + { boost::asio::async_write(std::forward<decltype(args)>(args)...); }; + timedSocketOperation(net, asyncOperation, stream, timeout, buffer); }; -/** - * Reads data from socket with timeout. - * @tparam SyncReadStream - * @tparam MutableBufferSequence - * @param stream - * @param buffer - * @param timeout - */ template<typename SyncReadStream, typename MutableBufferSequence> -void read(SyncReadStream & stream, +void read(Networking & net, + SyncReadStream & stream, const MutableBufferSequence & buffer, const boost::posix_time::time_duration & timeout) { - auto & ioService = stream.get_io_service(); - - boost::asio::deadline_timer deadline(ioService); - deadline.expires_from_now(timeout); - bool deadlineEnabled = true; - - deadline.async_wait( - [&stream, &deadlineEnabled](const boost::system::error_code & error) - { - if (error || !deadlineEnabled) - return; - - boost::system::error_code ignoredError; - stream.cancel(ignoredError); - }); - - boost::system::error_code readError = boost::asio::error::would_block; - boost::asio::async_read(stream, buffer, - [&readError, &deadline, &deadlineEnabled](const boost::system::error_code & error, size_t) - { - deadlineEnabled = false; - deadline.cancel(); - readError = error; - }); - - ioService.reset(); - do ioService.run_one(); while (readError == boost::asio::error::would_block); - - if (readError) - throw boost::system::system_error(readError); -}; - -template<typename Socket> -void closeSafely(Socket & socket) -{ - if (!socket.is_open()) - return; - - boost::system::error_code ignoredError; - socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignoredError); - socket.close(ignoredError); -} - -template<typename Socket> -class SafeSocketCloser -{ -public: - SafeSocketCloser(Socket & socket) : socket(socket) - {} - - ~SafeSocketCloser() - { closeSafely(socket); } - - SafeSocketCloser(const SafeSocketCloser & other) = delete; - - SafeSocketCloser(SafeSocketCloser && other) = delete; - - SafeSocketCloser & operator=(const SafeSocketCloser & other) = delete; - - SafeSocketCloser & operator=(SafeSocketCloser && other) = delete; - -private: - Socket & socket; + auto asyncOperation = [](auto && ... args) + { boost::asio::async_read(std::forward<decltype(args)>(args)...); }; + timedSocketOperation(net, asyncOperation, stream, timeout, buffer); }; template< @@ -208,28 +173,24 @@ void asyncReceiveFrom(DatagramSocket & socket, ReceiveHandler receiveHandler, const boost::posix_time::time_duration & timeout) { + // TODO: FIX THIS!!! auto & ioService = socket.get_io_service(); - auto deadline = std::make_shared<boost::asio::deadline_timer>(ioService); - auto deadlineEnabled = std::make_shared<bool>(true); - deadline->expires_from_now(timeout); - - deadline->async_wait( - [socket, deadlineEnabled](const boost::system::error_code & error) - { - if (error || !*deadlineEnabled.get()) - return; - boost::system::error_code ignoredError; - socket.cancel(ignoredError); - }); + auto deadline = DeadlineTimer::create(ioService); + deadline->asyncWait(timeout, + [&socket]() + { + // Timeout expired: close the socket. + boost::system::error_code ignoredError; + socket.close(ignoredError); + }); socket.async_receive_from(buffer, endpoint, - [deadline, deadlineEnabled, receiveHandler]( + [deadline, receiveHandler]( const boost::system::error_code & receiveError, std::size_t bytesTransferred) { - *deadlineEnabled.get() = false; - deadline->cancel(); + deadline->disable(); receiveHandler(receiveError, bytesTransferred); } ); diff --git a/Protocol/Vehicle.cpp b/Protocol/Vehicle.cpp index 102b2e44..fca841b7 100644 --- a/Protocol/Vehicle.cpp +++ b/Protocol/Vehicle.cpp @@ -9,6 +9,14 @@ using namespace protocol; +Vehicle::Vehicle(networking::Networking & net, VehicleId vehicleId, Vehicle::Role role) + : net(net) + , vehicleId(vehicleId) + , role(role) + , platoonCreateServiceServer(PlatoonCreateServiceServer::create(net)) + , platoonCreateServiceClient(net) +{} + bool Vehicle::run() { switch (state.load()) @@ -81,8 +89,8 @@ void Vehicle::scanAvailableVehicles() // remove me out of the list (I cannot connect to myself) availableVehicleEndpoints.erase(std::remove_if(availableVehicleEndpoints.begin(), availableVehicleEndpoints.end(), - [this](VehicleEndpoint & endpoint) -> bool - { - return endpoint.getVehicleId() == vehicleId; - }), availableVehicleEndpoints.end()); + [this](VehicleEndpoint & endpoint) -> bool + { + return endpoint.getVehicleId() == vehicleId; + }), availableVehicleEndpoints.end()); } diff --git a/Protocol/Vehicle.h b/Protocol/Vehicle.h index 620e4f6a..84e49f97 100644 --- a/Protocol/Vehicle.h +++ b/Protocol/Vehicle.h @@ -19,6 +19,9 @@ namespace protocol class Vehicle { public: + using PlatoonCreateServiceServer = networking::service::Server<PlatoonCreateNetworkService>; + using PlatoonCreateServiceClient = networking::service::Client<PlatoonCreateNetworkService>; + enum class Role { LEADER, FOLLOWER @@ -44,13 +47,7 @@ protected: Vehicle( networking::Networking & net, VehicleId vehicleId, - Role role) - : net(net) - , vehicleId(vehicleId) - , role(role) - , platoonCreateServiceServer(net) - , platoonCreateServiceClient(net) - {} + Role role); enum class State { @@ -69,8 +66,8 @@ protected: std::mutex platoonConfigMutex; std::atomic<State> state{State::IDLE}; networking::Networking & net; - networking::service::Server<PlatoonCreateNetworkService> platoonCreateServiceServer; - networking::service::Client<PlatoonCreateNetworkService> platoonCreateServiceClient; + PlatoonCreateServiceServer::Ptr platoonCreateServiceServer; + PlatoonCreateServiceClient platoonCreateServiceClient; std::vector<VehicleEndpoint> availableVehicleEndpoints; virtual void doOnCreatePlatoon() = 0; -- GitLab