Reputation: 11
I have a server written in plain old C that accepts TCP connections using kqueue on FreeBSD.
Incoming connections are accepted and added to a simple connection pool to keep track of the file handle.
When data is received (on EVFILT_READ), I call recv() and then put the payload in a message queue for a different thread to process. Receiving and processing data this way works perfectly.
When the processing thread is done, it may need to send something back to the client. Since the processing thread has access to the connection pool and can easily get the file handle, I'm simply calling send() from the processing thread.
This works 99% of the time, but every now and then kqueue gives me an EV_EOF flag, and the connection is dropped.
There is a clear correlation between the frequency of the calls to send() and the number of EV_EOF errors, so I have a feeling the EV_EOF is due to some race condition between my kqueue thread and the processing thread.
The calls to send() always return the expected byte count, so I'm not filling up the tx buffer.
So my question: is it acceptable to call send() from a separate thread as described here? If not, what would be the right way to send data back to the clients asynchronously?
All the examples I find call send() in the same context as the kqueue loop, but my processing threads may need to send back data at any time, even minutes after the last data was received from the client, so obviously I can't block the kqueue loop for that long.
Relevant code snippets:
void *tcp_srvthread(void *arg)
{
    [[...Bunch of declarations...]]
    tcp_serversocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    ...
    setsockopt(tcp_serversocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
    ...
    err = bind(tcp_serversocket, (const struct sockaddr*)&sa, sizeof(sa));
    ...
    err = listen(tcp_serversocket, 10);
    ...
    kq = kqueue();
    // EV_CLEAR is a flag, not a filter; OR-ed into EVFILT_READ (-1) it had no effect, so it is dropped here
    EV_SET(&evSet, tcp_serversocket, EVFILT_READ, EV_ADD, 0, 0, NULL);
    ...
    while(!fTerminated) {
        timeout.tv_sec = 2; timeout.tv_nsec = 0;
        nev = kevent(kq, &evSet, 0, evList, NLIST, &timeout);
        for (i=0; i<nev; i++) {
            if (evList[i].ident == tcp_serversocket) { // new connection?
                socklen = sizeof(addr);
                fd = accept(evList[i].ident, &addr, &socklen); // accept it
                if(fd > 0) { // accept ok?
                    uidx = conn_add(fd, (struct sockaddr_in *)&addr); // Add it to connected controllers
                    if(uidx >= 0) { // add ok?
                        EV_SET(&evSet, fd, EVFILT_READ, EV_ADD, 0, 0, (void*)(uint64_t)(0x00E20000 | uidx)); // monitor events from it
                        if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1) { // monitor ok?
                            conn_delete(uidx); // ..no, so delete it from my list also
                        }
                    } else { // no room on server?
                        close(fd);
                    }
                }
                else Log(0, "ERR: accept fd=%d", fd);
            }
            else if (evList[i].flags & EV_EOF) {
                [[ ** THIS IS CALLED SOMETIMES AFTER CALLING SEND - WHY?? ** ]]
                uidx = (uint32_t)evList[i].udata;
                conn_delete( uidx );
            }
            else if (evList[i].filter == EVFILT_READ) {
                if((nr = recv(evList[i].ident, buf, sizeof(buf)-2, 0)) > 0) {
                    uidx = (uint32_t)evList[i].udata;
                    recv_data(uidx, buf, nr); // This will queue the message for the processing thread
                }
            }
            else {
                // should not get here.
            }
        }
    }
}
The processing thread looks something like this (obviously there's a lot of data manipulation going on in addition to what's shown):
void *parsethread(void *arg)
{
    int len;
    tmsg_Queue mq;
    char is_ok;
    while(!fTerminated) {
        if((len = msgrcv(msgRxQ, &mq, sizeof(tmsg_Queue), 0, 0)) > 0) {
            if( process_message(mq) ) {
                [[ processing will find the uidx of the client and build the return data ]]
                send( ctl[uidx].fd, replydata, replydataLen, 0 );
            }
        }
    }
}
Appreciate any ideas or nudges in the right direction. Thanks.
Upvotes: 1
Views: 514
Reputation: 46
If you write to a socket after the peer has closed the reading part of it, you will receive a RST, which triggers EVFILT_READ with EV_EOF set.
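Note that send() returning the full byte count only means the data was copied into the kernel's send buffer; with TCP, the failure typically shows up later, as the EV_EOF event or as an error on a subsequent send(). As a rough, portable illustration of checking for a dead peer, here is a minimal sketch. The helper names send_checked and demo_send_after_close are made up for this example, and it uses an AF_UNIX socketpair instead of TCP so the error appears immediately; it assumes MSG_NOSIGNAL is available (it is on FreeBSD and Linux).

```c
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not from the question): send without raising
 * SIGPIPE, retrying if interrupted by a signal. Returns the number of
 * bytes sent, or -1 with errno set; EPIPE or ECONNRESET mean the
 * connection is gone and the caller should drop it. */
ssize_t send_checked(int fd, const void *buf, size_t len)
{
    ssize_t n;
    do {
        n = send(fd, buf, len, MSG_NOSIGNAL);
    } while (n == -1 && errno == EINTR);
    return n;
}

/* Demo: close one end of a connected pair, then send on the other.
 * Returns the errno observed by the sender (0 if send succeeded). */
int demo_send_after_close(void)
{
    int sv[2];
    int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(rc == 0);

    close(sv[1]);                        /* the "peer" goes away */

    ssize_t n = send_checked(sv[0], "x", 1);
    int err = (n == -1) ? errno : 0;

    close(sv[0]);
    return err;
}
```

In the processing thread, the same check would replace the bare send() call, so a failed send can be logged and the connection dropped deliberately instead of being discovered later through EV_EOF.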
You should try aio_read and aio_write.
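If you would rather keep calling send() from the processing thread, the call itself is safe to make from another thread; what needs protecting is the connection's lifetime, so that the kqueue thread cannot close the descriptor (and have accept() hand the same number to a new client) in the middle of a send. A minimal sketch of that idea follows. The struct conn layout and the conn_init, conn_close and conn_send names are assumptions for illustration, since the real ctl[] pool is not shown in the question.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical connection slot; the question's ctl[] layout is not shown. */
struct conn {
    pthread_mutex_t lock;
    int fd;                 /* -1 when the slot is free */
};

void conn_init(struct conn *c, int fd)
{
    pthread_mutex_init(&c->lock, NULL);
    c->fd = fd;
}

/* Called by the event-loop thread, e.g. on EV_EOF. */
void conn_close(struct conn *c)
{
    pthread_mutex_lock(&c->lock);
    if (c->fd != -1) {
        close(c->fd);
        c->fd = -1;         /* mark the slot free while holding the lock */
    }
    pthread_mutex_unlock(&c->lock);
}

/* Called by the processing thread. Fails with ENOTCONN if the
 * connection was closed before the reply was ready. */
ssize_t conn_send(struct conn *c, const void *buf, size_t len)
{
    ssize_t n;
    pthread_mutex_lock(&c->lock);
    if (c->fd == -1) {
        errno = ENOTCONN;
        n = -1;
    } else {
        n = send(c->fd, buf, len, MSG_NOSIGNAL);
    }
    pthread_mutex_unlock(&c->lock);
    return n;
}
```

One trade-off in this sketch: a blocking send() holds the lock, so conn_close() in the event loop waits until it returns. A non-blocking socket, or queueing the reply and letting the kqueue thread write it on EVFILT_WRITE, would avoid that.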
Upvotes: 1