whs31-
whs31-

Reputation: 19

Boost::Asio deadline timer blocks UDP socket read

I have a simple class whose purpose is to read data over UDP all the time, and send a request packet every X seconds. I tried to implement scheduled request sending via boost::asio::deadline_timer, however my code started to behave strangely: it correctly reads incoming packets until the timer fires for the first time, then it stops reading and the socket only sends data. I confirmed that the problem is not on the side of the device with which the connection is being made.

Here's a summary of my code:

powerswitch.h:

#pragma once

/* includes */

namespace a
{
  using std::thread;
  using std::unique_ptr;
  using std::array;

  // UDP client for a power-switch device: keeps an asynchronous receive running on a socket
  // and sends a request packet each time its timer expires.
  class PowerSwitch 
  {
    // Request/response records; [[gnu::packed]] is applied so the layout has no padding.
    struct [[gnu::packed]] RequestPacket { /* ... */ };
    struct [[gnu::packed]] ResponsePacket { /* ... */ };
    // Decoded state of one channel, as stored in m_channels.
    struct ChannelData { /* ... */ };

    public:
      // ipv4/port: address of the device that requests are sent to.
      // context: io_context the socket and the timer are created on.
      // request_interval: delay between two timer-driven requests.
      explicit PowerSwitch(string_view ipv4, u16 port, boost::asio::io_context& context, std::chrono::seconds request_interval);
      // Calls stop().
      ~PowerSwitch();

      // Sends a request packet carrying `channel`.
      auto toggle_channel(int channel) -> void;
      // Closes the socket.
      auto stop() -> void;

    private:
      // (Re)opens and binds the socket, stores the target, restarts the timer and the read.
      auto configure(string_view ipv4, u16 port, std::chrono::seconds request_interval) -> expected<void, string>;
      // Sends the periodic request (a toggle of the dummy channel).
      auto request() -> void;
      // Starts one asynchronous receive; its handler starts the next one.
      auto read() -> void;
      // Decodes the received bytes in m_buffer into m_channels.
      auto handle_incoming(usize bytes) -> void;
      // Sends a request and re-arms the timer.
      auto handle_timer() -> void;
      // Starts an asynchronous send of `data` to m_target.
      auto write(string_view data) -> void;

    private:
      boost::asio::ip::udp::socket m_socket;
      // Local endpoint. NOTE(review): read() also passes this object to async_receive_from
      // as the sender endpoint, so it appears to serve two roles — confirm that is intended.
      boost::asio::ip::udp::endpoint m_endpoint;
      boost::asio::ip::udp::endpoint m_target;        // device address requests are sent to
      boost::asio::deadline_timer m_timer;            // drives handle_timer()
      std::chrono::seconds m_request_interval;
      array<u8, 1024> m_buffer;                       // receive buffer used by read()
      array<ChannelData, 8> m_channels;               // last decoded state, indexed by channel
  };
}

powerswitch.cpp:

/* includes */

using std::span;
using std::vector;
namespace asio = boost::asio;

namespace a
{
  // Local UDP port the socket is bound to in configure().
  constexpr u16 LOCAL_PORT = 12000;
  // Channel number sent by request(), i.e. by every timer-driven request.
  constexpr u16 DUMMY_CHANNEL = 9'999;
  // Value written to RequestPacket::marker by toggle_channel().
  constexpr auto REQUEST_MARKER = 0xAAAAAAAA;

  // Constructs the socket and timer on `context`, then hands over to configure(), which
  // closes the socket again, reopens it on LOCAL_PORT and starts the timer and the read.
  // A configure() failure is only logged; the constructor itself does not report it.
  PowerSwitch::PowerSwitch(
    const string_view ipv4,
    const u16 port,
    asio::io_context& context,
    const std::chrono::seconds request_interval
  )
    // Initial values only: m_endpoint and m_timer are reassigned inside configure().
    : m_socket(context, asio::ip::udp::endpoint()),
      m_endpoint(this->m_socket.local_endpoint()),
      m_timer(context, boost::posix_time::seconds(request_interval.count())),
      m_request_interval(request_interval),
      m_buffer(array<u8, 1024>()),
      m_channels(array<ChannelData, 8>())
  {
    this->configure(ipv4, port, request_interval)
      .map_error([](const auto& e){ llog::error("failed to initialize powerswitch: {}", e); });
  }

  // Closes the socket (via stop()) when the object is destroyed.
  PowerSwitch::~PowerSwitch() { this->stop(); }

  // Builds a RequestPacket for `channel` and passes its bytes to write().
  // NOTE(review): `packet` is a local object and write() starts an asynchronous send on a
  // view of it, so the bytes must still be valid when that send runs — confirm the lifetime.
  auto PowerSwitch::toggle_channel(const int channel) -> void
  {
    const auto packet = RequestPacket {
      .marker = REQUEST_MARKER,
      .channel = static_cast<u16>(channel),
      // NOTE(review): read() passes m_endpoint to async_receive_from as the sender endpoint,
      // so this may no longer be the local port once a datagram has arrived — verify.
      .response_port = this->m_endpoint.port(),
      .checksum = 0x0000
    };

    this->write({ reinterpret_cast<const char*>(&packet), sizeof(packet) });
  }

  // Closes the socket and logs the address currently stored in m_endpoint.
  // The timer is not touched here.
  auto PowerSwitch::stop() -> void
  {
    this->m_socket.close();
    llog::trace("closing connection to {}", this->m_endpoint.address().to_string());
  }

  // Closes the socket, rebinds it to the local address on LOCAL_PORT, stores the target
  // endpoint, replaces the timer and starts both the request timer and the receive.
  // Returns an error string when any step in the try block throws; in that case no receive
  // is started by this function.
  auto PowerSwitch::configure(
    const string_view ipv4,
    const u16 port,
    const std::chrono::seconds request_interval
  ) -> expected<void, string>
  {
    this->stop();
    try {
      this->m_endpoint = asio::ip::udp::endpoint(
        asio::ip::make_address_v4(ip::Ipv4::local_address_unchecked().address),
        LOCAL_PORT
      );

      llog::trace("opening socket at {}:{}", this->m_endpoint.address().to_string(), this->m_endpoint.port());

      this->m_socket.open(this->m_endpoint.protocol());
      this->m_socket.bind(this->m_endpoint);
      this->m_request_interval = request_interval;

      this->m_target = asio::ip::udp::endpoint(
        asio::ip::make_address_v4(ipv4),
        port
      );

      this->m_timer = asio::deadline_timer(
        this->m_socket.get_executor(),
        boost::posix_time::seconds(request_interval.count())
      );
      // Sends the first request immediately and arms the timer for the following ones.
      // Note that this path ends in write(), which itself calls read().
      this->handle_timer();

      llog::debug("powerswitch service started at {}:{} (receiving from {}:{})",
        this->m_socket.local_endpoint().address().to_string(),
        this->m_socket.local_endpoint().port(),
        this->m_target.address().to_string(),
        this->m_target.port()
      );
    } catch(const std::exception& e) {
      return Err("exception: {}", e.what());
    }
    // Starts a receive in addition to the one write() has already started above.
    this->read();
    return {};
  }

  // Sends the periodic request: a toggle_channel() call carrying DUMMY_CHANNEL.
  auto PowerSwitch::request() -> void {
    llog::trace("powerswitch: sending planned request");
    this->toggle_channel(DUMMY_CHANNEL);
  }

  // Starts one asynchronous receive into m_buffer. On success the handler decodes the
  // datagram and starts the next receive; on error it logs and the chain ends.
  auto PowerSwitch::read() -> void
  {
    this->m_socket.async_receive_from(
      asio::buffer(this->m_buffer),
      // NOTE(review): m_endpoint is passed as the sender endpoint here, yet other code
      // treats it as the local endpoint (toggle_channel reads its port) — verify.
      this->m_endpoint,
      [this](const auto& ec, const auto& bytes_transferred)
      {
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        this->handle_incoming(bytes_transferred);
        this->read();
      }
    );
  }

  // Interprets the start of m_buffer as eight ResponsePacket records, stores each one in
  // m_channels under its channel number, then logs the first channel.
  auto PowerSwitch::handle_incoming(const usize bytes) -> void
  {
    // `bytes` only limits this span; the loop below always visits all eight array entries.
    const auto raw = span(this->m_buffer).first(bytes);
    for(const auto datagram = reinterpret_cast<array<ResponsePacket, 8>*>(raw.data());
      const auto& [marker, channel, enabled, voltage, current] : *datagram) {
      // `channel` comes straight from the datagram and is used as an index without a check.
      this->m_channels[channel] = ChannelData {
        .voltage = static_cast<f32>(voltage) / 1'000.0f,
        .current = static_cast<f32>(current),
        .enabled = static_cast<bool>(enabled)
      };
    }

    llog::trace("[:{} {}V {} mA]", this->m_channels.front().enabled, this->m_channels.front().voltage, this->m_channels.front().current);
  }

  // Sends a request now, then re-arms the timer for m_request_interval; when the wait
  // completes without error the handler calls this function again.
  auto PowerSwitch::handle_timer() -> void
  {
    this->request();
    this->m_timer.expires_from_now(boost::posix_time::seconds(this->m_request_interval.count()));
    this->m_timer.async_wait(
      [this](const auto& ec)
      {
        // Any error ends the periodic loop after logging it.
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        this->handle_timer();
      }
    );
  }

  // Starts an asynchronous send of `data` to m_target and logs the outcome in the handler.
  // `data` is a non-owning view, so the viewed bytes are the caller's responsibility.
  auto PowerSwitch::write(const string_view data) -> void
  {
    this->m_socket.async_send_to(
      asio::buffer(data),
      this->m_target,
      [this](const auto& ec, const auto& bytes_transferred)
      {
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        llog::trace("powerswitch: sent {} bytes to {}:{}",
          bytes_transferred,
          this->m_target.address().to_string(),
          this->m_target.port()
        );
      }
    );
    // Starts another receive on every send, on top of the chain read() maintains itself.
    this->read();
  }
}

main.cpp:

// Creates the io_context and a PowerSwitch targeting 192.168.1.50:44000 with a 5 s request
// interval, then runs the event loop on the calling thread.
auto context = boost::asio::io_context();
  auto powerswitch = a::PowerSwitch(
    "192.168.1.50",
    44000,
    context,
    std::chrono::seconds(5)
  );

  context.run();
  return 0;

I got the following output with code above:

[ trace  ] [thread 7144 ]: opening socket at 192.168.1.10:12000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ debug  ] [thread 7144 ]: powerswitch service started at 192.168.1.10:12000 (receiving from 192.168.1.50:44000) 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: [:false 11.961V 477 mA] 
[ trace  ] [thread 7144 ]: [:false 11.934V 493 mA] 
[ trace  ] [thread 7144 ]: [:false 11.974V 728 mA] 
[ trace  ] [thread 7144 ]: [:false 12.006V 543 mA] 
[ trace  ] [thread 7144 ]: [:false 12.004V 543 mA] 
[ trace  ] [thread 7144 ]: [:false 11.953V 692 mA] 
[ trace  ] [thread 7144 ]: [:false 11.959V 491 mA] 
[ trace  ] [thread 7144 ]: [:false 12.063V 583 mA] 
[ trace  ] [thread 7144 ]: [:false 11.833V 615 mA] 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: [:false 12.075V 613 mA] 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 

I would be grateful for any help or advice.

Upvotes: 0

Views: 64

Answers (1)

sehe
sehe

Reputation: 393487

There are many issues. Most of them due to over-complication.

The first big issue that caught my eye is that you write the local variable packet in the toggle_channel method. That leads to Undefined Behavior because the async write operation will use it after its lifetime has ended.

To be honest, since UDP is fire-and-forget in nature, I don't see the need to use async operations here. You can just use the synchronous send_to method and be done with it.

Both your read() loop and write() chain to more read() operations, which means that you will end up with many read operations pending. This likely explains the problem that made you post the question.

The stop() in the destructor really has no use. It will only be reached after the io_context has run out of work, which means that there is nothing left to stop. What you might want instead is a cancel() method that cancels all pending operations and then waits for them to complete.

When you handle an incoming message, you have to treat it as untrusted input. That means that you should validate it before using it. Most importantly, you should validate that the channel index is within bounds. Using std::array::at instead of std::array::operator[] will give you bounds checking.

Simplifying handle_incoming method might look like:

// Views the first `bytes` bytes of m_buffer as whole ResponsePacket records and copies each
// one into m_channels; a trailing partial record is left out by the integer division.
void PowerSwitch::handle_incoming(usize bytes) {
    auto n_reponses = bytes / sizeof(ResponsePacket);
    auto responses  = std::span<ResponsePacket const>(
        reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_reponses);

    for (auto& [marker, channel, enabled, voltage, current] : responses) {
        // at() is bounds-checked: an out-of-range channel throws std::out_of_range here.
        m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
                                  .current = static_cast<f32>(current),
                                  .enabled = static_cast<bool>((enabled))};
    }

    // Only the first channel is logged.
    auto& chan = m_channels.front();
    llog::trace("[:{} {}V {} mA]", chan.enabled, chan.voltage, chan.current);
}

Note that this presumes that you make sure the buffer is properly aligned for ResponsePacket:

// Receive buffer, declared with ResponsePacket's alignment so records can be viewed in place.
alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};

Extending the checks a bit:

// Decodes every complete ResponsePacket in the first `bytes` bytes of m_buffer.
// Records with an unexpected marker or an out-of-range channel are reported and skipped;
// all others overwrite the matching entry of m_channels. Finally logs the first channel.
void PowerSwitch::handle_incoming(usize bytes) {
    auto const* first = reinterpret_cast<ResponsePacket const*>(m_buffer.data());
    auto const  count = bytes / sizeof(ResponsePacket); // partial trailing record is dropped

    for (auto& [marker, channel, enabled, voltage, current] :
         std::span<ResponsePacket const>(first, count)) {
        if (marker != RESPONSE_MARKER) {
            llog::warn("powerswitch: invalid response marker: {:#x}", marker);
            continue;
        }
        if (channel >= m_channels.size()) {
            llog::warn("powerswitch: invalid channel: {:#x}", channel);
            continue;
        }
        m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
                                  .current = static_cast<f32>(current),
                                  .enabled = static_cast<bool>(enabled)};
    }

    auto& head = m_channels.front();
    llog::trace("[:{} {}V {} mA]", head.enabled, head.voltage, head.current);
}

DEMO

Here's my attempt at fixing these things. I have made up some details to make it self-contained, and simplified many things along the way.

Live On Coliru

#include <boost/asio.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <fmt/ranges.h>
#include <iostream>
#include <span>
#include <string_view>
namespace asio = boost::asio;
using namespace std::chrono_literals;

template <> struct fmt::formatter<asio::ip::udp::endpoint> : ostream_formatter {};

namespace llog {
    template <typename... Args> constexpr auto trace(auto const& fmt, Args const&... args) -> void {
        std::cout << "TRACE\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto debug(auto const& fmt, Args const&... args) -> void {
        std::cout << "DEBUG\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto error(auto const& fmt, Args const&... args) -> void {
        std::cout << "ERROR\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto warn(auto const& fmt, Args const&... args) -> void {
        std::cout << "WARN\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }   
} // namespace llog

namespace mylib {
    // Short aliases used throughout the demo.
    using udp = asio::ip::udp;
    using boost::system::error_code;
    using f32   = float;
    using u32   = uint32_t;
    using u16   = uint16_t;
    using u8    = uint8_t;
    using usize = size_t;
    using duration = std::chrono::steady_clock::duration;

    /// UDP client for a power-switch device: keeps one asynchronous receive pending and
    /// sends a request every `request_interval`. All work runs on the io_context passed in.
    class PowerSwitch {
        /// Request sent to the device (10 bytes, see the static_assert below).
        struct [[gnu::packed]] RequestPacket {
            u32 marker;
            u16 channel;
            u16 response_port;
            u16 checksum;
        };
        /// One status record as received from the device (12 bytes).
        struct [[gnu::packed]] ResponsePacket {
            u32 marker;
            u16 channel;
            u16 enabled;
            u16 voltage; // divided by 1000 before being stored — presumably millivolts
            u16 current;
        };
        /// Decoded state of one channel.
        struct ChannelData {
            f32  voltage;
            f32  current;
            bool enabled;
        };

        // The packets are sent and received as raw bytes, so pin down size and layout.
        static_assert(sizeof(mylib::PowerSwitch::RequestPacket) == 10);
        static_assert(sizeof(mylib::PowerSwitch::ResponsePacket) == 12);
        static_assert(std::is_trivial_v<mylib::PowerSwitch::RequestPacket>);
        static_assert(std::is_trivial_v<mylib::PowerSwitch::ResponsePacket>);
        static_assert(std::is_standard_layout_v<mylib::PowerSwitch::RequestPacket>);
        static_assert(std::is_standard_layout_v<mylib::PowerSwitch::ResponsePacket>);

      public:
        /// Binds a socket on `context`, sends the first request and starts receiving.
        /// @param ipv4             address of the device
        /// @param port             port of the device
        /// @param context          io_context that runs all asynchronous operations
        /// @param request_interval delay between two periodic requests
        explicit PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
                             duration request_interval);
        /// Calls cancel().
        ~PowerSwitch();

        /// Sends a request packet carrying `channel`.
        void toggle_channel(int channel);
        /// Cancels the pending timer wait and socket operations.
        void cancel();

      private:
        // Declared but not defined anywhere in this listing.
        void configure(std::string_view ipv4, u16 port, duration request_interval);
        void send_request();       // sends the periodic DUMMY_CHANNEL request
        void read_loop();          // one async receive whose handler starts the next
        void handle_incoming(usize bytes); // decodes `bytes` bytes of m_buffer
        void timed_request_loop(); // sends a request and re-arms the timer
        void write(std::span<std::byte const> data); // synchronous send to m_target

      private:
        // Declaration order matters: later initializers use m_socket and m_request_interval.
        udp::socket        m_socket;
        // Starts out as the local endpoint; read_loop() also passes it to
        // async_receive_from as the sender endpoint.
        udp::endpoint      m_endpoint = m_socket.local_endpoint();
        duration           m_request_interval;
        udp::endpoint      m_target;
        asio::steady_timer m_timer{m_socket.get_executor(), m_request_interval};

        std::array<ChannelData, 8> m_channels{};
        alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};
    };
} // namespace mylib

namespace mylib {
    // Local UDP port the socket is bound to by the constructor.
    constexpr u16  LOCAL_PORT      = 12000;
    // Channel number carried by the periodic request; it lies outside the 8-entry channel table.
    constexpr u16  DUMMY_CHANNEL   = 9'999;
    // Marker written into every RequestPacket.
    constexpr auto REQUEST_MARKER  = 0xAAAAAAAA;
    // Marker a ResponsePacket must carry to be accepted by handle_incoming().
    constexpr auto RESPONSE_MARKER = 0x55555555;

    /// Opens and binds the socket on LOCAL_PORT, stores the interval and the target
    /// endpoint, then sends the first request and starts the receive loop.
    /// Nothing is received or re-sent until the caller runs `context`.
    PowerSwitch::PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
                             duration request_interval)
        : m_socket(context, {{}, LOCAL_PORT})
        , m_request_interval(request_interval)
        , m_target(asio::ip::make_address_v4(ipv4), port) {

        // The socket is already open at this point; m_endpoint holds its local endpoint.
        llog::trace("opening socket at {}", m_endpoint);

        // The first request goes out synchronously from here, before the read is posted.
        timed_request_loop();
        read_loop();

        llog::debug("powerswitch service started at {} (receiving from {})", m_endpoint, m_target);
    }

    /// Cancels the pending timer wait and socket operations.
    ///
    /// The throwing overloads used by cancel() report failure as
    /// boost::system::system_error. A destructor is implicitly noexcept, so letting that
    /// escape would terminate the program; it is logged instead.
    PowerSwitch::~PowerSwitch() {
        try {
            cancel();
        } catch (boost::system::system_error const& e) {
            llog::error("powerswitch: error: {}", e.what());
        }
    }

    void PowerSwitch::toggle_channel(int channel) {
        RequestPacket packet[]{
            {.marker        = REQUEST_MARKER,
             .channel       = static_cast<u16>(channel),
             .response_port = m_endpoint.port(),
             .checksum      = 0x0000},
        };

        write(std::as_bytes(std::span(packet)));
    }

    /// Logs the shutdown, then cancels the timer wait and the socket's outstanding
    /// operations. The socket itself is not closed. Called by the destructor and by the
    /// signal handler in main().
    void PowerSwitch::cancel() {
        llog::trace("closing connection to {}", m_endpoint);
        m_timer.cancel();
        m_socket.cancel();
    }

    /// Sends the periodic request: a toggle_channel() call carrying DUMMY_CHANNEL.
    void PowerSwitch::send_request() {
        llog::trace("powerswitch: sending planned request");
        toggle_channel(DUMMY_CHANNEL);
    }

    /// Posts a single asynchronous receive into m_buffer. A successful completion decodes
    /// the datagram and posts the next receive, so exactly one receive is pending at a
    /// time; any error is logged and ends the loop.
    void PowerSwitch::read_loop() {
        auto on_datagram = [this](error_code ec, size_t bytes_transferred) {
            if (!ec) {
                handle_incoming(bytes_transferred);
                read_loop();
            } else {
                llog::error("powerswitch: error: {}", ec.what());
            }
        };

        // m_endpoint receives the sender's endpoint for each datagram.
        m_socket.async_receive_from(asio::buffer(m_buffer), m_endpoint, on_datagram);
    }

    /// Decodes the datagram held in the first `bytes` bytes of m_buffer.
    ///
    /// The payload is treated as untrusted input: it is split into whole ResponsePacket
    /// records, and each record must carry RESPONSE_MARKER and a channel number inside
    /// m_channels before it is stored. Rejected records and a trailing partial record are
    /// logged and skipped.
    ///
    /// @param bytes number of bytes received into m_buffer
    void PowerSwitch::handle_incoming(usize bytes) {
        auto const n_responses = bytes / sizeof(ResponsePacket);
        auto const responses   = std::span<ResponsePacket const>(
            reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_responses);
        llog::debug("powerswitch: received {} responses in {} bytes", n_responses, bytes);

        // A length that is not a multiple of the record size means the datagram was
        // truncated or malformed; say so instead of dropping the remainder silently.
        if (auto const trailing = bytes % sizeof(ResponsePacket); trailing != 0) {
            llog::warn("powerswitch: ignoring {} trailing bytes (incomplete response)", trailing);
        }

        for (auto& [marker, channel, enabled, voltage, current] : responses) {
            if (marker != RESPONSE_MARKER) {
                llog::warn("powerswitch: invalid response marker: {:#x}", marker);
                continue;
            }
            if (channel >= m_channels.size()) {
                llog::warn("powerswitch: invalid channel: {:#x}", channel);
                continue;
            }

            auto& c = m_channels.at(channel);
            c       = {.voltage = voltage / 1'000.0f, // logged as volts below
                       .current = static_cast<f32>(current),
                       .enabled = static_cast<bool>(enabled)};
            llog::debug("powerswitch: updated channel {} [:{} {}V {} mA]", channel, //
                        c.enabled, c.voltage, c.current);
        }
    }

    /// Sends a request immediately, then arms the timer for m_request_interval; when the
    /// wait completes without error the handler calls this function again. Any error,
    /// including the one delivered after cancel(), is logged and ends the loop.
    void PowerSwitch::timed_request_loop() {
        send_request();

        // expires_after() replaces expires_from_now(), which is deprecated on
        // steady_timer and is not available when deprecated Asio APIs are disabled.
        m_timer.expires_after(m_request_interval);
        m_timer.async_wait([this](error_code ec) {
            if (ec) {
                llog::error("powerswitch: error: {}", ec.what());
                return;
            }
            timed_request_loop();
        });
    }

    /// Sends `data` to m_target as a single datagram, synchronously.
    ///
    /// The call returns only after the bytes have been handed to the socket, so the caller
    /// may pass a view of a local object. A failure is logged and otherwise ignored.
    ///
    /// @param data bytes to send
    void PowerSwitch::write(std::span<std::byte const> data) {
        error_code ec;
        auto const sent = m_socket.send_to(asio::buffer(data), m_target, {}, ec);

        if (ec) {
            llog::error("powerswitch: error: {}", ec.what());
            return; // do not also report the failed send as "sent"
        }

        llog::trace("powerswitch: sent {} bytes to {} ({})", sent, m_target, ec.message());
    }
} // namespace mylib

int main() {
    auto context     = asio::io_context();
 // auto powerswitch = mylib::PowerSwitch("192.168.1.50", 44000, context, 5s);
    auto powerswitch = mylib::PowerSwitch("127.0.0.1", 44000, context, 5s);

    asio::signal_set signals(context, SIGINT, SIGTERM);
    signals.async_wait([&](mylib::error_code ec, int signal) {
        if (!ec) {
            llog::debug("received signal {}", signal);
            //context.stop();
            powerswitch.cancel();
        }
    });

    context.run();
}

With a local demo:

enter image description here

Upvotes: 1

Related Questions