Reputation: 367
I am writing an efficient socket server; the goal is good overall throughput. I use the main thread as the listener. It calls async_accept to accept a client and adds the socket to a queue. A dispatcher thread picks up a socket that is ready to be read from the queue and adds it to the queue of one of the worker threads. I keep a pool of worker threads; a worker thread does the actual read/write.
I use async_accept in my listener. To find out which socket is ready for reading, I use async_read_some in my dispatcher. This idea works, but with a problem: my io_service.run() is called in the listener, so the async_read_some handler set up in the dispatcher actually runs in the listener thread.
Here's my code:
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <queue>
#include <semaphore.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::tcp;
using namespace std;

std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;

enum { max_length1 = 1024 };
char data_1[max_length1];

void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
                  size_t bytes_transferred)
{
    printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());
    boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}

void sock_dispatch() {
    int v_size = 0;
    std::shared_ptr<tcp::socket> curr_sock;
    printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
    while(1) {
        // Wait until the queue holds at least one socket.
        while(1) {
            sem_wait(&_sem_sock);
            v_size = q_sock.size();
            sem_post(&_sem_sock);
            if(v_size <= 0)
                m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
            else
                break;
        }
        sem_wait(&_sem_sock);
        curr_sock = q_sock.front();
        q_sock.pop();
        sem_post(&_sem_sock);
        curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
            boost::bind(handle_read1, curr_sock,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

class session
{
public:
    session(boost::asio::io_service& io_service)
        : sockptr(new tcp::socket(io_service)) {}
    void start()
    {
        printf("START NEW SESSION The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
        sem_wait(&_sem_sock);
        q_sock.push(sockptr);
        sem_post(&_sem_sock);
        m_cond1.notify_all();
    }
    std::shared_ptr<tcp::socket> sockptr;
};

class server
{
public:
    server(boost::asio::io_service& io_service, short port)
        : io_service_(io_service),
          acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
    {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(*(new_session->sockptr.get()),
            boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
    }
    void handle_accept(session* new_session,
                       const boost::system::error_code& error)
    {
        new_session->start();
        new_session = new session(io_service_);
        acceptor_.async_accept(*(new_session->sockptr.get()),
            boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
    }
private:
    boost::asio::io_service& io_service_;
    tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    sem_init(&_sem_sock, 0, 1);
    boost::asio::io_service io_service;
    using namespace std;
    server s(io_service, atoi(argv[1]));
    boost::thread t(boost::bind(sock_dispatch));
    io_service.run();
    return 0;
}
This code is modified from a boost::asio example, http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cpp. And the client code is http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cpp.
When a client connects, the output of server:
WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843
START NEW SESSION The ID of this of this thread is: 3843, pid : 3843
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843
In this case the dispatcher thread ID is 3844, but handle_read1 runs in thread 3843. Ideally, handle_read1 should run in the dispatcher, so it won't block accept in the listener.
Any idea what I should do to achieve this? Or is there a better design for the whole thing? :)
Upvotes: 0
Views: 1619
Reputation: 51871
If you need specific handlers being invoked in specific threads, then use different io_service objects. For example, the acceptor could be constructed with io_service1, and the sockets could be constructed with io_service2. The main thread could then perform io_service1.run(), while the threads in the thread pool perform io_service2.run().
With that said, mixing asynchronous and synchronous functionality can be rather tricky. In most asynchronous programs I have worked on, there is rarely a need to dedicate a thread to specific asynchronous chains.
Overall, I think the conceptual design is fine, but I have a few suggestions for the implementation:
The q_sock consumer and producer code is a mix of higher-level and lower-level constructs. The use of the condition variable is a bit nonidiomatic, and it raises the question of why sem_t is being used in place of boost::mutex and locks. For example, the following consumer and producer code:
// Consumer
while(1)
{
    sem_wait(&_sem_sock);
    v_size = q_sock.size();
    sem_post(&_sem_sock);
    if (v_size <= 0)
        m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
    else
        break;
}
sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);

// Producer
sem_wait(&_sem_sock);
q_sock.push(sockptr);
sem_post(&_sem_sock);
m_cond1.notify_all();
could be rewritten without sem_t and made a bit more idiomatic, following the Boost.Thread condition_variable documentation. Consider the alternative:
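// Consumer
// Note: this assumes the global m_lock1 has been removed; as declared in the
// question it keeps m_log1 locked, so acquiring the mutex here would block.
boost::unique_lock<boost::mutex> lock(m_log1);
while (q_sock.empty())
{
    m_cond1.wait(lock);
}
curr_sock = q_sock.front();
q_sock.pop();
lock.unlock();

// Producer
{
    boost::lock_guard<boost::mutex> lock(m_log1);
    q_sock.push(sockptr);
}
m_cond1.notify_all();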
It is unclear what functionality session provides.
session::sockptr is managed via a smart pointer, but session is not. Because session is not managed via a smart pointer, a memory leak occurs in server::handle_accept, as the handle to session is lost in the reassignment.
Identify what functionality session is to provide, and design the interface around that.
Non-member functions such as handle_read1 may need to become member functions.
If session has its own asynchronous chain and is providing itself to handlers, then consider using enable_shared_from_this. The Boost.Asio tutorial provides an example usage, as do a few of the examples.
At the moment, async_read_some is not indicating which socket is ready to be read. By the time the ReadHandler has been invoked, data has been read.
This is the fundamental difference between a Proactor and a Reactor. If you need Reactor-style operations, then use boost::asio::null_buffers. See this documentation for more details. However, there are consequences to each approach. Thus, it is critical to understand these consequences so that the best decision can be made.
With Boost.Asio providing event demultiplexing through high-level constructs, the sock_dispatch thread may seem impractical. The session::start member function could initiate the asynchronous read on the socket. This minor change would eliminate the need for q_sock and all synchronization constructs in the example code.
Examine why the synchronous write must be used. In the case of echo clients, as shown in the example, asynchronous writes can often be used instead, by controlling the flow of the asynchronous chain itself to remove resource contention. This allows each connection to have its own buffer that can be used for both reading and writing.
Upvotes: 2