Reputation: 3
I am trying to learn how to use boost::asio. C++ and async programming are very different from my normal programming practices.
I am trying to write a program that discovers a device using UDP and then establishes a TCP connection to it. Once the TCP connection is established, the program stops the UDP search. Should the TCP connection disconnect or time out, the UDP search starts again.
I have watched many videos today, including https://www.youtube.com/watch?v=7FQwAjELMek. I have loosely based my code on the shared pointer idiom discussed there, as that seems to be the closest I have gotten to a solution.
I have developed two classes.
udpFindQSYNC, which I can use to search for the device using UDP.
tcpQSYNC that I can use to establish a connection to the device using TCP.
To test my program, I start it, then fake a UDP response using netcat, and then let the TCP connection time out by using a non-existent IP address, to try to get the program to loop back into the search.
echo "hello" | nc -lu 0.0.0.0 9720
#include <memory>
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
class tcpQSYNC : public std::enable_shared_from_this<tcpQSYNC> {
public:
tcpQSYNC(boost::asio::io_context &ioc, std::string hostname, unsigned int tcpPort) :
m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
m_timer(ioc)
{
boost::asio::ip::tcp::resolver resolver(ioc);
boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcpPort));
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
m_remoteEndpoint = *endpoint_iterator;
}
~tcpQSYNC() {
std::cout << "tcpQSYNC destructor" << std::endl;
}
void run () {
startConnection ();
std::cout << "TCP Connection Started" << std::endl;
}
void startConnection() {
m_socket.async_connect(m_remoteEndpoint,
[self = shared_from_this()](boost::system::error_code errorCode) {
self->onConnectHandler(errorCode);
});
}
void onConnectHandler(const boost::system::error_code& error) {return;}
private:
boost::asio::ip::tcp::socket m_socket;
boost::asio::ip::tcp::endpoint m_remoteEndpoint;
boost::asio::deadline_timer m_timer;
};
/// Discovers a device by sending a UDP broadcast probe and waiting for a reply.
///
/// A 2-byte probe is broadcast to the configured port, then a read and a
/// 10 second timer are started. If the timer fires first, the read is
/// cancelled and the probe is sent again. If a datagram arrives first, the
/// timer is cancelled and the sender's address is available from
/// getIPAddress().
///
/// Instances must be owned by a std::shared_ptr: every pending asynchronous
/// operation captures shared_from_this() to keep the object alive.
class udpFindQSYNC : public std::enable_shared_from_this<udpFindQSYNC> {
public:
    /// @param ioc     io_context that will run this object's handlers.
    /// @param udpPort Destination port for the broadcast probe.
    udpFindQSYNC(boost::asio::io_context &ioc, unsigned int udpPort) :
        m_socket(ioc, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
        m_localEndpoint(boost::asio::ip::address_v4::broadcast(), udpPort),
        m_timer(ioc) {
        m_socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
        m_socket.set_option(boost::asio::socket_base::broadcast(true));
    }

    ~udpFindQSYNC() {
        std::cout << "udpFindQSYNC() destructor" << std::endl;
    }

    /// Starts the discovery cycle and returns immediately.
    void run() {
        sendUDPBroadcast();
    }

    /// Queues the broadcast probe.
    ///
    /// The payload is a data member because boost::asio::buffer() does not
    /// copy: the memory must stay valid until the completion handler runs,
    /// which is after this function has returned.
    void sendUDPBroadcast() {
        m_socket.async_send_to(boost::asio::buffer(m_probe), m_localEndpoint,
            [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                self->onBroadcastComplete(errorCode, bytes);
            });
    }

    /// Completion handler for the probe: on success starts the read and the
    /// retry timer, otherwise logs the error.
    void onBroadcastComplete(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        if (!errorCode) {
            std::cout << "UDP Broadcast "<< bytes_transferred << " byte"<< ((bytes_transferred==1) ? "" : "s") << std::endl;
            queueRead();
            createTimer();
        } else {
            std::cout << __func__ << " (" << errorCode.message() << ")" << std::endl;
        }
    }

    /// Arms the 10 second retry timer.
    void createTimer() {
        m_timer.expires_from_now(boost::posix_time::milliseconds(10000));
        m_timer.async_wait(
            [self = shared_from_this()](boost::system::error_code errorCode) {
                self->onTimerExpiry(errorCode);
            });
    }

    /// Queues a read for a reply; the sender is written to m_remoteEndpoint.
    void queueRead() {
        m_socket.async_receive_from(boost::asio::buffer(m_buffer), m_remoteEndpoint,
            [self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
                self->onReceiveData(errorCode, bytes);
            });
    }

    /// Timer handler: on expiry cancels the outstanding read and probes again.
    void onTimerExpiry(const boost::system::error_code &errorCode) {
        if (!errorCode) {
            std::cout << "UDP Timer Expired" << std::endl;
            // No reply within the timeout: drop the pending read and retry.
            m_socket.cancel();
            sendUDPBroadcast();
        } else if (errorCode == boost::system::errc::operation_canceled) {
            std::cout << "Timer Operation Cancelled " << std::endl;
        }
    }

    /// Read handler: stops the retry timer and logs the outcome.
    void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred) {
        // The read has completed one way or another, so stop the timer.
        m_timer.cancel();
        if (!errorCode) {
            std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;
        } else if (errorCode == boost::system::errc::operation_canceled) {
            std::cout << "UDP Read Operation Cancelled " << std::endl;
        }
    }

    /// Returns the address of the last datagram's sender as text.
    ///
    /// Only meaningful once onReceiveData() has run successfully; before
    /// that the endpoint is default-constructed.
    std::string getIPAddress() {
        std::cout << "Called getIPAddress() " << m_remoteEndpoint.address().to_string() << std::endl;
        return m_remoteEndpoint.address().to_string();
    }

private:
    boost::asio::ip::udp::socket m_socket;
    boost::asio::ip::udp::endpoint m_localEndpoint;   // broadcast destination (255.255.255.255:udpPort)
    boost::asio::ip::udp::endpoint m_remoteEndpoint;  // sender of the last received datagram
    boost::asio::deadline_timer m_timer;
    std::array<uint8_t, 32> m_buffer = {0};
    std::array<uint8_t, 2> m_probe = {{0, 0}};        // probe payload; must outlive async_send_to
};
// Runs the discovery/connect sequence twice on the main thread.
int main() {
    boost::asio::io_context ioc;
    int loop = 0;
    while (loop < 2) {
        auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
        udp->run();
        // NOTE: run() only queues the asynchronous operations; no handler has
        // executed yet because ioc.run() is called further down. The address
        // read here is therefore still the default one (0.0.0.0), not the
        // device's address.
        std::string remote = udp->getIPAddress();
        std::cout << "Main " << remote << std::endl;
        // I want to use the address returned from udpFindQSYNC here.
        // Hard-coded to a non-existent IP address to cause a timeout.
        std::string nonextisthostname("192.168.0.143");
        std::make_shared<tcpQSYNC>(ioc, nonextisthostname, 9760)->run();
        loop++;
        // Run the I/O service on the main thread until it runs out of work.
        ioc.run();
        // run() leaves the io_context in the stopped state; without restart()
        // the next pass's run() would return immediately and none of its
        // handlers would be invoked.
        ioc.restart();
    }
    return 0;
}
Things I cannot get my head around
How do I return the IP address from the udpFindQSYNC class for the tcpQSYNC class to connect to, given that the udpFindQSYNC destructor has already been called?
How do I use the io_context to run two separate classes sequentially in what is essentially an infinite loop?
I looked at strands, but cannot figure out how to use them in my context. I always see the TCP connection run concurrently with the UDP search.
The log produced by my program is:
UDP Broadcast 2 bytes
tcpQSYNC destructor
UDP Timer Expired
UDP Read Operation Cancelled
UDP Broadcast 2 bytes
UDP Received Data 6 bytes Called getIPAddress() 192.168.0.140
192.168.0.140
Timer Operation Cancelled
udpFindQSYNC() destructor <- My class is destroyed
Called getIPAddress() 0.0.0.0
Main 0.0.0.0 <- Thus my result is wrong
TCP Connection Started
tcpQSYNC destructor
udpFindQSYNC() destructor
Would someone be able to point me towards the best way to address the two problems I cannot figure out?
Upvotes: 0
Views: 413
Reputation: 20949
Method getIPAddress can be called only after the onReceiveData handler has been called, or from within that handler.
Your problem is that getIPAddress
is called too early, m_remoteEndpoint
is not filled yet, because udp->run()
returns immediately and handler - onReceiveData
was not called.
Possible solutions to your problem:
1) add some blocking mechanism in getIPAddress
which blocks until onReceiveData
was called, then getIPAddress
can end and return m_remoteEndpoint
address
2) you are calling getIPAddress
from onReceiveData
The first way can be achieved, for example, by using a condition_variable
and isAddress
flag. Inside getIPAddress
you are calling wait
on condition variable with predicate which checks if isAddress
is set to true. In handler onReceiveData
you set isAddress
to true, and notify condition variable. The weakness of this approach is that in main
you need to start additional thread (in background) in which ioc.run()
works - to handle handlers. Without this main
thread will be blocked on getIPAddress
method.
In second way the main loop can be reduced to:
int loop = 0;
while (loop < 2)
{
    auto udp = std::make_shared<udpFindQSYNC>(ioc, 9720);
    udp->run();
    loop++;
    // Run the I/O service on the main thread
    ioc.run();
    // run() returns with the io_context stopped once it is out of work;
    // restart() is required or the next pass's run() returns immediately.
    ioc.restart();
}
and I think this is what you want. You start first async operation in udp->run
and the rest work is performed in handlers.
When is tcpQSYNC created? In onReceiveData, because then you know the address of the other side which you want to connect to.
/// Read handler: stops the retry timer and, on success, starts the TCP
/// connection to the device that answered the broadcast.
void onReceiveData(const boost::system::error_code &errorCode, std::size_t bytes_transferred)
{
    m_timer.cancel();
    if (errorCode == boost::system::errc::success) {
        std::cout << "UDP Received Data " << bytes_transferred << " byte" <<((bytes_transferred==1) ? " " : "s ") << getIPAddress() << std::endl;
        // m_remoteEndpoint is filled here.
        // The class keeps no reference to the io_context, so recover it from
        // the socket's executor (the socket was constructed from it).
        auto &ioc = static_cast<boost::asio::io_context &>(m_socket.get_executor().context());
        std::make_shared<tcpQSYNC>(ioc, m_remoteEndpoint.address().to_string(), 9760)->run();
    } else if (errorCode == boost::system::errc::operation_canceled) {
        std::cout << "UDP Read Operation Cancelled " << std::endl;
    }
}
This
// Snippet quoted from sendUDPBroadcast() to show the lifetime problem:
// `data` is a local array, and the buffer passed to async_send_to only
// refers to it without copying. The call returns immediately, so the array
// is gone before the send is performed.
std::array<uint8_t, 2> data = {{0, 0}};
m_socket.async_send_to(boost::asio::buffer(data, 2), m_localEndpoint,
[self = shared_from_this()](boost::system::error_code errorCode, std::size_t bytes) {
self->onBroadcastComplete(errorCode, bytes);
});
is undefined behaviour. data
is local. async_send_to
returns immediately. boost::asio::buffer
doesn't make a copy of passed buffer.
You can store data
as data member of your class to ensure that buffer lives for the whole time while async_send_to
is being performed. Or put it into shared_ptr
and pass smart pointer to lambda by value - lifetime of data will be prolonged.
In tcpQSYNC
, why are you passing endpoint to m_socket
?
m_socket(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0))
this constructor will bind socket to the given endpoint. What for? You are a client, not server. You should pass only protocol:
m_socket(ioc,boost::asio::ip::tcp::v4()),
Upvotes: 1