Will Lee
Will Lee

Reputation: 61

boost::asio::async_write to an ssl::stream reports success, but the server does not receive the data

I wrote an SSL server and client that run a ping-pong exchange. After a short time, the client reports that the data was sent successfully, but the server never receives it. The client runs with multiple threads; with a single thread it seems to work normally. I also tried adding a timer that performs an additional handshake, and then the server receives all the data, but I want it to work correctly without that extra handshake. Can anyone help figure out what's wrong?

here is my server

#include <cstdlib>
#include <functional>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind/bind.hpp>

using boost::asio::ip::tcp;

// A single TLS connection accepted by the server.
//
// Once the server-side handshake succeeds, the session alternates between
// reading a chunk from the peer and writing that same chunk back. Every
// pending handler captures a shared_ptr to the session, which keeps the
// object alive while an operation is outstanding.
class session : public std::enable_shared_from_this<session> {
public:
    // Wraps the accepted `socket` in an SSL stream configured by `context`.
    // NOTE(review): members are initialised in declaration order, so
    // `socket` has already been moved from when m_strand asks it for its
    // executor. m_strand is not used by any method of this class.
    session(tcp::socket socket, boost::asio::ssl::context &context)
        : socket_(std::move(socket), context),
          m_strand(socket.get_executor()) {}

    // Kicks off the asynchronous handshake; the echo loop follows on success.
    void start() { do_handshake(); }

private:
    // Performs the server side of the TLS handshake and then starts reading.
    // On failure nothing further is scheduled.
    void do_handshake() {
        auto keep_alive = shared_from_this();
        socket_.async_handshake(
            boost::asio::ssl::stream_base::server,
            [this, keep_alive](const boost::system::error_code &ec) {
                if (ec) {
                    return;
                }
                do_read();
            });
    }

    // Reads up to sizeof(data_) bytes, logs them and echoes them back.
    void do_read() {
        auto keep_alive = shared_from_this();
        socket_.async_read_some(
            boost::asio::buffer(data_),
            [this, keep_alive](const boost::system::error_code &ec,
                               std::size_t received) {
                if (ec) {
                    return;
                }
                (std::cout << "get <").write(data_, received) << std::endl;
                do_write(received);
            });
    }

    // Logs and writes the first `length` bytes of data_, then reads again.
    void do_write(std::size_t length) {
        auto keep_alive = shared_from_this();
        (std::cout << "send <").write(data_, length) << std::endl;
        boost::asio::async_write(
            socket_, boost::asio::buffer(data_, length),
            [this, keep_alive](const boost::system::error_code &ec,
                               std::size_t /*written*/) {
                if (ec) {
                    return;
                }
                do_read();
            });
    }

    boost::asio::ssl::stream<tcp::socket> socket_;
    boost::asio::strand<boost::asio::ip::tcp::socket::executor_type> m_strand;
    char data_[1024];  // shared by the read and the subsequent write
};

// Listens on an IPv4 port and hands each accepted connection to a new
// `session` that shares this server's SSL context.
class server {
public:
    // Opens the acceptor on `port`, loads the certificate, private key and DH
    // parameters from the working directory, and starts accepting.
    server(boost::asio::io_context &io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          context_(boost::asio::ssl::context::sslv23) {
        const auto options = boost::asio::ssl::context::default_workarounds
                             | boost::asio::ssl::context::no_sslv2
                             | boost::asio::ssl::context::single_dh_use;
        context_.set_options(options);

        // The callback arguments (max length, purpose) are not needed here.
        context_.set_password_callback(
            [this](std::size_t, boost::asio::ssl::context_base::password_purpose) {
                return get_password();
            });

        context_.use_certificate_chain_file("server.pem");
        context_.use_private_key_file("server.pem", boost::asio::ssl::context::pem);
        context_.use_tmp_dh_file("dh2048.pem");

        do_accept();
    }

private:
    // Pass phrase handed to the SSL context's password callback.
    std::string get_password() const { return "test"; }

    // Accepts one connection, starts a session for it when the accept
    // succeeded, and re-arms itself in either case.
    void do_accept() {
        acceptor_.async_accept(
            [this](const boost::system::error_code &ec, tcp::socket peer) {
                if (!ec) {
                    auto connection = std::make_shared<session>(std::move(peer), context_);
                    connection->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    boost::asio::ssl::context context_;
};

int main(int argc, char *argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: server <port>\n";
            return 1;
        }

        boost::asio::io_context io_context;

        using namespace std; // For atoi.
        server s(io_context, atoi(argv[1]));

        io_context.run();
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

And here is the client:

#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::tcp;
using std::placeholders::_1;
using std::placeholders::_2;

// Buffer-size constant; nothing in this listing refers to it.
enum { max_length = 1024 };

// SSL client for the ping-pong exchange: connects, performs the client-side
// handshake, sends "hello ssl", then repeatedly reads 9 bytes and sends the
// received text back.
class client {
public:
    // Configures peer verification on the stream and starts the asynchronous
    // connect. socket_ is constructed from io_context directly; strand_ is a
    // separate strand created on the same io_context's executor.
    client(boost::asio::io_context &io_context,
           boost::asio::ssl::context &context,
           const tcp::resolver::results_type &endpoints)
        : socket_(io_context, context), strand_(io_context.get_executor()) {
        socket_.set_verify_mode(boost::asio::ssl::verify_peer);
        socket_.set_verify_callback(
            std::bind(&client::verify_certificate, this, _1, _2));

        connect(endpoints);
    }

private:
    // Verification callback: prints the subject name of the certificate being
    // checked. It returns true unconditionally, so `preverified` is ignored.
    bool verify_certificate(bool preverified,
                            boost::asio::ssl::verify_context &ctx) {
        char subject_name[256];
        X509 *cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
        X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
        std::cout << "Verifying " << subject_name << "\n";

        return true;
    }

    // Connects the underlying TCP socket to one of `endpoints`, then starts
    // the handshake. The handler is not bound to strand_.
    void connect(const tcp::resolver::results_type &endpoints) {
        boost::asio::async_connect(socket_.lowest_layer(), endpoints,
                                   [this](const boost::system::error_code &error,
                                          const tcp::endpoint & /*endpoint*/) {
                                       if (!error) {
                                           handshake();
                                       } else {
                                           std::cout << "Connect failed: " << error.message() << "\n";
                                       }
                                   });
    }

    // Performs the client-side handshake. On success it starts the first
    // write directly from this handler and posts the first read to strand_.
    void handshake() {
        socket_.async_handshake(boost::asio::ssl::stream_base::client,
                                [this](const boost::system::error_code &error) {
                                    if (!error) {
                                        send_request("hello ssl");
                                        boost::asio::post(strand_, std::bind(&client::recv, this));
                                    } else {
                                        std::cout << "Handshake failed: " << error.message() << "\n";
                                    }
                                });
    }

    // Starts an asynchronous write of `msg` and logs the outcome.
    // The buffer handed to async_write refers to `msg` itself (no copy is
    // made here), and neither this call nor its handler is bound to strand_.
    void send_request(const std::string &msg) {
        boost::asio::async_write(
            socket_, boost::asio::buffer(msg),
            [this](const boost::system::error_code &error, std::size_t length) {
                if (!error) {
                    std::cout << "send data success, size : " << length << std::endl;
                } else {
                    std::cout << "Write failed: " << error.message() << std::endl;
                }
            });
    }

    // Reads exactly 9 bytes into buffer_ with the handler bound to strand_.
    // On success the bytes are copied out, printed, sent back through
    // send_request(), and the next recv() is posted to strand_.
    void recv() {
        boost::asio::async_read(
            socket_, buffer_, boost::asio::transfer_exactly(9),
            boost::asio::bind_executor(
                strand_, [this](const boost::system::error_code &error, std::size_t length) {
                    if (!error) {
                        std::istream buffer(&buffer_);
                        std::vector<char> msg(length, 0);
                        buffer.readsome(msg.data(), length);
                        // recvMsg is local to this handler invocation.
                        std::string recvMsg(msg.begin(), msg.end());
                        std::cout << recvMsg << std::endl;
                        send_request(recvMsg);
                        boost::asio::post(strand_, std::bind(&client::recv, this));
                    } else {
                        std::cout << "Read failed: " << error.message() << std::endl;
                    }
                }));
    }

    boost::asio::ssl::stream<tcp::socket> socket_;
    boost::asio::streambuf buffer_;  // receive buffer used by recv()
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
};

int main(int argc, char *argv[]) {
    try {
        if (argc != 3) {
            std::cerr << "Usage: client <host> <port>\n";
            return 1;
        }

        boost::asio::io_context io_context;

        tcp::resolver resolver(io_context);
        auto endpoints = resolver.resolve(argv[1], argv[2]);

        boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
        ctx.load_verify_file("ca.pem");

        client c(io_context, ctx, endpoints);
        boost::thread_group threadPool;
        for (size_t i = 0; i < boost::thread::hardware_concurrency(); ++i) {
            threadPool.create_thread(boost::bind(&boost::asio::io_context::run, &io_context));
        }
        io_context.run();
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

When the data is not sent to the server, the client prints something like this:

hello ssl
hello ssl
send data success, size : 9
send data success, size : 9

Upvotes: 0

Views: 1102

Answers (1)

sehe
sehe

Reputation: 393174

Check this out. If you remove the thread_group (as far as I can tell, it adds no value), everything works. This is a good sign that you have a threading bug.

I'm not in the mood to read the code until I see the problem, so let's circle a bit.

Adding ASAN/UBSAN shows nothing bad immediately, so that's good.

Let me look at the code a little then.

  • session creates a m_strand - which is never used...
  • you forgot to join the extra threads

Now that I noticed some potential confusion around the strand, I looked at the client strand use. And see that it is inconsistent:

  • the socket itself is NOT on the strand
  • send_request doesn't run on nor bind the completion handler to the strand's executor
  • the communication is full-duplex (meaning async_write and async_read happen concurrently).
  • this means that where client::recv is posted to the strand, it doesn't actually synchronize threaded access to socket_ (because the send_request is not tied to the strand in the first place)

If the above is surprising, you're not the first to fall into that: Why is `net::dispatch` needed when the I/O object already has an executor?. In your example connect() and handshake() can be considered safe because they form a logical strand (sequential flow of execution). The problem arises with the concurrent paths.

By far the simplest way to fix the situation seems to be to construct socket_ from strand_. This implies reordering the members so that strand_ is initialized first:

client(boost::asio::io_context& io_context, ssl::context& context,
       const tcp::resolver::results_type& endpoints)
    : strand_(io_context.get_executor())
    , socket_(strand_, context)

Next up, all posts to the strand can be dropped because they always happen from a completion handler on that strand.

send_request("hello ssl");
recv(); // already on the strand in this completion handler

The mild irony is that send_request was executed under the implied assumption that it was on the strand.

The cleaned up programs until this point are

  • File client.cpp

     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/thread.hpp>
     #include <cstdlib>
     #include <cstring>
     #include <functional>
     #include <iostream>
    
     using boost::asio::ip::tcp;
     using boost::system::error_code;
     using std::placeholders::_1;
     using std::placeholders::_2;
     namespace ssl = boost::asio::ssl;
    
     // Intermediate version of the client: socket_ is constructed from strand_
     // and every completion handler is bound to strand_.
     // NOTE: recv()'s handler calls send_request() for every message received,
     // so a new async_write can still be started while an earlier one is
     // outstanding, and msg_ can be overwritten while it is still in use as a
     // write buffer. The "Concurrent Writes" section of the text covers this.
     class client {
       public:
         // strand_ is initialised before socket_ (declaration order below) so
         // that the stream can be constructed on the strand.
         client(boost::asio::io_context& io_context, ssl::context& context,
                const tcp::resolver::results_type& endpoints)
             : strand_(io_context.get_executor())
             , socket_(strand_, context)
         {
             socket_.set_verify_mode(ssl::verify_peer);
             socket_.set_verify_callback(
                 std::bind(&client::verify_certificate, this, _1, _2));
    
             connect(endpoints);
         }
    
       private:
         // Prints the subject name of the certificate being verified and
         // returns true unconditionally; `preverified` is ignored.
         bool verify_certificate(bool preverified, ssl::verify_context& ctx)
         {
             char  subject_name[256];
             X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
             X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
             std::cout << "Verifying " << subject_name << "\n";
    
             return true;
         }
    
         // Connects the underlying TCP socket, then starts the handshake.
         void connect(const tcp::resolver::results_type& endpoints)
         {
             async_connect( //
                 socket_.lowest_layer(), endpoints,
                 bind_executor(
                     strand_, [this](error_code error, const tcp::endpoint&) {
                         if (!error) {
                             handshake();
                         } else {
                             std::cout << "Connect failed: " << error.message() << "\n";
                         }
                     }));
         }
    
         // Client-side handshake; on success sends the first message and
         // starts the read loop.
         void handshake()
         {
             socket_.async_handshake(
                 ssl::stream_base::client,
                 bind_executor(strand_, [this](error_code error) {
                     if (!error) {
                         send_request("hello ssl");
                         recv(); // already on the strand in this completion handler
                     } else {
                         std::cout << "Handshake failed: " << error.message()
                                   << "\n";
                     }
                 }));
         }
    
         // Copies `msg` into the member msg_ and writes from that copy, so the
         // buffer outlives this call. The handler only logs the outcome.
         void send_request(std::string const& msg)
         {
             msg_ = msg;
             async_write(
                 socket_, boost::asio::buffer(msg_),
                 bind_executor(
                     strand_, [/*this*/](error_code error, std::size_t length) {
                         if (!error) {
                             std::cout << "send data success, size : " << length << std::endl;
                         } else {
                             std::cout << "Write failed: " << error.message() << std::endl;
                         }
                     }));
         }
    
         // Reads exactly 9 bytes, stores them in msg_, prints them, sends them
         // back and immediately starts the next read.
         void recv()
         {
             async_read(
                 socket_, buffer_, boost::asio::transfer_exactly(9),
                 boost::asio::bind_executor(
                     strand_, [this](error_code error, std::size_t length) {
                         if (!error) {
                             std::istream      buffer(&buffer_);
                             std::vector<char> msg(length, 0);
                             buffer.readsome(msg.data(), length);
    
                             msg_.assign(msg.begin(), msg.end());
                             std::cout << msg_ << std::endl;
    
                             // msg_ is passed by reference, so the assignment
                             // inside send_request() is a self-assignment.
                             send_request(msg_);
                             recv(); // already on the strand in this completion handler
                         } else {
                             std::cout << "Read failed: " << error.message() << std::endl;
                         }
                     }));
         }
    
         boost::asio::strand<boost::asio::io_context::executor_type> strand_;
         ssl::stream<tcp::socket>                                    socket_;
         boost::asio::streambuf                                      buffer_;
         std::string                                                 msg_;
     };
    
     int main(int argc, char* argv[])
     {
         try {
             if (argc != 3) {
                 std::cerr << "Usage: client <host> <port>\n";
                 return 1;
             }
    
             boost::asio::io_context io_context;
    
             tcp::resolver resolver(io_context);
             auto          endpoints = resolver.resolve(argv[1], argv[2]);
    
             ssl::context ctx(ssl::context::sslv23);
             ctx.load_verify_file("ca.pem");
    
             client              c(io_context, ctx, endpoints);
             boost::thread_group threadPool;
             for (size_t i = 0; i < boost::thread::hardware_concurrency(); ++i) {
                 threadPool.create_thread(
                     boost::bind(&boost::asio::io_context::run, &io_context));
             }
             threadPool.join_all();
             //io_context.run();
    
             return 0;
         } catch (std::exception const& e) {
             std::cerr << "Exception: " << e.what() << "\n";
             return 1;
         }
     }
    
  • File server.cpp

     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/bind/bind.hpp>
     #include <cstdlib>
     #include <functional>
     #include <iostream>
    
     namespace ssl = boost::asio::ssl;
     using boost::asio::ip::tcp;
     using boost::system::error_code;
    
     // A single TLS connection accepted by the server: handshake first, then
     // an alternating read/echo loop. Each pending handler holds a shared_ptr
     // to the session, keeping it alive while an operation is outstanding.
     class session : public std::enable_shared_from_this<session> {
       public:
         // Wraps the accepted `socket` in an SSL stream configured by `context`.
         // m_strand is created but not used by any method of this class.
         session(tcp::socket socket, ssl::context& context)
             : socket_(std::move(socket), context)
             , m_strand(socket.get_executor())
         {
         }
    
         // Starts the asynchronous handshake.
         void start() { do_handshake(); }
    
       private:
         // Server-side handshake; on success the read loop begins, on failure
         // nothing further is scheduled.
         void do_handshake()
         {
             auto keep_alive = shared_from_this();
             socket_.async_handshake(
                 ssl::stream_base::server, [this, keep_alive](error_code ec) {
                     if (ec) {
                         return;
                     }
                     do_read();
                 });
         }
    
         // Reads up to sizeof(data_) bytes, logs them and echoes them back.
         void do_read()
         {
             auto keep_alive = shared_from_this();
             socket_.async_read_some(
                 boost::asio::buffer(data_),
                 [this, keep_alive](error_code ec, std::size_t received) {
                     if (ec) {
                         return;
                     }
                     (std::cout << "get <").write(data_, received) << std::endl;
                     do_write(received);
                 });
         }
    
         // Logs and writes the first `length` bytes of data_, then reads again.
         void do_write(std::size_t length)
         {
             auto keep_alive = shared_from_this();
             (std::cout << "send <").write(data_, length) << std::endl;
             boost::asio::async_write(
                 socket_, boost::asio::buffer(data_, length),
                 [this, keep_alive](error_code ec, std::size_t /*written*/) {
                     if (ec) {
                         return;
                     }
                     do_read();
                 });
         }
    
         ssl::stream<tcp::socket>                        socket_;
         boost::asio::strand<tcp::socket::executor_type> m_strand;
         char                                            data_[1024];
     };
    
     class server {
       public:
         server(boost::asio::io_context& io_context, unsigned short port)
             : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
             , context_(ssl::context::sslv23)
         {
             context_.set_options(ssl::context::default_workarounds |
                                  ssl::context::no_sslv2 |
                                  ssl::context::single_dh_use);
             context_.set_password_callback(std::bind(&server::get_password, this));
             context_.use_certificate_chain_file("server.pem");
             context_.use_private_key_file("server.pem", ssl::context::pem);
             context_.use_tmp_dh_file("dh2048.pem");
    
             do_accept();
         }
    
       private:
         std::string get_password() const
         {
             return "test";
         }
    
         void do_accept()
         {
             acceptor_.async_accept([this](error_code error, tcp::socket socket) {
                 if (!error) {
                     std::make_shared<session>(std::move(socket), context_)->start();
                 }
    
                 do_accept();
             });
         }
    
         tcp::acceptor acceptor_;
         ssl::context  context_;
     };
    
     int main(int argc, char* argv[])
     {
         try {
             if (argc != 2) {
                 std::cerr << "Usage: server <port>\n";
                 return 1;
             }
    
             boost::asio::io_context io_context;
    
             server s(io_context, std::atoi(argv[1]));
    
             io_context.run();
             return 0;
         } catch (std::exception const& e) {
             std::cerr << "Exception: " << e.what() << "\n";
             return 1;
         }
     }
    

Other Problems

Lifetime Error

UBSAN/ASAN didn't catch it, but this is wrong:

void send_request(const std::string& msg)
{
    async_write(
        socket_, boost::asio::buffer(msg),
        ...

The problem is the lifetime of msg, which disappears before the async operation got a chance to run, let alone complete. So, move the buffer so the lifetime is sufficient (e.g. member msg_).

Concurrent Writes

When the client locks up, it shows

send data success, size : 9
hello ssl
hello ssl
send data success, size : 9
send data success, size : 9

This indicates that a second hello ssl is received before the send is initiated. This means that a second send is initiated while the previous one is still in flight. Under the hood this cancels a duplex synchronization object inside the ssl stream context. You can see this with -DBOOST_ASIO_ENABLE_HANDLER_TRACKING:

@asio|1630155694.209267|51139|[email protected]

Visualizing with the handlerviz.pl script:

enter image description here

The problem is violating the requirements here:

The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.

Two easy ways to fix it:

  1. change the IO from full duplex to sequential read/write/read/write just like the server
  2. make an output queue that contains messages still to be written in sequence

Fixed Solution

This uses an outbox as in the second solution for overlapping writes above. I've also taken the liberty to

  • remove the unnecessary intermediate streambuf buffer_, reading directly into a string instead.
  • replace io_context + thread_group with the more elegant thread_pool
  • many minor improvements (some mentioned above)
  • File client.cpp

     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/thread.hpp>
     #include <cstdlib>
     #include <cstring>
     #include <deque>
     #include <functional>
     #include <iostream>
     #include <string>
     #include <utility>
    
     using boost::asio::ip::tcp;
     using boost::system::error_code;
     using std::placeholders::_1;
     using std::placeholders::_2;
     namespace ssl = boost::asio::ssl;
    
     using Executor = boost::asio::thread_pool::executor_type;
    
     class client {
       public:
         client(Executor ex, ssl::context& context,
                const tcp::resolver::results_type& endpoints)
             : strand_(ex)
             , socket_(strand_, context)
         {
             socket_.set_verify_mode(ssl::verify_peer);
             socket_.set_verify_callback(
                 std::bind(&client::verify_certificate, this, _1, _2));
    
             connect(endpoints);
         }
    
       private:
         bool verify_certificate(bool preverified, ssl::verify_context& ctx)
         {
             char  subject_name[256];
             X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
             X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
             std::cout << "Verifying " << subject_name << "\n";
    
             return true;
         }
    
         void connect(const tcp::resolver::results_type& endpoints)
         {
             async_connect( //
                 socket_.lowest_layer(), endpoints,
                 bind_executor(
                     strand_, [this](error_code error, const tcp::endpoint&) {
                         if (!error) {
                             handshake();
                         } else {
                             std::cout << "Connect failed: " << error.message() << "\n";
                         }
                     }));
         }
    
         void handshake()
         {
             socket_.async_handshake(
                 ssl::stream_base::client,
                 bind_executor(strand_, [this](error_code error) {
                     if (!error) {
                         send_request("hello ssl");
                         recv(); // already on the strand in this completion handler
                     } else {
                         std::cout << "Handshake failed: " << error.message()
                                   << "\n";
                     }
                 }));
         }
    
         void send_request(std::string msg)
         {
             outbox_.push_back(std::move(msg));
             if (outbox_.size() == 1)
             {
                 send_loop();
             }
         }
    
         void send_loop()
         {
             async_write( //
                 socket_, boost::asio::buffer(outbox_.back()),
                 bind_executor(
                     strand_, [this](error_code error, std::size_t length) {
                         if (!error) {
                             std::cout << "send data success, size : " << length << std::endl;
                             outbox_.pop_back();
                         } else {
                             std::cout << "Write failed: " << error.message() << std::endl;
                         }
                         if (!outbox_.empty())
                             send_loop();
                     }));
         }
    
         void recv()
         {
             async_read(
                 socket_, boost::asio::dynamic_buffer(buffer_), boost::asio::transfer_exactly(9),
                 boost::asio::bind_executor(
                     strand_, [this](error_code error, std::size_t length) {
                         if (!error) {
                             std::cout << buffer_ << std::endl;
    
                             send_request(std::move(buffer_));
                             recv(); // already on the strand in this completion handler
                         } else {
                             std::cout << "Read failed: " << error.message() << std::endl;
                         }
                     }));
         }
    
         boost::asio::strand<Executor> strand_;
         ssl::stream<tcp::socket>      socket_;
         std::string                   buffer_;
         std::deque<std::string>       outbox_;
     };
    
     int main(int argc, char* argv[])
     {
         try {
             if (argc != 3) {
                 std::cerr << "Usage: client <host> <port>\n";
                 return 1;
             }
    
             ssl::context ctx(ssl::context::sslv23);
             ctx.load_verify_file("ca.pem");
    
             boost::asio::thread_pool io;
             tcp::resolver resolver(io);
    
             client c(io.get_executor(), ctx, resolver.resolve(argv[1], argv[2]));
             io.join();
    
             return 0;
         } catch (std::exception const& e) {
             std::cerr << "Exception: " << e.what() << "\n";
             return 1;
         }
     }
    
  • File server.cpp

     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/bind/bind.hpp>
     #include <cstdlib>
     #include <functional>
     #include <iostream>
    
     namespace ssl = boost::asio::ssl;
     using boost::asio::ip::tcp;
     using boost::system::error_code;
    
     // A single TLS connection accepted by the server: handshake first, then
     // an alternating read/echo loop. Each pending handler holds a shared_ptr
     // to the session, keeping it alive while an operation is outstanding.
     class session : public std::enable_shared_from_this<session> {
       public:
         // Wraps the accepted `socket` in an SSL stream configured by `context`.
         session(tcp::socket socket, ssl::context& context)
             : socket_(std::move(socket), context)
         {
         }
    
         // Starts the asynchronous handshake.
         void start() { do_handshake(); }
    
       private:
         // Server-side handshake; on success the read loop begins, on failure
         // nothing further is scheduled.
         void do_handshake()
         {
             auto keep_alive = shared_from_this();
             socket_.async_handshake(
                 ssl::stream_base::server, [this, keep_alive](error_code ec) {
                     if (ec) {
                         return;
                     }
                     do_read();
                 });
         }
    
         // Reads up to data_.size() bytes, logs them and echoes them back.
         void do_read()
         {
             auto keep_alive = shared_from_this();
             socket_.async_read_some(
                 boost::asio::buffer(data_),
                 [this, keep_alive](error_code ec, std::size_t received) {
                     if (ec) {
                         return;
                     }
                     (std::cout << "get <").write(data_.data(), received) << std::endl;
                     do_write(received);
                 });
         }
    
         // Logs and writes the first `length` bytes of data_, then reads again.
         void do_write(std::size_t length)
         {
             auto keep_alive = shared_from_this();
             (std::cout << "send <").write(data_.data(), length) << std::endl;
             boost::asio::async_write(
                 socket_, boost::asio::buffer(data_.data(), length),
                 [this, keep_alive](error_code ec, std::size_t /*written*/) {
                     if (ec) {
                         return;
                     }
                     do_read();
                 });
         }
    
         ssl::stream<tcp::socket> socket_;
         std::array<char, 1024>   data_; // shared by the read and the echo
     };
    
     class server {
       public:
         server(boost::asio::io_context& io_context, unsigned short port)
             : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
             , context_(ssl::context::sslv23)
         {
             context_.set_options(ssl::context::default_workarounds |
                                  ssl::context::no_sslv2 |
                                  ssl::context::single_dh_use);
             context_.set_password_callback(std::bind(&server::get_password, this));
             context_.use_certificate_chain_file("server.pem");
             context_.use_private_key_file("server.pem", ssl::context::pem);
             context_.use_tmp_dh_file("dh2048.pem");
    
             do_accept();
         }
    
       private:
         std::string get_password() const
         {
             return "test";
         }
    
         void do_accept()
         {
             acceptor_.async_accept([this](error_code error, tcp::socket socket) {
                 if (!error) {
                     std::make_shared<session>(std::move(socket), context_)->start();
                 }
    
                 do_accept();
             });
         }
    
         tcp::acceptor acceptor_;
         ssl::context  context_;
     };
    
     // Entry point: `server <port>`.
     // Returns 0 when the io_context runs out of work, 1 on a usage error, an
     // invalid port argument, or when an exception is caught.
     int main(int argc, char* argv[])
     {
         try {
             if (argc != 2) {
                 std::cerr << "Usage: server <port>\n";
                 return 1;
             }
    
             // std::atoi would silently turn non-numeric input into port 0 and
             // let values above 65535 wrap when narrowed to unsigned short, so
             // parse strictly and check the range before constructing the server.
             char*      end  = nullptr;
             const long port = std::strtol(argv[1], &end, 10);
             if (end == argv[1] || *end != '\0' || port < 0 || port > 65535) {
                 std::cerr << "Invalid port: " << argv[1] << "\n";
                 return 1;
             }
    
             boost::asio::io_context io_context;
    
             server s(io_context, static_cast<unsigned short>(port));
    
             io_context.run();
             return 0;
         } catch (std::exception const& e) {
             std::cerr << "Exception: " << e.what() << "\n";
             return 1;
         }
     }
    

Live Demo:

enter image description here

As you can see (using a uniq -dc trick to suppress all non-repeating lines) now it happily continues in the case where multiple receives come in before send is initiated.

Upvotes: 2

Related Questions