Alicia123
Alicia123

Reputation: 3

How to serialise classes for UDP discover and TCP connection using boost::asio

I am trying to learn how to use boost::asio. C++ and asynchronous programming are very different from my normal programming practices.

I am trying to write a program that discovers a device using UDP and then establishes a TCP connection to it. Once the TCP connection is established, the program stops the UDP search. Should the TCP connection disconnect or time out, the UDP search starts again.

I have watched many videos today, including https://www.youtube.com/watch?v=7FQwAjELMek. I have loosely based my code on the shared pointer idiom discussed there, as that seems to be the closest I have gotten to a solution.

I have developed two classes.

To test my program, I start it, then fake a UDP response using netcat, and then let the TCP connection time out by using a non-existent IP address, to try to get the program to loop back into the search.

echo "hello" | nc -lu 0.0.0.0 9720

#include <memory>
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

class tcpQSYNC : public std::enable_shared_from_this<tcpQSYNC> {

public:
    tcpQSYNC(boost::asio::io_context &ioc, std::string hostname, unsigned int tcpPort) :
            m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
            m_timer(ioc)
    {
        boost::asio::ip::tcp::resolver resolver(ioc);
        boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcpPort));
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        m_remoteEndpoint = *endpoint_iterator;
    }

    ~tcpQSYNC() {
        std::cout << "tcpQSYNC destructor" << std::endl;
    }

    void run () {
        startConnection ();
        std::cout << "TCP Connection Started" << std::endl;
    }

    void startConnection() {
        m_socket.async_connect(m_remoteEndpoint,
                               [self = shared_from_this()](boost::system::error_code errorCode) {
                                   self->onConnectHandler(errorCode);
                               });
    }

    void onConnectHandler(const boost::system::error_code& error) {return;}
private:
    boost::asio::ip::tcp::socket m_socket;
    boost::asio::ip::tcp::endpoint m_remoteEndpoint;
    boost::asio::deadline_timer m_timer;
};

// Discovers a device by sending a UDP broadcast probe and waiting for a reply.
//
// Cycle: send probe -> wait for a datagram with a 10 second retry timer ->
// on timeout cancel the read and send the probe again.
// Instances must be owned by a std::shared_ptr (e.g. via std::make_shared)
// because every pending handler keeps the object alive via shared_from_this().
class udpFindQSYNC : public std::enable_shared_from_this<udpFindQSYNC> {
public:
    // ioc     - io_context that will run the completion handlers.
    // udpPort - UDP port the probe is broadcast to.
    // Throws boost::system::system_error if the socket cannot be set up.
    udpFindQSYNC(boost::asio::io_context &ioc, unsigned int udpPort) :
            m_socket(ioc),
            m_localEndpoint(boost::asio::ip::address_v4::broadcast(),
                            static_cast<unsigned short>(udpPort)),
            m_timer(ioc) {

        // Options are applied before bind(): SO_REUSEADDR only affects a bind
        // that happens after it has been set.
        m_socket.open(boost::asio::ip::udp::v4());
        m_socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
        m_socket.set_option(boost::asio::socket_base::broadcast(true));
        // Port 0: let the OS pick the local port.
        m_socket.bind(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0));
    }

    ~udpFindQSYNC() {
        std::cout << "udpFindQSYNC() destructor" << std::endl;
    }

    // Starts the discovery cycle and returns immediately.
    void run() {
        sendUDPBroadcast();
    }

    // Queues the two-byte probe. The payload is a data member because
    // boost::asio::buffer() does not copy: the bytes must stay valid until the
    // completion handler runs.
    void sendUDPBroadcast() {
        m_socket.async_send_to(boost::asio::buffer(m_probe), m_localEndpoint,
                               [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                                   self->onBroadcastComplete(errorCode, bytes);
                               });
    }

    // Completion handler for the probe: on success start listening for a reply
    // and arm the retry timer; on failure report the error and stop the cycle.
    void onBroadcastComplete(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        if (!errorCode) {
            std::cout << "UDP Broadcast "<< bytes_transferred << " byte"<< ((bytes_transferred==1) ? "" : "s") << std::endl;
            queueRead();
            createTimer();
        } else {
            std::cout << __func__ << " (" << errorCode.message() << ")" << std::endl;
        }
    }

    // Arms the retry timer.
    void createTimer() {
        // 10 second retry timer
        m_timer.expires_from_now(boost::posix_time::milliseconds(10000));
        m_timer.async_wait(
                [self = shared_from_this()] (boost::system::error_code errorCode)
                {
                    self->onTimerExpiry(errorCode);
                });
    }

    // Queues one datagram read; the sender's address is stored in m_remoteEndpoint.
    void queueRead() {
        m_socket.async_receive_from(boost::asio::buffer(m_buffer), m_remoteEndpoint,
                                    [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                                        self->onReceiveData(errorCode, bytes);
                                    });
    }

    // Completion handler for the retry timer.
    void onTimerExpiry(const boost::system::error_code &errorCode) {
        if (!errorCode) {
            std::cout << "UDP Timer Expired" << std::endl;
            // Timer has expired.   Cancel outstanding read operation and start again
            m_socket.cancel();
            sendUDPBroadcast();
        } else if (errorCode == boost::asio::error::operation_aborted) {
            // The wait was cancelled by onReceiveData: a reply arrived in time.
            std::cout << "Timer Operation Cancelled " << std::endl;
        } else {
            std::cout << __func__ << " (" << errorCode.message() << ")" << std::endl;
        }
    }

    // Completion handler for the datagram read.
    void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        if (errorCode == boost::asio::error::operation_aborted) {
            // Cancelled by onTimerExpiry, which has already started the next
            // probe. The timer is deliberately left alone here so a retry timer
            // armed for that next probe can never be cancelled by this stale read.
            std::cout << "UDP Read Operation Cancelled " << std::endl;
            return;
        }

        // Read has completed. Cancel the timer.
        m_timer.cancel();
        if (!errorCode) {
            std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;
        } else {
            std::cout << __func__ << " (" << errorCode.message() << ")" << std::endl;
        }
    }

    // Returns the textual address of the sender of the last datagram received.
    // Before any datagram has been received this is the address of a
    // default-constructed endpoint. Also logs the value.
    std::string getIPAddress() const {
        const std::string address = m_remoteEndpoint.address().to_string();
        std::cout << "Called getIPAddress() " << address << std::endl;
        return address;
    }
private:
    boost::asio::ip::udp::socket m_socket;
    boost::asio::ip::udp::endpoint m_localEndpoint;   // despite the name: the broadcast destination
    boost::asio::ip::udp::endpoint m_remoteEndpoint;  // filled in by async_receive_from
    boost::asio::deadline_timer m_timer;

    std::array<uint8_t, 2> m_probe = {{0, 0}};        // probe payload; must outlive each async send
    std::array<uint8_t, 32> m_buffer = {};            // receive buffer
};

// Test driver: two rounds of "start UDP discovery, start a TCP connect, run the
// io_context". The closing braces of this listing are missing from the post.
int main() {
    boost::asio::io_context ioc;
    // NOTE(review): `strand` is not used anywhere in the visible part of main.
    boost::asio::io_context::strand strand(ioc);

    int loop =0;
    while (loop < 2) {
        auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
        // run() only queues the asynchronous broadcast; no completion handler
        // executes until ioc.run() is called further down.
        udp->run();
        // This call happens before ioc.run(), i.e. before any datagram can have
        // been received, so the remote endpoint is still default-constructed.
        std::string remote = udp->getIPAddress();  // Should return 192.168.0.140 in my case.
        std::cout << "Main " << remote << std::endl;

        // I want to get the address returned from the udpFindQSYNC.
        // For now the address is hard-coded to a non-existent IP to cause a timeout.

        std::string nonextisthostname("192.168.0.143");
        // The temporary shared_ptr is released right after run(); only the
        // pending connect handler keeps the tcpQSYNC object alive.
        std::make_shared<tcpQSYNC>(ioc, nonextisthostname, 9760)->run();

        loop++;
        // Run the I/O service on the main thread
        // NOTE(review): per the Boost.Asio documentation, once run() has returned
        // because it ran out of work the io_context is stopped, and a later run()
        // returns immediately unless restart() is called first - confirm for the
        // second iteration of this loop.
        ioc.run();

Things I cannot get my head around

  1. How do I return the IP address from the udpFindQSYNC class so that the tcpQSYNC class can connect to it, given that the udpFindQSYNC destructor has already been called?

  2. How do I use the io_context to run two separate classes sequentially in what is essentially an infinite loop?

I looked at strands, but cannot figure out how to use them in my context. I always see the TCP connection run concurrently with the UDP search.

The log produced by my program is:

UDP Broadcast 2 bytes
tcpQSYNC destructor
UDP Timer Expired
UDP Read Operation Cancelled 
UDP Broadcast 2 bytes
UDP Received Data 6 bytes Called getIPAddress() 192.168.0.140
192.168.0.140
Timer Operation Cancelled 
udpFindQSYNC() destructor      <- My class is destroyed
Called getIPAddress() 0.0.0.0   
Main 0.0.0.0                   <- Thus my result is wrong   
TCP Connection Started
tcpQSYNC destructor
udpFindQSYNC() destructor

Would someone be able to point me towards the best way to address the two problems I cannot figure out?

Upvotes: 0

Views: 413

Answers (1)

rafix07
rafix07

Reputation: 20949

The getIPAddress method only returns a meaningful address once the onReceiveData handler has been called, so call it after that handler has run or from within the handler itself.

Your problem is that getIPAddress is called too early: m_remoteEndpoint is not filled in yet, because udp->run() returns immediately and the onReceiveData handler has not been called at that point.

Possible solutions to your problem:

1) add a blocking mechanism to getIPAddress that blocks until onReceiveData has been called; getIPAddress can then finish and return the address stored in m_remoteEndpoint

2) call getIPAddress from within onReceiveData

The first way can be achieved, for example, with a condition_variable and an isAddress flag. Inside getIPAddress you call wait on the condition variable with a predicate that checks whether isAddress is set to true. In the onReceiveData handler you set isAddress to true and notify the condition variable. The weakness of this approach is that in main you need to start an additional (background) thread in which ioc.run() executes the handlers. Without it, the main thread will block forever in getIPAddress.

In second way the main loop can be reduced to:

int loop =0;
while (loop < 2) 
{
    auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
    udp->run();

    loop++;
    // Run the I/O service on the main thread
    ioc.run();
}

and I think this is what you want. You start the first async operation in udp->run and the rest of the work is performed in handlers.

When should tcpQSYNC be created? In onReceiveData, because at that point you know the address of the other side that you want to connect to.

// Completion handler for the UDP read (member of udpFindQSYNC).
// On success the sender's address is known, so the TCP connection is started
// from here.
void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred)
{
    // Read has completed. Cancel the retry timer.
    m_timer.cancel();
    if (errorCode == boost::system::errc::success) {
        std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;

        // The class keeps no io_context member, so recover the io_context this
        // socket was constructed with from its executor.
        auto &ioc = static_cast<boost::asio::io_context &>(m_socket.get_executor().context());

        // m_remoteEndpoint is filled here
        std::make_shared<tcpQSYNC>(ioc, m_remoteEndpoint.address().to_string(), 9760)->run();
    } else if (errorCode == boost::system::errc::operation_canceled) {
        std::cout << "UDP Read Operation Cancelled " << std::endl;
    }
}

This

// Quoted from the question to show the problem: `data` is a local array, so its
// storage ends when the enclosing function returns, while the buffer passed to
// async_send_to still refers to it.
std::array<uint8_t, 2> data = {{0, 0}};

m_socket.async_send_to(boost::asio::buffer(data, 2), m_localEndpoint,
          [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                self->onBroadcastComplete(errorCode, bytes);
          });

is undefined behaviour. data is a local variable, async_send_to returns immediately, and boost::asio::buffer doesn't make a copy of the passed buffer.

You can store data as data member of your class to ensure that buffer lives for the whole time while async_send_to is being performed. Or put it into shared_ptr and pass smart pointer to lambda by value - lifetime of data will be prolonged.


In tcpQSYNC, why are you passing endpoint to m_socket?

m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0))

This constructor binds the socket to the given endpoint. What for? You are a client, not a server. You should pass only the protocol:

m_socket(ioc,boost::asio::ip::tcp::v4()),

Upvotes: 1

Related Questions