Andrew

Reputation: 11

Sending data asynchronously using kqueue

I have a server written in plain old C that accepts TCP connections using kqueue on FreeBSD.

Incoming connections are accepted and added to a simple connection pool to keep track of the file handle.

When data is received (on EVFILT_READ), I call recv() and put the payload in a message queue for a different thread to process. Receiving and processing data this way works perfectly.

When the processing thread is done, it may need to send something back to the client. Since the processing thread has access to the connection pool and can easily get the file handle, I'm simply calling send() from the processing thread.

This works 99% of the time, but every now and then kqueue gives me an EV_EOF flag, and the connection is dropped.

There is a clear correlation between the frequency of the calls to send() and the number of EV_EOF errors, so I have a feeling the EV_EOF is due to some race condition between my kqueue thread and the processing thread.

The calls to send() always return the expected byte count, so I'm not filling up the tx buffer.

So my question: is it acceptable to call send() from a separate thread as described here? If not, what would be the right way to send data back to the clients asynchronously?

All the examples I find call send() in the same context as the kqueue loop, but my processing threads may need to send back data at any time, even minutes after the last data was received from the client, so obviously I can't block the kqueue loop for that long.

Relevant code snippets:

void    *tcp_srvthread(void *arg)
{
    [[...Bunch of declarations...]]

    tcp_serversocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);        
    ...
    setsockopt(tcp_serversocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));            
    ...
    err = bind(tcp_serversocket, (const struct sockaddr*)&sa, sizeof(sa));
    ...
    err = listen(tcp_serversocket, 10);
    ...
    kq = kqueue();    
    EV_SET(&evSet, tcp_serversocket, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, NULL);
    ...
    while(!fTerminated) {
        timeout.tv_sec = 2;  timeout.tv_nsec = 0;
        nev = kevent(kq, &evSet, 0, evList, NLIST, &timeout);

        for (i=0; i<nev; i++) {            
            if (evList[i].ident == tcp_serversocket) {                              // new connection?
                socklen = sizeof(addr);
                fd = accept(evList[i].ident, &addr, &socklen);   // accept it
                if(fd > 0) {                                                       // accept ok?
                    uidx = conn_add(fd, (struct sockaddr_in *)&addr);               // Add it to connected controllers
                    if(uidx >= 0) {                                                 // add ok?                        

                        EV_SET(&evSet, fd, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, (void*)(uint64_t)(0x00E20000 | uidx));        // monitor events from it
                        if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1) {           // monitor ok?
                            conn_delete(uidx);                                      // ..no, so delete it from my list also
                        }
                    } else {                                                        // no room on server?
                        close(fd);
                    }
                }
                else Log(0, "ERR: accept fd=%d", fd);
            }
            else
            if (evList[i].flags & EV_EOF) {
                [[ ** THIS IS CALLED SOMETIMES AFTER CALLING SEND - WHY??  ** ]]
                uidx = (uint32_t)evList[i].udata;                
                conn_delete( uidx );               
            }
            else
            if (evList[i].filter == EVFILT_READ) {                  
                if((nr = recv(evList[i].ident, buf, sizeof(buf)-2, 0)) > 0) {
                    uidx = (uint32_t)evList[i].udata;
                    recv_data(uidx, buf, nr);    // This will queue the message for the processing thread
                }
            }
        }
        // nev <= 0: kevent() timed out or failed; loop again and re-check fTerminated

    }
}

The processing thread looks something like this (obviously there's a lot of data manipulation going on in addition to what's shown) :

void    *parsethread(void *arg)
{         
    int                 len;
    tmsg_Queue          mq;
    char                is_ok;

    while(!fTerminated) {
        if((len = msgrcv(msgRxQ, &mq, sizeof(tmsg_Queue), 0, 0)) > 0) {            
            if( process_message(mq) ) {
                [[ processing will find the uidx of the client and build the return data ]]
                send( ctl[uidx].fd, replydata, replydataLen, 0 );
            }
        }
    }
}

Appreciate any ideas or nudges in the right direction. Thanks.

Upvotes: 1

Views: 514

Answers (1)

NitroMelon

Reputation: 46

EV_EOF

If you write to a socket after the peer has closed its reading side, you will receive a RST, which triggers EVFILT_READ with EV_EOF set.
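
A minimal sketch of how the processing thread could check for this, assuming blocking sockets. The helper send_reply() and the send_result enum are hypothetical names, not part of the code in the question. The helper loops over partial sends and reports EPIPE or ECONNRESET separately from other errors. Keep in mind that over TCP the first send() after the peer closes typically still returns the full byte count, because the RST only arrives afterwards, so a successful return does not prove the peer is still there.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0  /* FreeBSD and Linux define it; elsewhere SIGPIPE must be handled another way */
#endif

/* Hypothetical result codes for this sketch. */
enum send_result { SEND_OK = 0, SEND_PEER_GONE = 1, SEND_ERROR = 2 };

/* Hypothetical helper: send the whole buffer on a blocking socket and
 * report whether a failure means the peer has gone away. */
static enum send_result send_reply(int fd, const char *buf, size_t len)
{
    size_t off = 0;

    assert(buf != NULL);
    while (off < len) {
        /* MSG_NOSIGNAL: get EPIPE as an error code instead of a SIGPIPE signal. */
        ssize_t n = send(fd, buf + off, len - off, MSG_NOSIGNAL);
        if (n > 0) {
            off += (size_t)n;           /* partial send: continue with the rest */
            continue;
        }
        if (n == -1 && errno == EINTR)
            continue;                   /* interrupted by a signal: retry */
        if (n == -1 && (errno == EPIPE || errno == ECONNRESET))
            return SEND_PEER_GONE;      /* peer closed or reset the connection */
        return SEND_ERROR;              /* anything else, errno has the details */
    }
    return SEND_OK;
}
```

In the question's parsethread() this would stand in for the bare send( ctl[uidx].fd, ... ) call. On the kqueue side, kqueue(2) documents that for sockets the socket error, if any, is returned in fflags when EV_EOF is set, so logging evList[i].fflags in the EV_EOF branch should show whether the EOF was a reset or a normal close.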

Async

You should try aio_read and aio_write.
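
A rough sketch of what that could look like with POSIX AIO, not a drop-in solution. start_async_send() and wait_async_send() are hypothetical helper names, and the code assumes that both the buffer and the aiocb stay valid until the request completes. aio_write(), aio_error(), aio_suspend() and aio_return() are the standard POSIX calls.

```c
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>   /* ssize_t */

/* Hypothetical helper: queue an asynchronous write of buf to fd.
 * Returns 0 if the request was queued, -1 with errno set otherwise.
 * cb and buf must remain valid until the request has completed. */
static int start_async_send(struct aiocb *cb, int fd, const void *buf, size_t len)
{
    assert(cb != NULL && buf != NULL);
    memset(cb, 0, sizeof(*cb));
    cb->aio_fildes = fd;
    cb->aio_buf = (void *)buf;                   /* aio_buf is not const-qualified */
    cb->aio_nbytes = len;
    cb->aio_sigevent.sigev_notify = SIGEV_NONE;  /* no notification; we check status ourselves */
    return aio_write(cb);
}

/* Hypothetical helper: block until the request finishes.
 * Returns the number of bytes written, or -1 with errno set. */
static ssize_t wait_async_send(struct aiocb *cb)
{
    const struct aiocb *const list[1] = { cb };
    int err;

    while ((err = aio_error(cb)) == EINPROGRESS) {
        if (aio_suspend(list, 1, NULL) == -1 && errno != EINTR)
            return -1;
    }
    if (err == -1)
        return -1;                 /* aio_error() itself failed */

    ssize_t n = aio_return(cb);    /* must be called exactly once per request */
    if (err != 0) {
        errno = err;               /* the request failed with this error code */
        return -1;
    }
    return n;
}
```

A real server would probably not block in wait_async_send(). On FreeBSD, completion can instead be delivered to a kqueue by setting sigev_notify to SIGEV_KEVENT (see aio(4) and sigevent(3)), which I have left out to keep the sketch short.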

Upvotes: 1
