aripod

Reputation: 57

Using select with multiple sockets

I am using lwip with one thread (based on FreeRTOS, where I only want one task). I have 2 sockets listening for incoming connections, and I want to use select to wait for new connections without blocking. So, after creating both sockets, binding, and listening, I enter my infinite loop:

#define TOTAL_MASTERS   2

fd_set current_sockets, ready_sockets;
struct timeval timeout;
int master1_fd, master2_fd;
struct sockaddr_in address;


// Master 1
if ((master1_fd = lwip_socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    xil_printf("TCP server: Error creating Socket\r\n");
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port1);
address.sin_addr.s_addr = INADDR_ANY;

if (n=bind(master1_fd, (struct sockaddr *)&address, sizeof (address)) < 0) {
    port = port11;
    xil_printf("[err: %d] TCP server: Unable to bind to port %d\r\n",n, ((port&0xFF)<<8) | ((port>>8)&0xFF));
    close(sock);
    return;
}

if (n=listen(master1_fd, 1) < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", n);
    close(master1_fd);
    return;
}

// Master 2
if ((master2_fd = lwip_socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    xil_printf("TCP server: Error creating Socket\r\n");
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port2);
address.sin_addr.s_addr = INADDR_ANY;

if (n=bind(master2_fd, (struct sockaddr *)&address, sizeof (address)) < 0) {
    port = port22;
    xil_printf("[err: %d] TCP server: Unable to bind to port %d\r\n",n, ((port&0xFF)<<8) | ((port>>8)&0xFF));
    close(sock);
    return;
}

if (n=listen(master2_fd, 1) < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", n);
    close(master2_fd);
    return;
}

FD_ZERO(&current_sockets);
FD_SET(master1_fd, &current_sockets);
FD_SET(master2_fd, &current_sockets);

timeout.tv_sec = 0;
timeout.tv_usec = 5000; // 5ms for timeout

while(1)
{
    FD_ZERO(&ready_sockets);
    ready_sockets = current_sockets;

    if(sret = select(TOTAL_PUBLISHERS, &current_sockets, NULL, NULL, &timeout) == 0)
    {
        //timeout
        //xil_printf("Select time out: %d\n", sret);
    }
    else
    {
        xil_printf("Something happened: %d\n", sret);
        for(int i=0; i<TOTAL_MASTERS; i++){
            if(FD_ISSET(i, &ready_sockets)){
                if(i == publisher_FPGA_ROS_mymsg.socket_to_wait_for_subscribers_fd){
                    if ((new_sd = lwip_accept(master1_fd, (struct sockaddr *)&remote, (socklen_t *)&size)) > 0){
                        if ((read_bytes = lwip_recvfrom(new_sd, message, ARR_SIZE, 0, NULL, NULL)) > 0) {
                            xil_printf("New client on master 1:\n%s", message);
                        }
                    }
                }
                if(i == publisher_FPGA_ROS_geometry_msgs_point.socket_to_wait_for_subscribers_fd){
                    if ((new_sd = lwip_accept(master2_fd, (struct sockaddr *)&remote, (socklen_t *)&size)) > 0){
                        if ((read_bytes = lwip_recvfrom(new_sd, message, ARR_SIZE, 0, NULL, NULL)) > 0) {
                            xil_printf("New client on master 2:\n%s", message);
                        }
                    }
                }
            }
        }
    }
}

The problem is that with a timeout, select does not react to incoming new clients and always times out. If I change &timeout in select to NULL, then I get incoming connections, but only on master1.

Is it possible to wait for incoming connections on more than one socket using only one thread or task?

Thanks for the help.

Upvotes: 0

Views: 2128

Answers (1)

Remy Lebeau

Reputation: 597166

There are a number of mistakes in your code.

You are mixing lwip and C socket functions. Use only lwip functions for consistency.

Your bind(), listen(), and select() expressions are missing required parentheses, since < and == have higher precedence than =. As written, n and sret receive the result of the comparison (0 or 1) rather than the function's return value. You are using parentheses correctly on the lwip_socket(), lwip_accept() and lwip_recvfrom() expressions, though. But really, it is generally considered bad practice to perform assignment and comparison in the same expression; you should break those up into separate statements.
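The precedence problem can be reproduced without any sockets. The following is a minimal sketch, where fake_bind() is a hypothetical stand-in for a call that fails and returns -1; it is not part of lwip:

```c
/* Hypothetical stand-in for a socket call that fails and returns -1. */
int fake_bind(void) { return -1; }

/* Mirrors `n = bind(...) < 0`, which is parsed as n = (fake_bind() < 0). */
int unparenthesized(void) {
    int n;
    n = fake_bind() < 0;
    return n;            /* the comparison result, not the error code */
}

/* Assignment and comparison kept as separate statements. */
int separated(void) {
    int n;
    n = fake_bind();
    return n;            /* the real return value */
}
```

In the first function n ends up as 1, which is why the original error messages would print a meaningless value; in the second it holds -1.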

You are passing the wrong max descriptor value to the 1st parameter of select(). It needs to be +1 more than the highest socket descriptor you are select()'ing.

You are passing your master list current_sockets to the 2nd parameter of select() instead of passing your ready_sockets copy. So current_sockets is being modified and may not contain all of the listening sockets anymore on the next call to select(). After a few calls, it is likely to end up completely empty.

Your FD_ISSET() check is wrong, too. You are checking file descriptors 0 and 1, which are not the listening sockets you created. You don't need the for loop to check master1_fd and master2_fd, you can pass them as-is to FD_ISSET().
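The three select() points above can be illustrated in isolation. This is a sketch that assumes a POSIX select() rather than lwip_select(), so it can be tried on a desktop with pipes or sockets; the helper names max_fd() and poll_two() are made up for illustration:

```c
#include <sys/select.h>
#include <unistd.h>

/* Returns the larger of two descriptors, for computing select()'s 1st argument. */
int max_fd(int a, int b) { return a > b ? a : b; }

/*
 * Waits up to 5 ms on two descriptors using a scratch copy of the master set.
 * Returns a bitmask: bit 0 set if fd_a is readable, bit 1 set if fd_b is
 * readable, or -1 on error. The master set itself is never modified.
 */
int poll_two(const fd_set *master, int fd_a, int fd_b) {
    fd_set ready = *master;          /* select() overwrites the set it is given */
    struct timeval timeout;
    int sret, mask = 0;

    timeout.tv_sec = 0;
    timeout.tv_usec = 5000;          /* set on every call; some implementations modify it */

    /* The 1st argument is the highest descriptor plus one, not a socket count. */
    sret = select(max_fd(fd_a, fd_b) + 1, &ready, NULL, NULL, &timeout);
    if (sret < 0) return -1;

    /* Test the actual descriptors, not loop indexes 0 and 1. */
    if (FD_ISSET(fd_a, &ready)) mask |= 1;
    if (FD_ISSET(fd_b, &ready)) mask |= 2;
    return mask;
}
```

Because only the copy is handed to select(), the master set still contains both descriptors after a timeout, and the next call waits on both again.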

With that said, try this instead:

fd_set current_sockets, ready_sockets;
struct timeval timeout;
int master1_fd, master2_fd;
int n, sret, new_sd, read_bytes;
struct sockaddr_in address, remote;
socklen_t size;


// Master 1
master1_fd = lwip_socket(AF_INET, SOCK_STREAM, 0);
if (master1_fd < 0) {
    xil_printf("[err: %d] TCP server: Error creating Socket\r\n", errno);
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port1);
address.sin_addr.s_addr = INADDR_ANY;

n = lwip_bind(master1_fd, (struct sockaddr *)&address, sizeof (address));
if (n < 0) {
    xil_printf("[err: %d] TCP server: Unable to bind to port %hd\r\n", errno, port1);
    lwip_close(master1_fd);
    return;
}

n = lwip_listen(master1_fd, 1);
if (n < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", errno);
    lwip_close(master1_fd);
    return;
}

// Master 2
master2_fd = lwip_socket(AF_INET, SOCK_STREAM, 0);
if (master2_fd < 0) {
    xil_printf("[err: %d] TCP server: Error creating Socket\r\n", errno);
    lwip_close(master1_fd);
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port2);
address.sin_addr.s_addr = INADDR_ANY;

n = lwip_bind(master2_fd, (struct sockaddr *)&address, sizeof (address));
if (n < 0) {
    xil_printf("[err: %d] TCP server: Unable to bind to port %hd\r\n", errno, port2);
    lwip_close(master2_fd);
    lwip_close(master1_fd);
    return;
}

n = lwip_listen(master2_fd, 1);
if (n < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", errno);
    lwip_close(master2_fd);
    lwip_close(master1_fd);
    return;
}

FD_ZERO(&current_sockets);
FD_SET(master1_fd, &current_sockets);
FD_SET(master2_fd, &current_sockets);

int max_fd;
if (master1_fd > master2_fd)
    max_fd = master1_fd;
else
    max_fd = master2_fd;

while (1)
{
    FD_ZERO(&ready_sockets);
    ready_sockets = current_sockets;

    timeout.tv_sec = 0;
    timeout.tv_usec = 5000; // 5ms for timeout

    sret = lwip_select(max_fd+1, &ready_sockets, NULL, NULL, &timeout);
    if (sret < 0)
    {
        //error
        //xil_printf("Select error: %d\n", errno);
    }
    else if (sret == 0)
    {
        //timeout
        //xil_printf("Select time out\n");
    }
    else
    {
        xil_printf("Something happened\n");

        if (FD_ISSET(master1_fd, &ready_sockets)){
            size = sizeof (remote);
            new_sd = lwip_accept(master1_fd, (struct sockaddr *)&remote, &size);
            if (new_sd >= 0){
                read_bytes = lwip_recv(new_sd, message, ARR_SIZE, 0);
                if (read_bytes > 0) {
                    xil_printf("New client on master 1:\n%.*s", read_bytes, message);
                }
            }
        }
        if (FD_ISSET(master2_fd, &ready_sockets)){
            size = sizeof (remote);
            new_sd = lwip_accept(master2_fd, (struct sockaddr *)&remote, &size);
            if (new_sd >= 0){
                read_bytes = lwip_recv(new_sd, message, ARR_SIZE, 0);
                if (read_bytes > 0) {
                    xil_printf("New client on master 2:\n%.*s", read_bytes, message);
                }
            }
        }
    }
}

However, you still have a big logic hole in your code. You are leaking client sockets, as you never call lwip_close() on the socket descriptors that lwip_accept() returns. You also need to select() the accepted sockets to know when they have data available to be read, instead of reading immediately after accepting.

So try something more like this instead:

fd_set ready_sockets;
struct timeval timeout;
int master1_fd, master2_fd, max_fd, curr_fd, i;
int n, sret, new_sd, read_bytes;
int[] sockets; // <-- PSEUDO-CODE!!
struct sockaddr_in address, remote;
socklen_t size;


// Master 1
master1_fd = lwip_socket(AF_INET, SOCK_STREAM, 0);
if (master1_fd < 0) {
    xil_printf("[err: %d] TCP server: Error creating Socket\r\n", errno);
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port1);
address.sin_addr.s_addr = INADDR_ANY;

n = lwip_bind(master1_fd, (struct sockaddr *)&address, sizeof (address));
if (n < 0) {
    xil_printf("[err: %d] TCP server: Unable to bind to port %hd\r\n", errno, port1);
    lwip_close(master1_fd);
    return;
}

n = lwip_listen(master1_fd, 1);
if (n < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", errno);
    lwip_close(master1_fd);
    return;
}

// Master 2
master2_fd = lwip_socket(AF_INET, SOCK_STREAM, 0);
if (master2_fd < 0) {
    xil_printf("[err: %d] TCP server: Error creating Socket\r\n", errno);
    lwip_close(master1_fd);
    return;
}

// Set up to wait for subscribers
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port2);
address.sin_addr.s_addr = INADDR_ANY;

n = lwip_bind(master2_fd, (struct sockaddr *)&address, sizeof (address));
if (n < 0) {
    xil_printf("[err: %d] TCP server: Unable to bind to port %hd\r\n", errno, port2);
    lwip_close(master2_fd);
    lwip_close(master1_fd);
    return;
}

n = lwip_listen(master2_fd, 1);
if (n < 0) {
    xil_printf("[err: %d] TCP server: tcp_listen failed\r\n", errno);
    lwip_close(master2_fd);
    lwip_close(master1_fd);
    return;
}

sockets.add(master1_fd); // <-- PSEUDO-CODE!!!
sockets.add(master2_fd); // <-- PSEUDO-CODE!!!

while (1)
{
    FD_ZERO(&ready_sockets); // select() modifies the set, so rebuild it on every pass
    max_fd = -1;
    for (i = 0; i < sockets.length; ++i){ // <-- PSEUDO-CODE!!!
        curr_fd = sockets[i];
        FD_SET(curr_fd, &ready_sockets);
        if (curr_fd > max_fd){
            max_fd = curr_fd;
        }
    }

    timeout.tv_sec = 0;
    timeout.tv_usec = 5000; // 5ms for timeout

    sret = lwip_select(max_fd+1, &ready_sockets, NULL, NULL, &timeout);
    if (sret < 0)
    {
        //error
        //xil_printf("Select error: %d\n", errno);
    }
    else if (sret == 0)
    {
        //timeout
        //xil_printf("Select time out\n");
    }
    else
    {
        xil_printf("Something happened\n");

        i = 0;
        while (i < sockets.length){ // <-- PSEUDO-CODE!!!
            curr_fd = sockets[i];
            if (FD_ISSET(curr_fd, &ready_sockets)){
                if ((curr_fd == master1_fd) || (curr_fd == master2_fd)){
                    size = sizeof (remote);
                    new_sd = lwip_accept(curr_fd, (struct sockaddr *)&remote, &size);
                    if (new_sd >= 0){
                        xil_printf("New client on master %d:\n", (curr_fd == master1_fd) ? 1 : 2);
                        sockets.add(new_sd); // <-- PSEUDO-CODE!!!
                    }
                }
                else{
                    read_bytes = lwip_recv(curr_fd, message, ARR_SIZE, 0);
                    if (read_bytes > 0) {
                        xil_printf("%.*s", read_bytes, message);
                    }
                    else {
                        lwip_close(curr_fd);
                        sockets.remove(i); // <-- PSEUDO-CODE!!!
                        continue;
                    }
                }
            }
            ++i;
        }
    }
}

for(i = 0; i < sockets.length; ++i){ // <-- PSEUDO-CODE!!!
    lwip_close(sockets[i]);
}

Regarding the PSEUDO-CODE portions of the above code, you did not indicate whether you are using C or C++. In C++, you could simply use a std::vector for the sockets list. In C, you would have to decide whether to use a fixed array with a maximum capacity or a dynamically sized array. I didn't really feel like writing a bunch of extra code to manage the list, so I leave that as an exercise for you. How you store the sockets outside of the fd_set being select()'ed is not important to this topic.
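For the C case, one possible shape for the list is a fixed-capacity array. This is only a sketch: the socket_list type, its function names, and the MAX_SOCKETS value are all assumptions made for illustration, not anything lwip provides.

```c
#include <stddef.h>

#define MAX_SOCKETS 8   /* assumed capacity; size it to your lwip socket limit */

/* Fixed-capacity list of descriptors; order is not preserved on removal. */
typedef struct {
    int    fds[MAX_SOCKETS];
    size_t length;
} socket_list;

void socket_list_init(socket_list *list) { list->length = 0; }

/* Appends fd; returns 0 on success or -1 if the list is full. */
int socket_list_add(socket_list *list, int fd) {
    if (list->length >= MAX_SOCKETS) return -1;
    list->fds[list->length++] = fd;
    return 0;
}

/* Removes the entry at index i by moving the last entry into its slot. */
void socket_list_remove(socket_list *list, size_t i) {
    if (i >= list->length) return;
    list->fds[i] = list->fds[--list->length];
}
```

With this layout, sockets[i] becomes list.fds[i] and sockets.length becomes list.length. Removal moves the last entry into slot i, so the loop above must re-check index i without incrementing, which is what its continue already does. If socket_list_add() returns -1, the newly accepted socket should be closed with lwip_close() rather than leaked.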

Upvotes: 3
