Furch Radeon

Reputation: 77

Boost Asio Timeout Approach


I have a situation where I need to collect data from more than 100 clients within 100 ms. After that time I need to process the collected data. When processing is done, I need to restart the collection step and repeat the whole cycle in a loop.

To collect the data I am using the following implementation:

#include <boost/asio.hpp>
#include <chrono>     // std::chrono_literals
#include <functional> // std::bind_front (C++20)

#include <iostream>
#include <list>
#include <set>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

struct listener {

    using Buffer = std::array<char, 100>; // receiver buffer
    udp::socket s;

    listener(net::any_io_executor ex, uint16_t port) : s{ex, {{}, port}} {}

    void start() {
        read_loop(error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] { s.cancel(); });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

private:
    Buffer receive_buffer;
    udp::endpoint         sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;


    void read_loop(error_code ec, size_t bytes) {
        if (bytes != size_t(-1)) {
            // std::cout << "read_loop (" << ec.message() << ")\n";
            if (ec)
                return;

            received_packets += 1;
            unique_senders.insert(sender);
             std::cout << "Received:" << bytes << " sender:" << sender << " recorded:"
             << received_packets << "\n";
             //std::cout <<
            // std::string_view(receive_buffer.data(), bytes) << "\n";
        }
        s.async_receive_from(net::buffer(receive_buffer), sender,
                             std::bind_front(&listener::read_loop, this));
    }
};

int main() {
    net::thread_pool io(1); // single threaded

    using Timer = net::steady_timer;
    using TimePoint = std::chrono::steady_clock::time_point;
    using Clock = std::chrono::steady_clock;

    Timer timer_(io);
    std::list<listener> listeners;

    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(io.get_executor(), port);

    each(&listener::start);

    TimePoint startTP = Clock::now();
    timer_.expires_at(startTP + 100ms); // collect data for 100 ms
    timer_.async_wait([&](auto &&){each(&listener::stop);});

    std::cout << "Done ! \n";
    each(&listener::report);
    

    io.join();
}

Is this an okay approach to stop the collecting process?

TimePoint startTP = Clock::now();
timer_.expires_at(startTP + 100ms); // collect data for 100 ms
timer_.async_wait([&](auto &&){each(&listener::stop);});

Upvotes: 1

Views: 439

Answers (1)

sehe

Reputation: 392833

I'm interpreting this as basically asking how to combine the single-listener time-slice example with the multi-listener version from my earlier answers.

This is also reflected in your comment there:

I have one bottleneck. According to the last code you shared with me: [...] I tried to add the condition: record 100 ms of data, then pause the sockets and go process the collected data. When that is done, start another 100 ms of collecting data from the sockets, then again process for 900 ms, etc... The problem is that each listener now has its own current time. I am thinking how to have everything in one place, so that when the 100 ms have elapsed, all 'listeners' are notified to resume using the stop() function provided by you.

It would seem much easier to use the same time-slice calculation I used in the first (single-listener) example.

The whole point of the way I calculated the time slices was to allow synchronization to a clock, without time drift. The beauty of it is that it carries over 1:1 to multiple listeners.
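
To make that slice arithmetic easy to inspect in isolation, here is a minimal self-contained sketch (the names period, armed_slice and start mirror the full listing below; this is illustration only, not code from the answer):

#include <chrono>
#include <iostream>

int main() {
    using Clock = std::chrono::steady_clock;
    using namespace std::chrono_literals;

    constexpr auto period      = 1s;    // full cycle
    constexpr auto armed_slice = 100ms; // receive window at the start of each cycle

    auto const start  = Clock::now();   // common epoch shared by all listeners
    auto const now    = Clock::now();
    auto const relnow = now - start;

    // Are we inside the first 100 ms of the current 1 s period?
    bool const armed = (relnow % period) <= armed_slice;

    // Absolute start of the next period. Because it is derived from the fixed
    // epoch `start`, repeatedly waiting on it cannot accumulate drift.
    auto const next_slice = start + period * (relnow / period + 1);

    std::cout << "armed: " << armed << ", next slice in "
              << (next_slice - now) / 1ms << "ms\n";
}

Since every listener computes against the same epoch, their windows line up without any cross-listener communication.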

Here's the combination with 1 timer per listener but synchronized time slices, created in exactly the same way I created the multi-listener sample from the original answer code:

(I just moved everything related to read_loop into a class, and that was it.)

Live On Coliru

#include <boost/asio.hpp>

#include <array>
#include <atomic>
#include <chrono>
#include <functional> // std::bind, std::bind_front
#include <iostream>
#include <list>
#include <set>
#include <thread> // std::this_thread::sleep_for

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

using Clock = std::chrono::steady_clock;
using Timer = net::steady_timer;
constexpr auto period      = 1s;
constexpr auto armed_slice = 100ms;

struct listener {
    udp::socket s;

    listener(Clock::time_point start, net::any_io_executor ex, uint16_t port)
        : s{ex, {{}, port}}
        , start_{start} {}

    void start() {
        read_loop(event::init, error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] {
            stopped_ = true;
            s.cancel();
            timer.cancel();
        });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

  private:
    std::atomic_bool stopped_{false};
    enum class event { init, resume, receive };

    Clock::time_point const start_;
    Timer                   timer{s.get_executor()};
    std::array<char, 100>   receive_buffer;
    udp::endpoint           sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    void read_loop(event ev, error_code ec, [[maybe_unused]] size_t bytes) {
        if (stopped_)
            return;
        auto const now    = Clock::now();
        auto const relnow = now - start_;
        switch (ev) {
        case event::receive:
            // std::cout << s.local_endpoint() << "receive (" << ec.message()
            //<< ")\n";
            if (ec)
                return;

            if ((relnow % period) > armed_slice) {
                // ignore this receive

                // wait for next slice
                auto next_slice = start_ + period * (relnow / period + 1);
                std::cout << s.local_endpoint() << " Waiting "
                          << (next_slice - now) / 1ms << "ms ("
                          << received_packets << " received)\n";
                timer.expires_at(next_slice);
                return timer.async_wait(std::bind(&listener::read_loop, this,
                                                  event::resume, _1, 0));
            } else {
                received_packets += 1;
                unique_senders.insert(sender);
                /*
                 *std::cout << s.local_endpoint() << " Received:" << bytes
                 *          << " sender:" << sender
                 *          << " recorded:" << received_packets << "\n";
                 *std::cout << std::string_view(receive_buffer.data(), bytes)
                 *          << "\n";
                 */
            }
            break;
        case event::resume:
            //std::cout << "resume (" << ec.message() << ")\n";
            if (ec)
                return;
            break;
        case event::init:
            //std::cout << s.local_endpoint() << " init " << (now - start_) / 1ms << "ms\n";
            break;
        }
        s.async_receive_from(
            net::buffer(receive_buffer), sender,
            std::bind_front(&listener::read_loop, this, event::receive));
    }
};

int main() {
    net::thread_pool io(1); // single threaded

    std::list<listener> listeners;

    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    auto const start = Clock::now();

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(start, io.get_executor(), port);

    each(&listener::start);

    // after 5s stop
    std::this_thread::sleep_for(5s);

    each(&listener::stop);

    io.join();
    
    each(&listener::report);
}

Live Demo: (animated screen capture of the run)

EDIT: In case the output goes too fast to interpret:

0.0.0.0:1234 Waiting 899ms (1587 received)
0.0.0.0:1236 Waiting 899ms (1966 received)
0.0.0.0:1235 Waiting 899ms (1933 received)
0.0.0.0:1235 Waiting 899ms (4054 received)
0.0.0.0:1234 Waiting 899ms (3454 received)
0.0.0.0:1236 Waiting 899ms (4245 received)
0.0.0.0:1236 Waiting 899ms (6581 received)
0.0.0.0:1235 Waiting 899ms (6257 received)
0.0.0.0:1234 Waiting 899ms (5499 received)
0.0.0.0:1235 Waiting 899ms (8535 received)
0.0.0.0:1234 Waiting 899ms (7494 received)
0.0.0.0:1236 Waiting 899ms (8811 received)
0.0.0.0:1236 Waiting 899ms (11048 received)
0.0.0.0:1234 Waiting 899ms (9397 received)
0.0.0.0:1235 Waiting 899ms (10626 received)
0.0.0.0:1234: A total of 9402 were received from 7932 unique senders
0.0.0.0:1235: A total of 10630 were received from 8877 unique senders
0.0.0.0:1236: A total of 11053 were received from 9133 unique senders

If you are sure you will remain single-threaded, you might consider sharing one actual timer between all listeners, at the cost of significantly increased complexity.
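
Purely as an illustration of that idea (not code from the answer): one timer could drive the slice boundaries for all listeners, assuming everything runs on the single-thread pool above. The resume() hook is hypothetical; the real listener would have to expose something like it instead of re-arming its own timer:

// Hypothetical sketch: one shared timer for all listeners (single-threaded only)
void schedule_slices(net::steady_timer& timer, Clock::time_point start,
                     std::list<listener>& listeners) {
    auto const relnow = Clock::now() - start;
    timer.expires_at(start + period * (relnow / period + 1)); // same drift-free math
    timer.async_wait([&](error_code ec) {
        if (ec)
            return; // timer cancelled on shutdown
        for (auto& l : listeners)
            l.resume(); // hypothetical hook to re-arm each listener's receive loop
        schedule_slices(timer, start, listeners); // schedule the next slice
    });
}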

Upvotes: 1
