Clemens
Clemens

Reputation: 53

Trying to write UDP server class, io_context doesn't block

I try to open a UDP server. A baby example works (I receive what I expect and what wireshark also shows): Baby example:

int main(int argc, char* const argv[])
{
  try 
  {
    boost::asio::io_context io_context;
    boost::asio::ip::udp::endpoint ep(boost::asio::ip::udp::v4(), 60001);
    boost::asio::ip::udp::socket sock(io_context, ep);
    UDPServer server(std::move(sock), callbackUDP);
    io_context.run();
  }
  catch (std::exception& e) 
  {
      std::cerr << e.what() << std::endl;
  }
}

UDPServer.hpp:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

class UDPServer
{
public:
    explicit UDPServer(boost::asio::ip::udp::socket socket, std::function<void(const std::vector<char>&)> callbackFunction);
    virtual ~UDPServer();
private:
    void read();
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;
    std::function<void(const std::vector<char>&)> callbackFunction_;
    char data_[1500 + 1]; // 1500 bytes is safe limit as it is max of ethernet frame, +1 is for \0 terminator
};

UDPServer.cpp:

#include <iostream>
#include "UDPServer.h"

UDPServer::UDPServer(boost::asio::ip::udp::socket socket, std::function<void(const std::vector<char>&)> callbackFunction):
socket_(std::move(socket)),
callbackFunction_(callbackFunction)
{
    read();
}

UDPServer::~UDPServer()
{
}

void UDPServer::read() 
{
    socket_.async_receive_from(boost::asio::buffer(data_, 1500), endpoint_,
        [this](boost::system::error_code ec, std::size_t length) 
        {
            if (ec)
            {
                return;
            }
            data_[length] = '\0';
            if (strcmp(data_, "\n") == 0)
            {
                return;
            }
            std::vector<char> dataVector(data_, data_ + length);
            callbackFunction_(dataVector);
            read();
        }
    );
}

Now what I want to convert this to is a class with as constructor only the port and a callback function (let forget about the latter and just print the message for now, adding the callback is normally no problem).

I tried the following, but it doesn't work:

int main(int argc, char* const argv[])
{
  UDPServer server(60001);
}

UDPServer.h:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

class UDPServer
{
public:
    explicit UDPServer(uint16_t port);
    virtual ~UDPServer();
private:
    boost::asio::io_context io_context_;
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;
    std::array<char, BUFFERSIZE> recv_buffer_;
    std::thread thread_;
    void run();
    void start_receive();
    void handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred);
};

UDPServer.cpp:

#include <iostream>
#include "UDPServer.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <iostream>

UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
thread_(&UDPServer::run, this)
{
        start_receive();
}

UDPServer::~UDPServer()
{
    io_context_.stop();
    thread_.join();
}

void UDPServer::start_receive()
{
    socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_,
        boost::bind(&UDPServer::handle_reply, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void UDPServer::handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (!error)
    {
        try {
            std::string string(recv_buffer_.data(), recv_buffer_.data() + bytes_transferred);
            std::cout << "Message received: " << std::to_string(bytes_transferred) << ", " << string << std::endl;
        }
        catch (std::exception ex) {
            std::cout << "handle_reply: Error parsing incoming message:" << ex.what() << std::endl;
        }
        catch (...) 
        {
            std::cout << "handle_reply: Unknown error while parsing incoming message" << std::endl;
        }
    }
    else
    {
        std::cout << "handle_reply: error: " << error.message() << std::endl;
    }
    start_receive();
}

void UDPServer::run()
{
    try {
        io_context_.run();
    } catch( const std::exception& e ) 
    {
        std::cout << "Server network exception: " << e.what() << std::endl;
    }
    catch(...) 
    {
        std::cout << "Unknown exception in server network thread" << std::endl;
    }
    std::cout << "Server network thread stopped" << std::endl;
};

When running I get "Server network thread stopped". io_context doesn't seem to start and doesn't block. Someone an idea what I do wrong? Thanks a lot!

EDIT tried this after comment, same result (except that message comes after 1 second)

UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port))
{
        start_receive();
        std::this_thread::sleep_for (std::chrono::seconds(1));
        thread_ = std::thread(&UDPServer::run, this);
}

Upvotes: 1

Views: 393

Answers (1)

sehe
sehe

Reputation: 393769

Your destructor explicitly tells the service to stop:

UDPServer::~UDPServer() {
    io_context_.stop();
    thread_.join();
}

That's part of your problem. The other part is as pointed out in the comment: you have a race condition where the thread exits before you even post your first async operation.

Solve it by adding a work guard:

boost::asio::io_context io_;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_ {io_.get_executor()};

Now the destructor can be:

UDPServer::~UDPServer() {
    work_.reset(); // allow service to run out of work
    thread_.join();
}

Other notes:

  • avoid chaining back to start_receive when there was an error

  • std::to_string was redundant

  • the order of initialization for members is defined by the order of their declaration, not their initializers in the initializer list. Catch these bug sources with -Wall -Wextra -pedantic = handle exceptions in your service thread (see Should the exception thrown by boost::asio::io_service::run() be caught?)

  • I'd suggest std::bind over boost::bind:

    std::bind(&UDPServer::handle_reply, this,
              std::placeholders::_1,
              std::placeholders::_2));
    
  • Or just use a lambda:

     [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); });
    

LIVE DEMO

Compiler Explorer

#include <boost/asio.hpp>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <thread>
#include <vector>

using boost::asio::ip::udp;
using boost::system::error_code;
using boost::asio::io_context;

#define BUFFERSIZE 1501

class UDPServer {
  public:
    explicit UDPServer(uint16_t port);
    virtual ~UDPServer();

  private:
    io_context io_;
    boost::asio::executor_work_guard<io_context::executor_type> work_ {io_.get_executor()};
    udp::endpoint endpoint_;
    udp::socket socket_;
    std::array<char, BUFFERSIZE> recv_buffer_;
    std::thread thread_;

    void run();
    void start_receive();
    void handle_reply(const error_code& error, size_t transferred);
};

UDPServer::UDPServer(uint16_t port)
        : endpoint_(udp::endpoint(udp::v4(), port)),
          socket_(io_, endpoint_), 
          thread_(&UDPServer::run, this) {
    start_receive();
}

UDPServer::~UDPServer() {
    work_.reset(); // allow service to run out of work
    thread_.join();
}

void UDPServer::start_receive() {
    socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_,
#if 0
            std::bind(&UDPServer::handle_reply, this,
                std::placeholders::_1,
                std::placeholders::_2));
#else
            [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); });
#endif
}

void UDPServer::handle_reply(const error_code& error, size_t transferred) {
    if (!error) {
        try {
            std::string_view s(recv_buffer_.data(), transferred);
            std::cout << "Message received: " << transferred << ", "
                      << std::quoted(s) << "\n";
        } catch (std::exception const& ex) {
            std::cout << "handle_reply: Error parsing incoming message:"
                      << ex.what() << "\n";
        } catch (...) {
            std::cout
                << "handle_reply: Unknown error while parsing incoming message\n";
        }

        start_receive();
    } else {
        std::cout << "handle_reply: error: " << error.message() << "\n";
    }
}

void UDPServer::run() {
    while (true) {
        try {
            if (io_.run() == 0u) {
                break;
            }
        } catch (const std::exception& e) {
            std::cout << "Server network exception: " << e.what() << "\n";
        } catch (...) {
            std::cout << "Unknown exception in server network thread\n";
        }
    }
    std::cout << "Server network thread stopped\n";
}

int main() {
    std::cout << std::unitbuf;
    UDPServer server(60001);
}

Testing with random words:

sort -R /etc/dictionaries-common/words | while read w; do sleep 1; netcat -u localhost 60001 -w 0 <<<"$w"; done

Live output:

enter image description here

Upvotes: 2

Related Questions