From 73ea0e65d1916145a2dd8e4120333c8b67ed7014 Mon Sep 17 00:00:00 2001 From: Hoop77 <p.badenhoop@gmx.de> Date: Sat, 20 Jan 2018 01:13:55 +0100 Subject: [PATCH] implemented json messages and ability to add VehicleEndpoints by the user interface --- NetworkingLib/include/DatagramReceiver.h | 4 +- NetworkingLib/include/DatagramSender.h | 4 +- NetworkingLib/include/Message.h | 54 ++++---- NetworkingLib/include/ServiceClient.h | 19 +-- NetworkingLib/include/ServiceServer.h | 9 +- NetworkingLib/include/Socket.h | 14 +- NetworkingLib/test/Main.cpp | 2 + NetworkingLib/test/PlatoonMessage.h | 10 +- NetworkingLib/test/Test.cpp | 121 +++++++++------- NetworkingLib/test/Test.h | 35 +++++ Protocol/CMakeLists.txt | 2 +- Protocol/include/FollowerVehicle.h | 10 +- Protocol/include/LeaderVehicle.h | 9 +- Protocol/include/PlatoonMessage.h | 169 ++++++++--------------- Protocol/include/Protocol.h | 2 +- Protocol/include/Vehicle.h | 25 ++-- Protocol/include/VehicleEndpoint.h | 21 ++- Protocol/src/FollowerVehicle.cpp | 62 ++++----- Protocol/src/LeaderVehicle.cpp | 59 ++++---- Protocol/src/Vehicle.cpp | 79 ++++++----- Protocol/src/VehicleEndpoint.cpp | 14 ++ Protocol/test/ProtocolTest.cpp | 22 ++- 22 files changed, 400 insertions(+), 346 deletions(-) create mode 100644 Protocol/src/VehicleEndpoint.cpp diff --git a/NetworkingLib/include/DatagramReceiver.h b/NetworkingLib/include/DatagramReceiver.h index afec3cdf..e07a36f5 100644 --- a/NetworkingLib/include/DatagramReceiver.h +++ b/NetworkingLib/include/DatagramReceiver.h @@ -46,11 +46,11 @@ public: , buffer(maxMessageSize + Frame::HEADER_SIZE) {} - Message receive(std::string & host, std::uint16_t & port, const time::Duration & timeout) + void receive(Message & message, std::string & host, std::uint16_t & port, const time::Duration & timeout) { BusyLock busyLock{*this}; newSocket(); - return message::receiveDatagram<Message>(net, socket, buffer, host, port, timeout); + message::receiveDatagram<Message>(net, socket, buffer, message, host, port, timeout); } void asyncReceive(const time::Duration & timeout, const ReceiveHandler & handler) diff --git a/NetworkingLib/include/DatagramSender.h b/NetworkingLib/include/DatagramSender.h index c4c7c4c5..80fe3448 100644 --- a/NetworkingLib/include/DatagramSender.h +++ b/NetworkingLib/include/DatagramSender.h @@ -47,7 +47,7 @@ public: { BusyLock busyLock{*this}; openSocket(); - networking::message::sendDatagramTo(net, socket, message, ip, port, timeout); + networking::message::sendDatagram(net, socket, message, ip, port, timeout); } void asyncSend(const Message & message, @@ -60,7 +60,7 @@ public: auto state = std::make_shared<AsyncState>(self, handler); openSocket(); - networking::message::asyncSendDatagramTo( + networking::message::asyncSendDatagram( net, socket, message, ip, port, timeout, [state](const auto & error) { diff --git a/NetworkingLib/include/Message.h b/NetworkingLib/include/Message.h index e584759f..c6786056 100644 --- a/NetworkingLib/include/Message.h +++ b/NetworkingLib/include/Message.h @@ -40,8 +40,8 @@ struct Encoder; template<> struct Encoder<std::string> { - std::string operator()(const std::string & message) const - { return message; } + void operator()(const std::string & message, std::string & data) const + { data = message; } }; template<typename Message> @@ -50,8 +50,8 @@ struct Decoder; template<> struct Decoder<std::string> { - std::string operator()(const std::string & data) const - { return data; } + void operator()(std::string & message, const std::string & data) const + { message = data; } }; namespace internal @@ -62,7 +62,7 @@ bool encode(const Message & message, std::string & data) { try { - data = Encoder<Message>{}(message); + Encoder<Message>{}(message, data); return true; } catch (...) @@ -76,7 +76,7 @@ bool decode(const std::string & data, Message & message) { try { - message = Decoder<Message>{}(data); + Decoder<Message>{}(message, data); return true; } catch (...) @@ -93,7 +93,7 @@ void send(Networking & net, const Message & message, const time::Duration & timeout) { - std::string data{}; + std::string data; if (!internal::encode(message, data)) throw error::Encoding{}; networking::stream::write(net, socket, data, timeout); @@ -122,16 +122,15 @@ void asyncSend(Networking & net, }; template<typename Message> -Message receive(Networking & net, +void receive(Networking & net, boost::asio::ip::tcp::socket & socket, boost::asio::streambuf & buffer, + Message & message, const time::Duration & timeout) { auto data = networking::stream::read(net, socket, buffer, timeout); - Message message{}; if (!internal::decode(data, message)) throw error::Decoding{}; - return message; }; template<typename Message> @@ -145,7 +144,7 @@ void asyncReceive(Networking & net, net, socket, buffer, timeout, [handler](const auto & errorCode, auto & data) { - Message message{}; + Message message; if (!internal::decode(data, message)) { handler(error::codes::DECODING, message); @@ -156,12 +155,12 @@ void asyncReceive(Networking & net, }; template<typename Message> -void sendDatagramTo(Networking & net, - boost::asio::ip::udp::socket & socket, - const Message & message, - const std::string & host, - std::uint16_t port, - const time::Duration & timeout) +void sendDatagram(Networking & net, + boost::asio::ip::udp::socket & socket, + const Message & message, + const std::string & host, + std::uint16_t port, + const time::Duration & timeout) { std::string data{}; if (!internal::encode(message, data)) @@ -170,13 +169,13 @@ void sendDatagramTo(Networking & net, } template<typename Message> -void asyncSendDatagramTo(Networking & net, - boost::asio::ip::udp::socket & socket, - const Message & message, - const std::string & host, - std::uint16_t port, - const time::Duration & timeout, - const SendToHandler & handler) +void asyncSendDatagram(Networking & net, + boost::asio::ip::udp::socket & socket, + const Message & message, + const std::string & host, + std::uint16_t port, + const time::Duration & timeout, + const SendToHandler & handler) { auto data = std::make_shared<std::string>(); if (!internal::encode(message, *data)) @@ -194,18 +193,17 @@ void asyncSendDatagramTo(Networking & net, } template<typename Message> -Message receiveDatagram(Networking & net, +void receiveDatagram(Networking & net, boost::asio::ip::udp::socket & socket, std::vector<char> & buffer, + Message & message, std::string & host, std::uint16_t & port, const time::Duration & timeout) { auto data = networking::socket::receiveFrom(net, socket, buffer, host, port, timeout); - Message message{}; if (!internal::decode(data, message)) throw error::Decoding{}; - return message; } template<typename Message> @@ -219,7 +217,7 @@ void asyncReceiveDatagram(Networking & net, net, socket, buffer, timeout, [handler](auto error, auto & data, const auto & senderHost, auto senderPort) { - Message message{}; + Message message; if (!internal::decode(data, message)) { handler(error::codes::DECODING, message, senderHost, senderPort); diff --git a/NetworkingLib/include/ServiceClient.h b/NetworkingLib/include/ServiceClient.h index a7d6a712..d87bb16d 100644 --- a/NetworkingLib/include/ServiceClient.h +++ b/NetworkingLib/include/ServiceClient.h @@ -50,10 +50,11 @@ public: , maxMessageSize(maxMessageSize) {} - ResponseMessage call(const std::string & host, - std::uint16_t port, - const RequestMessage & request, - time::Duration timeout) + void call(const RequestMessage & request, + ResponseMessage & response, + const std::string & host, + std::uint16_t port, + time::Duration timeout) { BusyLock busyLock{*this}; // Close the socket on leaving. @@ -71,12 +72,12 @@ public: updateTimeout(timeout, startTime); // Receive the response. boost::asio::streambuf buffer{maxMessageSize + internal::Frame::HEADER_SIZE}; - return message::receive<ResponseMessage>(net, socket, buffer, timeout); + message::receive<ResponseMessage>(net, socket, buffer, response, timeout); } - void asyncCall(const std::string & host, + void asyncCall(const RequestMessage & request, + const std::string & host, std::uint16_t port, - const RequestMessage & request, const time::Duration & timeout, const CallHandler & handler) { @@ -95,7 +96,7 @@ public: { if (error) { - ResponseMessage noResponse{}; + ResponseMessage noResponse; state->handler(error, noResponse); return; } @@ -109,7 +110,7 @@ public: { if (error) { - ResponseMessage noResponse{}; + ResponseMessage noResponse; state->handler(error, noResponse); return; } diff --git a/NetworkingLib/include/ServiceServer.h b/NetworkingLib/include/ServiceServer.h index 764357c5..ddc138eb 100644 --- a/NetworkingLib/include/ServiceServer.h +++ b/NetworkingLib/include/ServiceServer.h @@ -29,9 +29,9 @@ public: using Endpoint = Resolver::Endpoint; - using RequestReceivedHandler = - std::function<ResponseMessage(const Endpoint & clientEndpoint, - RequestMessage & requestMessage)>; + using RequestReceivedHandler = std::function<void(const Endpoint & clientEndpoint, + RequestMessage & requestMessage, + ResponseMessage & response)>; static Ptr create(Networking & net, uint16_t bindingPort, std::size_t maxMessageSize = 512) { @@ -106,7 +106,8 @@ private: Endpoint clientEndpoint{state->socket.remote_endpoint().address().to_string(), state->socket.remote_endpoint().port()}; - auto response = state->requestReceivedHandler(clientEndpoint, request); + ResponseMessage response; + state->requestReceivedHandler(clientEndpoint, request, response); networking::message::asyncSend( state->self->net, state->socket, response, 5s, diff --git a/NetworkingLib/include/Socket.h b/NetworkingLib/include/Socket.h index eb7e172e..c61e0182 100644 --- a/NetworkingLib/include/Socket.h +++ b/NetworkingLib/include/Socket.h @@ -128,17 +128,17 @@ void sendTo(Networking & net, udp::endpoint endpoint{address::from_string(host), port}; using namespace networking::internal; - Frame frame{(const std::uint8_t *) sendData.c_str(), sendData.size()}; + Frame buffer{(const std::uint8_t *) sendData.c_str(), sendData.size()}; auto asyncOperation = [&socket](auto && ... args) { socket.async_send_to(std::forward<decltype(args)>(args)...); }; std::tuple<boost::system::error_code, std::size_t> result; closeable::timedOperation( - result, net, asyncOperation, socket, timeout, frame.getBuffers(), endpoint); + result, net, asyncOperation, socket, timeout, buffer.getBuffers(), endpoint); auto numBytesTransferred = std::get<1>(result); - if (numBytesTransferred < frame.getSize()) + if (numBytesTransferred < buffer.getSize()) throw error::FailedOperation{}; }; @@ -155,18 +155,18 @@ void asyncSendTo(Networking & net, udp::endpoint endpoint{address::from_string(host), port}; using namespace networking::internal; - auto frame = std::make_shared<Frame>((const std::uint8_t *) sendData.c_str(), sendData.size()); + auto buffer = std::make_shared<Frame>((const std::uint8_t *) sendData.c_str(), sendData.size()); auto asyncOperation = [&socket](auto && ... args) { socket.async_send_to(std::forward<decltype(args)>(args)...); }; closeable::timedAsyncOperation( net, asyncOperation, socket, timeout, - [handler, frame](const auto & networkingError, + [handler, buffer](const auto & networkingError, const auto & boostError, auto numBytesTransferred) { - if (numBytesTransferred < frame->getSize()) + if (numBytesTransferred < buffer->getSize()) { handler(error::codes::FAILED_OPERATION); return; @@ -174,7 +174,7 @@ void asyncSendTo(Networking & net, handler(networkingError); }, - frame->getBuffers(), endpoint); + buffer->getBuffers(), endpoint); }; template<typename DatagramSocket> diff --git a/NetworkingLib/test/Main.cpp b/NetworkingLib/test/Main.cpp index 9b58d1a3..c271a85f 100644 --- a/NetworkingLib/test/Main.cpp +++ b/NetworkingLib/test/Main.cpp @@ -38,5 +38,7 @@ int main(int argc, char ** argv) networking::test::testServiceClientMaxMessageSize(); std::cout << "\ntestDatagramReceiverMaxMessageSize\n\n"; networking::test::testDatagramReceiverMaxMessageSize(); + std::cout << "\ntestNonCopyableMessage\n\n"; + networking::test::testNonCopyableMessage(); return 0; } \ No newline at end of file diff --git a/NetworkingLib/test/PlatoonMessage.h b/NetworkingLib/test/PlatoonMessage.h index e052df37..051984bb 100644 --- a/NetworkingLib/test/PlatoonMessage.h +++ b/NetworkingLib/test/PlatoonMessage.h @@ -73,11 +73,11 @@ namespace message template<> struct Encoder<protocol::PlatoonMessage> { - std::string operator()(const protocol::PlatoonMessage & message) const + void operator()(const protocol::PlatoonMessage & message, std::string & data) const { using namespace protocol; - std::string data(9, '\0'); + data = std::string(9, '\0'); auto vehicleId = message.getVehicleId(); auto messageType = message.getMessageType(); @@ -92,15 +92,13 @@ struct Encoder<protocol::PlatoonMessage> data[6] = (std::uint8_t) ((platoonId & 0x0000ff00) >> 8); data[7] = (std::uint8_t) ((platoonId & 0x00ff0000) >> 16); data[8] = (std::uint8_t) ((platoonId & 0xff000000) >> 24); - - return data; } }; template<> struct Decoder<protocol::PlatoonMessage> { - protocol::PlatoonMessage operator()(const std::string & data) const + void operator()(protocol::PlatoonMessage & message, const std::string & data) const { using namespace protocol; @@ -120,7 +118,7 @@ struct Decoder<protocol::PlatoonMessage> platoonId += ((PlatoonId) data[7]) << 16; platoonId += ((PlatoonId) data[8]) << 24; - return PlatoonMessage(vehicleId, messageType, platoonId); + message = PlatoonMessage(vehicleId, messageType, platoonId); } }; diff --git a/NetworkingLib/test/Test.cpp b/NetworkingLib/test/Test.cpp index 0cb6f2a0..5a4a4332 100644 --- a/NetworkingLib/test/Test.cpp +++ b/NetworkingLib/test/Test.cpp @@ -30,15 +30,14 @@ void testSyncServices() auto server = service::Server<PlatoonService>::create(net1, 10001); server->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { std::cout << "Request from " << (int) requestMessage.getVehicleId() << " with type: " << (int) requestMessage.getMessageType() << "\n"; - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }); sleep(1); @@ -48,7 +47,8 @@ void testSyncServices() auto client = service::Client<PlatoonService>::create(net2); for (int i = 0; i < 5; i++) { - auto response = client->call("127.0.0.1", 10001, PlatoonMessage::followerRequest(2), 1s); + PlatoonMessage response; + client->call(PlatoonMessage::followerRequest(2), response, "127.0.0.1", 10001, 1s); std::cout << "Response from " << (int) response.getVehicleId() << " with type: " << (int) response.getMessageType() << " and platoonId: " << (int) response.getPlatoonId() << "\n"; @@ -68,15 +68,14 @@ void testAsyncServices() auto server = service::Server<PlatoonService>::create(net, 10001); server->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { std::cout << "Request from " << (int) requestMessage.getVehicleId() << " with type: " << (int) requestMessage.getMessageType() << "\n"; - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }); sleep(1); @@ -88,7 +87,7 @@ void testAsyncServices() for (int i = 0; i < 5; i++) { client->asyncCall( - "127.0.0.1", 10001, PlatoonMessage::followerRequest(2), 1s, + PlatoonMessage::followerRequest(2), "127.0.0.1", 10001, 1s, [&pending, &correct](const auto & error, auto & response) { if (error) @@ -124,12 +123,11 @@ void testTcpClientTimeout() auto server = service::Server<PlatoonService>::create(net1, 10001); server->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { // Just sleep for 5 seconds. sleep(5); - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }); sleep(1); @@ -138,7 +136,8 @@ void testTcpClientTimeout() auto startTime = boost::posix_time::microsec_clock::local_time(); try { - auto response = client->call("127.0.0.1", 10001, PlatoonMessage::followerRequest(2), timeout); + PlatoonMessage response; + client->call(PlatoonMessage::followerRequest(2), response, "127.0.0.1", 10001, timeout); std::cout << "Response: " << response.getPlatoonId() << "\n"; std::cout << "FAILED!"; } @@ -162,30 +161,30 @@ void testMultipleConnections() auto server2 = service::Server<PlatoonService>::create(net2, 10002); server1->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }); server2->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { - return PlatoonMessage::acceptResponse(2, 43); + responseMessage = PlatoonMessage::acceptResponse(2, 43); }); sleep(1); auto client = service::Client<PlatoonService>::create(net3); - auto response1 = client->call("127.0.0.1", 10001, PlatoonMessage::followerRequest(1), 5s); + PlatoonMessage response1, response2; + + client->call(PlatoonMessage::followerRequest(1), response1, "127.0.0.1", 10001, 5s); std::cout << "Response from " << response1.getVehicleId() << std::endl; - auto response2 = client->call("127.0.0.1", 10002, PlatoonMessage::followerRequest(2), 5s); + client->call(PlatoonMessage::followerRequest(2), response2, "127.0.0.1", 10002, 5s); std::cout << "Response from " << response2.getVehicleId() << std::endl; if (response1.getVehicleId() == 1 && response1.getPlatoonId() == 42 && - response1.getVehicleId() == 2 && response1.getPlatoonId() == 43) + response2.getVehicleId() == 2 && response2.getPlatoonId() == 43) std::cout << "SUCCESS!\n"; } @@ -197,12 +196,11 @@ void testStoppingServiceServer() auto server = service::Server<PlatoonService>::create(net1, 10001); - auto handler = [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + auto handler = [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { // Just sleep for 3 seconds. sleep(2); - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }; server->advertiseService(handler); @@ -212,13 +210,15 @@ void testStoppingServiceServer() auto client = service::Client<PlatoonService>::create(net2); try { - client->call("127.0.0.1", 10001, PlatoonMessage::followerRequest(42), 1s); + PlatoonMessage response; + client->call(PlatoonMessage::followerRequest(42), response, "127.0.0.1", 10001, 1s); } catch (const error::Aborted & e) { server->stop(); server->advertiseService(handler); - auto response = client->call("127.0.0.1", 10001, PlatoonMessage::followerRequest(42), 5s); + PlatoonMessage response; + client->call(PlatoonMessage::followerRequest(42), response, "127.0.0.1", 10001, 5s); if (response.getMessageType() == messageTypes::ACCEPT_RESPONSE) std::cout << "SUCCESS!\n"; } @@ -237,10 +237,7 @@ void testAsyncDatagramReceiver() receiver->asyncReceive( 3s, - [&running](const auto & error, - auto & message, - const auto & senderHost, - auto senderPort) + [&running](const auto & error, auto & message, const auto & senderHost, auto senderPort) { if (!error) std::cout << "SUCCESS! Received message from: " << message.getVehicleId() << "\n"; @@ -296,11 +293,10 @@ void testServiceClientAsyncCallTimeout() auto server = service::Server<PlatoonService>::create(net1, 10001); server->advertiseService( - [](const auto & clientEndpoint, - auto & requestMessage) -> PlatoonMessage + [](const auto & clientEndpoint, auto & requestMessage, auto & responseMessage) { sleep(3); - return PlatoonMessage::acceptResponse(1, 42); + responseMessage = PlatoonMessage::acceptResponse(1, 42); }); sleep(1); @@ -309,8 +305,9 @@ void testServiceClientAsyncCallTimeout() std::atomic<bool> running{true}; + PlatoonMessage response; client->asyncCall( - "127.0.0.1", 10001, PlatoonMessage::followerRequest(1), 1s, + PlatoonMessage::followerRequest(1), "127.0.0.1", 10001, 1s, [&running](const auto & error, const auto & response) { if (error == error::codes::ABORTED) @@ -334,10 +331,7 @@ void testDatagramSenderAsyncSend() receiver->asyncReceive( 3s, - [&running](const auto & error, - auto & message, - const std::string & senderHost, - auto senderPort) + [&running](const auto & error, auto & message, const std::string & senderHost, auto senderPort) { if (error) std::cout << "FAILED! (receive error)\n"; @@ -410,9 +404,10 @@ void testStringMessageOverDatagram() std::thread receiverThread{ [receiver, &running] { + std::string message; std::string host; std::uint16_t port; - auto message = receiver->receive(host, port, 3s); + receiver->receive(message, host, port, 3s); std::cout << "received: host: " << host << " port: " << port << " message: " << message << "\n"; if (message == "Hello World!") std::cout << "SUCCESS!\n"; @@ -443,19 +438,20 @@ void testStringMessageOverService() [server, &running, &failed] { server->advertiseService( - [&running, &failed](const auto & endpoint, auto & request) -> std::string + [&running, &failed](const auto & endpoint, auto & request, auto & response) { std::cout << "Received request message: " << request << "\n"; if (request != "Ping") failed = true; running = false; - return std::string{"Pong"}; + response = std::string{"Pong"}; }); }}; sleep(1); - auto response = client->call("127.0.0.1", 10000, std::string{"Ping"}, 3s); + std::string response; + client->call(std::string{"Ping"}, response, "127.0.0.1", 10000, 3s); std::cout << "Received response message: " << response << "\n"; if (response != "Pong") failed = true; @@ -479,14 +475,14 @@ void testServiceServerMaxMessageSize() [](auto && ...) { std::cout << "FAILED! (This should not have been called!\n"; - return std::string{}; }); sleep(1); auto client = service::Client<StringService>::create(net, 200); try { - client->call("127.0.0.1", 10000, std::string(200, 'a'), 1s); + std::string response; + client->call(std::string(200, 'a'), response, "127.0.0.1", 10000, 1s); } catch (const error::Error &) { @@ -494,7 +490,7 @@ void testServiceServerMaxMessageSize() } client->asyncCall( - "127.0.0.1", 10000, std::string(200, 'a'), 1s, + std::string(200, 'a'), "127.0.0.1", 10000, 1s, [&syncCallError, &running](const auto & error, auto & message) { if (error == error::codes::FAILED_OPERATION && syncCallError) @@ -516,17 +512,18 @@ void testServiceClientMaxMessageSize() auto server = service::Server<StringService>::create(net, 10000, 200); server->advertiseService( - [&serverReceivedCount](auto && ...) + [&serverReceivedCount](const auto & endpoint, auto & request, auto & response) { serverReceivedCount++; - return std::string(200, 'a'); + response = std::string(200, 'a'); }); sleep(1); auto client = service::Client<StringService>::create(net, 100); try { - client->call("127.0.0.1", 10000, std::string{}, 1s); + std::string response; + client->call(std::string{}, response, "127.0.0.1", 10000, 1s); } catch (const error::Error &) { @@ -534,7 +531,7 @@ void testServiceClientMaxMessageSize() } client->asyncCall( - "127.0.0.1", 10000, std::string(100, 'a'), 1s, + std::string(100, 'a'), "127.0.0.1", 10000, 1s, [&syncCallError, &running, &serverReceivedCount](const auto & error, auto & message) { if (error == error::codes::FAILED_OPERATION && syncCallError && serverReceivedCount == 2) @@ -579,9 +576,10 @@ void testDatagramReceiverMaxMessageSize() { try { - std::string host{}; + std::string message; + std::string host; std::uint16_t port; - receiver->receive(host, port, 3s); + receiver->receive(message, host, port, 3s); } catch (const error::Error &) { @@ -599,6 +597,27 @@ void testDatagramReceiverMaxMessageSize() receiveThread.join(); } +void testNonCopyableMessage() +{ + // It should only compile to see if the only requirement to Message is to be Default-Constructable. + + try + { + Networking net; + auto sender = message::DatagramSender<NonCopyableMessage>::create(net); + auto receiver = message::DatagramReceiver<NonCopyableMessage>::create(net, 10000); + sender->send(NonCopyableMessage{}, "127.0.0.1", 10000, 0s); + NonCopyableMessage message; + std::string host; + std::uint16_t port; + receiver->receive(message, host, port, 0s); + } + catch (...) + {} + + std::cout << "SUCCESS!\n"; +} + } } diff --git a/NetworkingLib/test/Test.h b/NetworkingLib/test/Test.h index e519d005..7280615a 100644 --- a/NetworkingLib/test/Test.h +++ b/NetworkingLib/test/Test.h @@ -6,6 +6,7 @@ #define PROTOCOL_NETWORKTEST_H_H #include <string> +#include "../include/Message.h" namespace networking { @@ -19,6 +20,16 @@ public: using ResponseMessage = std::string; }; +class NonCopyableMessage +{ +public: + NonCopyableMessage() = default; + NonCopyableMessage(const NonCopyableMessage &) = delete; + NonCopyableMessage & operator=(const NonCopyableMessage &) = delete; + NonCopyableMessage(NonCopyableMessage &&) = delete; + NonCopyableMessage & operator=(NonCopyableMessage &&) = delete; +}; + void testSyncServices(); void testAsyncServices(); @@ -49,6 +60,30 @@ void testServiceClientMaxMessageSize(); void testDatagramReceiverMaxMessageSize(); +void testNonCopyableMessage(); + +} +} + +namespace networking +{ +namespace message +{ + +template<> +struct Encoder<networking::test::NonCopyableMessage> +{ + void operator()(const networking::test::NonCopyableMessage & message, std::string & data) const + {} +}; + +template<> +struct Decoder<networking::test::NonCopyableMessage> +{ + void operator()(networking::test::NonCopyableMessage & message, const std::string & data) const + {} +}; + } } diff --git a/Protocol/CMakeLists.txt b/Protocol/CMakeLists.txt index da7fb08a..105f5601 100644 --- a/Protocol/CMakeLists.txt +++ b/Protocol/CMakeLists.txt @@ -36,7 +36,7 @@ set(SOURCE_FILES src/LeaderVehicle.cpp src/FollowerVehicle.cpp src/Vehicle.cpp - ) + src/VehicleEndpoint.cpp) add_library(ProtocolLib ${SOURCE_FILES}) target_link_libraries(ProtocolLib NetworkingLib) diff --git a/Protocol/include/FollowerVehicle.h b/Protocol/include/FollowerVehicle.h index cc0fbbc5..0dbc8796 100644 --- a/Protocol/include/FollowerVehicle.h +++ b/Protocol/include/FollowerVehicle.h @@ -27,10 +27,10 @@ private: public: using Ptr = std::shared_ptr<FollowerVehicle>; - static Ptr create(networking::Networking & net, VehicleId vehicleId); + static Ptr create(networking::Networking & net, VehicleId ownVehicleId); // This should not be used outside. - FollowerVehicle(PrivateTag, networking::Networking & net, VehicleId vehicleId); + FollowerVehicle(PrivateTag, networking::Networking & net, VehicleId ownVehicleId); void stop() override; @@ -48,8 +48,9 @@ protected: void makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndpoint) override; - PlatoonMessage makePlatoonCreateResponse( - const networking::Resolver::Endpoint & clientEndpoint, const PlatoonMessage & request) override; + void makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, + const PlatoonMessage & request, + PlatoonMessage & response) override; Ptr shared_from_this(); @@ -83,5 +84,4 @@ private: } - #endif //PROTOCOL_FOLLOWERVEHICLE_H diff --git a/Protocol/include/LeaderVehicle.h b/Protocol/include/LeaderVehicle.h index 4a221e72..909acbbc 100644 --- a/Protocol/include/LeaderVehicle.h +++ b/Protocol/include/LeaderVehicle.h @@ -28,10 +28,10 @@ private: public: using Ptr = std::shared_ptr<LeaderVehicle>; - static Ptr create(networking::Networking & net, VehicleId vehicleId); + static Ptr create(networking::Networking & net, VehicleId ownVehicleId); // This should not be used outside. - LeaderVehicle(PrivateTag, networking::Networking & net, VehicleId vehicleId); + LeaderVehicle(PrivateTag, networking::Networking & net, VehicleId ownVehicleId); void stop() override; @@ -48,8 +48,9 @@ protected: void makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndpoint) override; - PlatoonMessage makePlatoonCreateResponse( - const networking::Resolver::Endpoint & clientEndpoint, const PlatoonMessage & request) override; + void makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, + const PlatoonMessage & request, + PlatoonMessage & response) override; Ptr shared_from_this(); diff --git a/Protocol/include/PlatoonMessage.h b/Protocol/include/PlatoonMessage.h index aedd4616..eb4777ae 100644 --- a/Protocol/include/PlatoonMessage.h +++ b/Protocol/include/PlatoonMessage.h @@ -8,6 +8,7 @@ #include <cstdint> #include <vector> #include "Protocol.h" +#include "json.hpp" #include <NetworkingLib/Message.h> namespace protocol @@ -15,32 +16,20 @@ namespace protocol namespace messageTypes { -constexpr MessageType LEADER_REQUEST = 0x01; -constexpr MessageType FOLLOWER_REQUEST = 0x02; -constexpr MessageType ACCEPT_RESPONSE = 0x03; -constexpr MessageType REJECT_RESPONSE = 0x04; -constexpr MessageType HEARTBEAT = 0x05; -constexpr MessageType BROADCAST = 0x06; -constexpr MessageType LEAVE_PLATOON = 0x07; +constexpr MessageType LV_BROADCAST = 0x00000001; +constexpr MessageType FV_HEARTBEAT = 0x00000002; +constexpr MessageType LV_REQUEST = 0x00000004; +constexpr MessageType FV_REQUEST = 0x00000008; +constexpr MessageType ACCEPT_RESPONSE = 0x00000010; +constexpr MessageType REJECT_RESPONSE = 0x00000020; +constexpr MessageType LEAVE_PLATOON = 0x00000040; } -union FloatConverter +struct PlatoonMessage { - float floatVal; - char bytes[4]; -}; - -class PlatoonMessage -{ -public: - static constexpr auto SIZE = sizeof(VehicleId) - + sizeof(MessageType) - + sizeof(PlatoonId) - + sizeof(PlatoonSpeed) - + sizeof(InnerPlatoonDistance); + static constexpr std::size_t MAX_SIZE = 512; PlatoonMessage() - : valid{false} {} PlatoonMessage( @@ -54,14 +43,13 @@ public: , platoonId(platoonId) , platoonSpeed(platoonSpeed) , innerPlatoonDistance(innerPlatoonDistance) - , valid{true} {} static PlatoonMessage followerRequest(VehicleId vehicleId) - { return PlatoonMessage(vehicleId, messageTypes::FOLLOWER_REQUEST, 0, 0, 0); } + { return PlatoonMessage(vehicleId, messageTypes::FV_REQUEST, 0, 0, 0); } static PlatoonMessage leaderRequest(VehicleId vehicleId, PlatoonId platoonId) - { return PlatoonMessage(vehicleId, messageTypes::LEADER_REQUEST, platoonId, 0, 0); } + { return PlatoonMessage(vehicleId, messageTypes::LV_REQUEST, platoonId, 0, 0); } static PlatoonMessage rejectResponse(VehicleId vehicleId) { return PlatoonMessage(vehicleId, messageTypes::REJECT_RESPONSE, 0, 0, 0); } @@ -73,41 +61,46 @@ public: PlatoonId platoonId, PlatoonSpeed platoonSpeed, InnerPlatoonDistance innerPlatoonDistance) - { return PlatoonMessage(vehicleId, messageTypes::BROADCAST, platoonId, platoonSpeed, innerPlatoonDistance); } + { return PlatoonMessage(vehicleId, messageTypes::LV_BROADCAST, platoonId, platoonSpeed, innerPlatoonDistance); } static PlatoonMessage heartbeatMessage(VehicleId vehicleId, PlatoonId platoonId) - { return PlatoonMessage(vehicleId, messageTypes::HEARTBEAT, platoonId, 0, 0); } + { return PlatoonMessage(vehicleId, messageTypes::FV_HEARTBEAT, platoonId, 0, 0); } static PlatoonMessage leaveMessage(VehicleId vehicleId, PlatoonId platoonId) { return PlatoonMessage(vehicleId, messageTypes::LEAVE_PLATOON, platoonId, 0, 0); } - MessageType getMessageType() const noexcept - { return messageType; } - - VehicleId getVehicleId() const noexcept - { return vehicleId; } - - PlatoonId getPlatoonId() const noexcept - { return platoonId; } - - PlatoonSpeed getPlatoonSpeed() const noexcept - { return platoonSpeed; } - - InnerPlatoonDistance getInnerPlatoonDistance() const noexcept - { return innerPlatoonDistance; } - - bool isValid() const noexcept - { return valid; } - -private: VehicleId vehicleId; MessageType messageType; PlatoonId platoonId; PlatoonSpeed platoonSpeed; InnerPlatoonDistance innerPlatoonDistance; +}; +} - bool valid; +namespace nlohmann +{ + +template<> +struct adl_serializer<protocol::PlatoonMessage> +{ + static void to_json(json & j, const protocol::PlatoonMessage & message) + { + j = json{{"vehicleId", message.vehicleId}, + {"platoonId", message.platoonId}, + {"platoonSpeed", message.platoonSpeed}, + {"innerPlatoonDistance", message.innerPlatoonDistance}}; + } + + static void from_json(const json & j, protocol::PlatoonMessage & message) + { + using namespace protocol; + message.vehicleId = j.at("vehicleId").get<VehicleId>(); + message.platoonId = j.at("platoonId").get<PlatoonId>(); + message.platoonSpeed = j.at("platoonSpeed").get<PlatoonSpeed>(); + message.innerPlatoonDistance = j.at("innerPlatoonDistance").get<InnerPlatoonDistance>(); + } }; + } namespace networking @@ -118,81 +111,39 @@ namespace message template<> struct Encoder<protocol::PlatoonMessage> { - std::string operator()(const protocol::PlatoonMessage & message) + void operator()(const protocol::PlatoonMessage & message, std::string & data) const { using namespace protocol; - auto vehicleId = message.getVehicleId(); - auto messageType = message.getMessageType(); - auto platoonId = message.getPlatoonId(); - auto platoonSpeed = message.getPlatoonSpeed(); - auto innerPlatoonDistance = message.getInnerPlatoonDistance(); - - std::string data(PlatoonMessage::SIZE, '\0'); - - data[0] = (std::uint8_t) (vehicleId & 0x000000ff); - data[1] = (std::uint8_t) ((vehicleId & 0x0000ff00) >> 8); - data[2] = (std::uint8_t) ((vehicleId & 0x00ff0000) >> 16); - data[3] = (std::uint8_t) ((vehicleId & 0xff000000) >> 24); - data[4] = (std::uint8_t) (messageType); - data[5] = (std::uint8_t) (platoonId & 0x000000ff); - data[6] = (std::uint8_t) ((platoonId & 0x0000ff00) >> 8); - data[7] = (std::uint8_t) ((platoonId & 0x00ff0000) >> 16); - data[8] = (std::uint8_t) ((platoonId & 0xff000000) >> 24); - FloatConverter converter; - converter.floatVal = platoonSpeed; - data[9] = converter.bytes[0]; - data[10] = converter.bytes[1]; - data[11] = converter.bytes[2]; - data[12] = converter.bytes[3]; - converter.floatVal = innerPlatoonDistance; - data[13] = converter.bytes[0]; - data[14] = converter.bytes[1]; - data[15] = converter.bytes[2]; - data[16] = converter.bytes[3]; - - return data; + auto messageType = message.messageType; + // 4 bytes for message type + data = std::string(4, '\0'); + data[0] = networking::utils::byte<0>(messageType); + data[1] = networking::utils::byte<1>(messageType); + data[2] = networking::utils::byte<2>(messageType); + data[3] = networking::utils::byte<3>(messageType); + + using json = nlohmann::json; + json j = message; + data.append(j.dump()); } }; template<> struct Decoder<protocol::PlatoonMessage> { - protocol::PlatoonMessage operator()(const std::string & data) + void operator()(protocol::PlatoonMessage & message, const std::string & data) const { using namespace protocol; - if (data.size() < PlatoonMessage::SIZE) - return PlatoonMessage{}; - - VehicleId vehicleId{0}; - MessageType messageType{0}; - PlatoonId platoonId{0}; - PlatoonSpeed platoonSpeed{0}; - InnerPlatoonDistance innerPlatoonDistance{0}; - - vehicleId += ((VehicleId) data[0]); - vehicleId += ((VehicleId) data[1]) << 8; - vehicleId += ((VehicleId) data[2]) << 16; - vehicleId += ((VehicleId) data[3]) << 24; - messageType += (MessageType) data[4]; - platoonId += ((PlatoonId) data[5]); - platoonId += ((PlatoonId) data[6]) << 8; - platoonId += ((PlatoonId) data[7]) << 16; - platoonId += ((PlatoonId) data[8]) << 24; - FloatConverter converter; - converter.bytes[0] = data[9]; - converter.bytes[1] = data[10]; - converter.bytes[2] = data[11]; - converter.bytes[3] = data[12]; - platoonSpeed = converter.floatVal; - converter.bytes[0] = data[13]; - converter.bytes[1] = data[14]; - converter.bytes[2] = data[15]; - converter.bytes[3] = data[16]; - innerPlatoonDistance = converter.floatVal; - - return PlatoonMessage{vehicleId, messageType, platoonId, platoonSpeed, innerPlatoonDistance}; + if (data.size() < sizeof(MessageType)) + throw 0; // Throw something to inform that we failed to decode the message. + + using json = nlohmann::json; + auto j = json::parse(data.begin() + sizeof(MessageType), data.end()); + message = j; + message.messageType = networking::utils::bytesToInt<sizeof(MessageType), MessageType>( + (const std::uint8_t *) data.c_str()); }; }; diff --git a/Protocol/include/Protocol.h b/Protocol/include/Protocol.h index c18eaa85..57642183 100644 --- a/Protocol/include/Protocol.h +++ b/Protocol/include/Protocol.h @@ -13,7 +13,7 @@ namespace protocol { using VehicleId = std::uint32_t; -using MessageType = std::uint8_t; +using MessageType = std::uint32_t; using PlatoonId = std::uint32_t; using PlatoonSpeed = float; using InnerPlatoonDistance = float; diff --git a/Protocol/include/Vehicle.h b/Protocol/include/Vehicle.h index cba65ef7..f169b55c 100644 --- a/Protocol/include/Vehicle.h +++ b/Protocol/include/Vehicle.h @@ -22,6 +22,9 @@ public: using Callback = std::function<void()>; + static constexpr std::uint16_t UDP_PORT = 10000; + static constexpr std::uint16_t TCP_PORT = 10000; + enum class Role { LEADER, FOLLOWER @@ -40,6 +43,11 @@ public: virtual void stop(); + void addVehicleEndpoint(VehicleId vehicleId, const std::string & host); + + VehicleEndpoint getOwnEndpoint() const + { return ownEndpoint; } + Role getRole() const noexcept { return role; } @@ -52,7 +60,7 @@ public: protected: Vehicle( networking::Networking & net, - VehicleId vehicleId, + VehicleId ownVehicleId, Role role); enum class State @@ -64,6 +72,7 @@ protected: static constexpr PlatoonConfig DEFAULT_PLATOON_CONFIG{0.0f, 0.0f}; + const VehicleEndpoint ownEndpoint; PlatoonId platoonId; std::atomic<PlatoonConfig> platoonConfig{DEFAULT_PLATOON_CONFIG}; State state{State::IDLE}; @@ -71,8 +80,8 @@ protected: networking::Networking & net; networking::service::Server<PlatoonService>::Ptr platoonCreateServiceServer; networking::service::Client<PlatoonService>::Ptr platoonCreateServiceClient; - std::vector<VehicleEndpoint> availableVehicleEndpoints; - std::vector<VehicleEndpoint>::iterator vehicleEndpointIter; + std::vector<VehicleEndpoint> vehicleEndpoints; + std::vector<VehicleEndpoint>::iterator vehicleEndpointsIter; Callback onRunningPlatoonCallback; Callback onLeavingPlatoonCallback; @@ -84,20 +93,18 @@ protected: virtual void makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndpoint) = 0; - virtual PlatoonMessage makePlatoonCreateResponse( - const networking::Resolver::Endpoint & clientEndpoint, const PlatoonMessage & request) = 0; + virtual void makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, + const PlatoonMessage & request, + PlatoonMessage & response) = 0; void makePlatoonCreateRequestToNextEndpoint(); - void scanAvailableVehicles(); - void setState(State newState); - VehicleEndpoint getOwnEndpoint() const; + VehicleEndpoint createVehicleEndpoint(VehicleId vehicleId, const std::string & host); private: Role role; - VehicleId vehicleId; }; } diff --git a/Protocol/include/VehicleEndpoint.h b/Protocol/include/VehicleEndpoint.h index 9ce6aedd..a2453424 100644 --- a/Protocol/include/VehicleEndpoint.h +++ b/Protocol/include/VehicleEndpoint.h @@ -16,7 +16,13 @@ namespace protocol class VehicleEndpoint { public: + friend bool operator==(const VehicleEndpoint & lhs, const VehicleEndpoint & rhs); + VehicleEndpoint() + : vehicleId(0) + , host("") + , tcpPort(0) + , udpPort(0) {} VehicleEndpoint(VehicleId vehicleId, const std::string & host, std::uint16_t tcpPort, std::uint16_t udpPort) @@ -26,20 +32,20 @@ public: , udpPort(udpPort) {} - VehicleId getVehicleId() const + VehicleId getVehicleId() const noexcept { return vehicleId; } - std::string getHost() const + std::string getHost() const noexcept { return host; } - std::uint16_t getTcpPort() const + std::uint16_t getTcpPort() const noexcept { return tcpPort; } - std::uint16_t getUdpPort() const + std::uint16_t getUdpPort() const noexcept { return udpPort; } - bool operator==(VehicleEndpoint & other) - { return vehicleId == other.vehicleId; } + explicit operator bool() const noexcept + { return vehicleId != 0; } private: VehicleId vehicleId; @@ -47,6 +53,9 @@ private: std::uint16_t tcpPort; std::uint16_t udpPort; }; + +bool operator==(const VehicleEndpoint & lhs, const VehicleEndpoint & rhs); + } #endif //PROTOCOL_ENPOINTINFO_H diff --git a/Protocol/src/FollowerVehicle.cpp b/Protocol/src/FollowerVehicle.cpp index 7a766e36..14722394 100644 --- a/Protocol/src/FollowerVehicle.cpp +++ b/Protocol/src/FollowerVehicle.cpp @@ -13,18 +13,18 @@ using namespace std::chrono_literals; constexpr networking::time::Duration FollowerVehicle::HEARTBEAT_INTERVAL; constexpr networking::time::Duration FollowerVehicle::BROADCAST_TIMEOUT; -FollowerVehicle::Ptr FollowerVehicle::create(networking::Networking & net, VehicleId vehicleId) +FollowerVehicle::Ptr FollowerVehicle::create(networking::Networking & net, VehicleId ownVehicleId) { - return std::make_shared<FollowerVehicle>(PrivateTag{}, net, vehicleId); + return std::make_shared<FollowerVehicle>(PrivateTag{}, net, ownVehicleId); } -FollowerVehicle::FollowerVehicle(PrivateTag, networking::Networking & net, VehicleId vehicleId) - : Vehicle(net, vehicleId, Role::FOLLOWER) +FollowerVehicle::FollowerVehicle(PrivateTag, networking::Networking & net, VehicleId ownVehicleId) + : Vehicle(net, ownVehicleId, Role::FOLLOWER) { heartbeatTimer = networking::time::Timer::create(net); heartbeatSender = networking::message::DatagramSender<PlatoonMessage>::create(net); - broadcastReceiver = networking::message::DatagramReceiver<PlatoonMessage>::create(net, - getOwnEndpoint().getUdpPort()); + broadcastReceiver = networking::message::DatagramReceiver<PlatoonMessage>::create( + net, getOwnEndpoint().getUdpPort(), PlatoonMessage::MAX_SIZE); } void FollowerVehicle::stop() @@ -86,9 +86,9 @@ void FollowerVehicle::makePlatoonCreateRequest(const VehicleEndpoint & vehicleEn // We'll send a request indicating that we want to be the follower. auto self = shared_from_this(); platoonCreateServiceClient->asyncCall( + PlatoonMessage::followerRequest(getOwnEndpoint().getVehicleId()), vehicleEndpoint.getHost(), vehicleEndpoint.getTcpPort(), - PlatoonMessage::followerRequest(getOwnEndpoint().getVehicleId()), 3s, [self, vehicleEndpoint](const auto & error, const auto & response) { @@ -98,40 +98,41 @@ void FollowerVehicle::makePlatoonCreateRequest(const VehicleEndpoint & vehicleEn if (error) return; - if (response.getVehicleId() != vehicleEndpoint.getVehicleId() || - response.getMessageType() != messageTypes::ACCEPT_RESPONSE) + if (response.vehicleId != vehicleEndpoint.getVehicleId() || + response.messageType != messageTypes::ACCEPT_RESPONSE) return; self->leader = vehicleEndpoint; - self->platoonId = response.getPlatoonId(); + self->platoonId = response.platoonId; self->doRunPlatoon(); }); } -PlatoonMessage FollowerVehicle::makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, - const PlatoonMessage & request) +void FollowerVehicle::makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, + const PlatoonMessage & request, + PlatoonMessage & response) { if (state != State::CREATING_PLATOON) - return PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + { + response = PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + return; + } + auto vehicleId = request.vehicleId; // Only consider this message if it is a leader request, // meaning that a leader wants us to join its platoon. - if (request.getMessageType() != messageTypes::LEADER_REQUEST) - return PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); - - auto leaderId = request.getVehicleId(); - // TODO: change vehicle endpoint information! - leader = VehicleEndpoint{ - leaderId, - clientEndpoint.ip, - leaderId + 10000, - leaderId + 10010 - }; - platoonId = request.getPlatoonId(); + if (request.messageType != messageTypes::LV_REQUEST || vehicleId == 0) + { + response = PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + return; + } + + leader = createVehicleEndpoint(request.vehicleId, clientEndpoint.ip); + platoonId = request.platoonId; doRunPlatoon(); - return PlatoonMessage::acceptResponse(getOwnEndpoint().getVehicleId(), platoonId); + response = PlatoonMessage::acceptResponse(getOwnEndpoint().getVehicleId(), platoonId); } void FollowerVehicle::startHeartbeat() @@ -183,18 +184,17 @@ void FollowerVehicle::receiveBroadcast() return; } else if (error || - !message.isValid() || - message.getVehicleId() != self->leader.getVehicleId() || - message.getPlatoonId() != self->platoonId) + message.vehicleId != self->leader.getVehicleId() || + message.platoonId != self->platoonId) { // In case of any other error we just ignore the message and receive again. self->receiveBroadcast(); return; } - if (message.getMessageType() == messageTypes::BROADCAST) + if (message.messageType == messageTypes::LV_BROADCAST) { - PlatoonConfig newConfig{message.getPlatoonSpeed(), message.getInnerPlatoonDistance()}; + PlatoonConfig newConfig{message.platoonSpeed, message.innerPlatoonDistance}; if (self->platoonConfig.load() != newConfig) { self->platoonConfig = newConfig; diff --git a/Protocol/src/LeaderVehicle.cpp b/Protocol/src/LeaderVehicle.cpp index 0bb801de..a714c762 100644 --- a/Protocol/src/LeaderVehicle.cpp +++ b/Protocol/src/LeaderVehicle.cpp @@ -12,17 +12,17 @@ using namespace std::chrono_literals; constexpr networking::time::Duration LeaderVehicle::HEARTBEAT_TIMEOUT; constexpr networking::time::Duration LeaderVehicle::BROADCAST_INTERVAL; -LeaderVehicle::Ptr LeaderVehicle::create(networking::Networking & net, VehicleId vehicleId) +LeaderVehicle::Ptr LeaderVehicle::create(networking::Networking & net, VehicleId ownVehicleId) { - return std::make_shared<LeaderVehicle>(PrivateTag{}, net, vehicleId); + return std::make_shared<LeaderVehicle>(PrivateTag{}, net, ownVehicleId); } -LeaderVehicle::LeaderVehicle(PrivateTag, networking::Networking & net, VehicleId vehicleId) - : Vehicle(net, vehicleId, Role::LEADER) +LeaderVehicle::LeaderVehicle(PrivateTag, networking::Networking & net, VehicleId ownVehicleId) + : Vehicle(net, ownVehicleId, Role::LEADER) { broadcastTimer = networking::time::Timer::create(net); - heartbeatReceiver = networking::message::DatagramReceiver<PlatoonMessage>::create(net, - getOwnEndpoint().getUdpPort()); + heartbeatReceiver = networking::message::DatagramReceiver<PlatoonMessage>::create( + net, getOwnEndpoint().getUdpPort(), PlatoonMessage::MAX_SIZE); } void LeaderVehicle::stop() @@ -91,9 +91,9 @@ void LeaderVehicle::makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndp // We'll send a request indicating that we want to be the leader. auto self = shared_from_this(); platoonCreateServiceClient->asyncCall( + PlatoonMessage::leaderRequest(getOwnEndpoint().getVehicleId(), platoonId), vehicleEndpoint.getHost(), vehicleEndpoint.getTcpPort(), - PlatoonMessage::leaderRequest(getOwnEndpoint().getVehicleId(), platoonId), 3s, [self, vehicleEndpoint](const auto & error, const auto & response) { @@ -104,9 +104,9 @@ void LeaderVehicle::makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndp if (error) return; - if (response.getVehicleId() != vehicleEndpoint.getVehicleId() || - response.getMessageType() != messageTypes::ACCEPT_RESPONSE || - response.getPlatoonId() != self->platoonId) + if (response.vehicleId != vehicleEndpoint.getVehicleId() || + response.messageType != messageTypes::ACCEPT_RESPONSE || + response.platoonId != self->platoonId) return; self->addFollower(vehicleEndpoint); @@ -115,31 +115,31 @@ void LeaderVehicle::makePlatoonCreateRequest(const VehicleEndpoint & vehicleEndp }); } -PlatoonMessage LeaderVehicle::makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, - const PlatoonMessage & request) +void LeaderVehicle::makePlatoonCreateResponse(const networking::Resolver::Endpoint & clientEndpoint, + const PlatoonMessage & request, + PlatoonMessage & response) { if (state != State::CREATING_PLATOON && state != State::RUNNING_PLATOON) - return PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + { + response = PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + return; + } + auto vehicleId = request.vehicleId; // Only consider this message if it is a follower request, // meaning that a follower wants us to be its leader. - if (request.getMessageType() != messageTypes::FOLLOWER_REQUEST) - return PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); - - auto followerId = request.getVehicleId(); - // TODO: change vehicle endpoint information! - VehicleEndpoint followerEndpoint{ - followerId, - clientEndpoint.ip, - followerId + 10000, - followerId + 10010}; + if (request.messageType != messageTypes::FV_REQUEST || vehicleId == 0) + { + response = PlatoonMessage::rejectResponse(getOwnEndpoint().getVehicleId()); + return; + } - addFollower(followerEndpoint); + addFollower(createVehicleEndpoint(request.vehicleId, clientEndpoint.ip)); doRunPlatoon(); - return PlatoonMessage::acceptResponse(getOwnEndpoint().getVehicleId(), platoonId); + response = PlatoonMessage::acceptResponse(getOwnEndpoint().getVehicleId(), platoonId); } bool LeaderVehicle::findFollower(VehicleId id) @@ -212,16 +212,15 @@ void LeaderVehicle::receiveHeartbeat() if (self->state != State::RUNNING_PLATOON) return; - auto followerId = message.getVehicleId(); + auto followerId = message.vehicleId; if (!error && - message.isValid() && self->findFollower(followerId) && - message.getPlatoonId() == self->platoonId) + message.platoonId == self->platoonId) { auto & follower = self->followers[followerId]; - switch (message.getMessageType()) + switch (message.messageType) { case messageTypes::LEAVE_PLATOON: self->removeFollower(follower); @@ -230,7 +229,7 @@ void LeaderVehicle::receiveHeartbeat() self->doLeavePlatoon(); break; - case messageTypes::HEARTBEAT: + case messageTypes::FV_HEARTBEAT: // Reset the timer. self->stopHeartbeatTimer(follower); self->startHeartbeatTimer(follower); diff --git a/Protocol/src/Vehicle.cpp b/Protocol/src/Vehicle.cpp index 7061a638..76e23c0b 100644 --- a/Protocol/src/Vehicle.cpp +++ b/Protocol/src/Vehicle.cpp @@ -10,16 +10,34 @@ using namespace protocol; +constexpr std::uint16_t Vehicle::UDP_PORT; +constexpr std::uint16_t Vehicle::TCP_PORT; constexpr PlatoonConfig Vehicle::DEFAULT_PLATOON_CONFIG; -Vehicle::Vehicle(networking::Networking & net, VehicleId vehicleId, Vehicle::Role role) +Vehicle::Vehicle(networking::Networking & net, + VehicleId ownVehicleId, + Vehicle::Role role) : net(net) - , vehicleId(vehicleId) , role(role) + , ownEndpoint(createVehicleEndpoint(ownVehicleId, "")) { - platoonCreateServiceServer = networking::service::Server<PlatoonService>::create(net, - getOwnEndpoint().getTcpPort()); - platoonCreateServiceClient = networking::service::Client<PlatoonService>::create(net); + platoonCreateServiceServer = networking::service::Server<PlatoonService>::create( + net, ownEndpoint.getTcpPort(), PlatoonMessage::MAX_SIZE); + platoonCreateServiceClient = networking::service::Client<PlatoonService>::create( + net, PlatoonMessage::MAX_SIZE); +} + +void Vehicle::addVehicleEndpoint(VehicleId vehicleId, const std::string & host) +{ + if (vehicleId == 0) + throw std::invalid_argument{"Vehicle id must not be 0!"}; + + auto endpoint = createVehicleEndpoint(vehicleId, host); + + auto self = shared_from_this(); + net.callLater( + [self, endpoint] + { self->vehicleEndpoints.push_back(endpoint); }); } bool Vehicle::isIdle() @@ -61,12 +79,11 @@ void Vehicle::doCreatePlatoon() { auto self = shared_from_this(); platoonCreateServiceServer->advertiseService( - [self](const auto & clientEndpoint, const auto & requestMessage) -> PlatoonMessage - { return self->makePlatoonCreateResponse(clientEndpoint, requestMessage); }); + [self](const auto & clientEndpoint, const auto & requestMessage, auto & responseMessage) + { self->makePlatoonCreateResponse(clientEndpoint, requestMessage, responseMessage); }); // We cycle through every available endpoint and try to make a create-platoon-request. - scanAvailableVehicles(); - vehicleEndpointIter = availableVehicleEndpoints.begin(); + vehicleEndpointsIter = vehicleEndpoints.begin(); makePlatoonCreateRequestToNextEndpoint(); } @@ -75,11 +92,12 @@ void Vehicle::makePlatoonCreateRequestToNextEndpoint() if (state != State::CREATING_PLATOON) return; - if (!platoonCreateServiceClient->isCalling()) + if (!platoonCreateServiceClient->isCalling() && + vehicleEndpointsIter != vehicleEndpoints.end()) // in case of an empty vector of vehicle endpoints { - auto vehicleEndpoint = *vehicleEndpointIter; + auto vehicleEndpoint = *vehicleEndpointsIter; makePlatoonCreateRequest(vehicleEndpoint); - utils::cycle(vehicleEndpointIter, availableVehicleEndpoints); + utils::cycle(vehicleEndpointsIter, vehicleEndpoints); } auto self = shared_from_this(); @@ -88,36 +106,27 @@ void Vehicle::makePlatoonCreateRequestToNextEndpoint() { self->makePlatoonCreateRequestToNextEndpoint(); }); } -void Vehicle::scanAvailableVehicles() -{ - // TODO: change vehicle endpoint information! - availableVehicleEndpoints = { - VehicleEndpoint{1, "127.0.0.1", 10001, 10011}, - VehicleEndpoint{2, "127.0.0.1", 10002, 10022}, - VehicleEndpoint{3, "127.0.0.1", 10003, 10033}, - }; - - // remove me out of the list (I cannot connect to myself) - availableVehicleEndpoints.erase(std::remove_if( - availableVehicleEndpoints.begin(), availableVehicleEndpoints.end(), - [this](VehicleEndpoint & endpoint) -> bool - { - return endpoint.getVehicleId() == vehicleId; - }), availableVehicleEndpoints.end()); -} - void Vehicle::setState(Vehicle::State newState) { std::lock_guard<std::mutex> lock(stateMutex); state = newState; } -VehicleEndpoint Vehicle::getOwnEndpoint() const +VehicleEndpoint Vehicle::createVehicleEndpoint(VehicleId vehicleId, const std::string & host) { - // TODO: How to get this in real usage??? + if (vehicleId == 0) + throw std::invalid_argument{"Vehicle id must not be 0!"}; + return VehicleEndpoint{ vehicleId, - "127.0.0.1", - vehicleId + 10000, - vehicleId + 10010}; + host, +#ifdef DEBUG + // To test this locally, we must ensure that each vehicle has a different port. + TCP_PORT + vehicleId, + UDP_PORT + vehicleId +#else + TCP_PORT, + UDP_PORT +#endif + }; } diff --git a/Protocol/src/VehicleEndpoint.cpp b/Protocol/src/VehicleEndpoint.cpp new file mode 100644 index 00000000..38db0332 --- /dev/null +++ b/Protocol/src/VehicleEndpoint.cpp @@ -0,0 +1,14 @@ +// +// Created by philipp on 20.01.18. +// + +#include "../include/VehicleEndpoint.h" + +namespace protocol +{ + +bool operator==(const VehicleEndpoint & lhs, const VehicleEndpoint & rhs) +{ return lhs.vehicleId == rhs.vehicleId; } + +} + diff --git a/Protocol/test/ProtocolTest.cpp b/Protocol/test/ProtocolTest.cpp index 3701724e..fb43e8ec 100644 --- a/Protocol/test/ProtocolTest.cpp +++ b/Protocol/test/ProtocolTest.cpp @@ -11,13 +11,23 @@ using namespace protocol; void test::testBasicSetup() { - networking::Networking net; + networking::Networking net1; + networking::Networking net2; + networking::Networking net3; + using namespace protocol; + + auto leader = LeaderVehicle::create(net1, 1); + auto follower1 = FollowerVehicle::create(net2, 2); + auto follower2 = FollowerVehicle::create(net3, 3); - auto leader = protocol::LeaderVehicle::create(net, 1); - auto follower1 = protocol::FollowerVehicle::create(net, 2); - auto follower2 = protocol::FollowerVehicle::create(net, 3); + leader->addVehicleEndpoint(2, "127.0.0.1"); + leader->addVehicleEndpoint(3, "127.0.0.1"); + follower1->addVehicleEndpoint(1, "127.0.0.1"); + follower1->addVehicleEndpoint(3, "127.0.0.1"); + follower2->addVehicleEndpoint(1, "127.0.0.1"); + follower2->addVehicleEndpoint(2, "127.0.0.1"); - leader->setPlatoonConfig(protocol::PlatoonConfig{10.0f, 20.0f}); + leader->setPlatoonConfig(PlatoonConfig{10.0f, 20.0f}); int configCount1 = 0; follower1->setOnPlatoonConfigUpdatedCallback( @@ -60,7 +70,7 @@ void test::testBasicSetup() sleep(1); - leader->setPlatoonConfig(protocol::PlatoonConfig{5.0f, 10.0f}); + leader->setPlatoonConfig(PlatoonConfig{5.0f, 10.0f}); sleep(3); -- GitLab