Rubiks
Rubiks

Reputation: 481

TCP server that can handle write requests from two different clients without them blocking each other

I am trying to write a TCP server that can handle two different clients. I have a requestor and provider client. The provider is multithreaded and adds and removes new services to the server. Each time a new service is added or removed it should send it to the server and the server will print the update. The requestor client allows the user to enter a service and then checks the server to see if the service exists.

The problem I am running into is with the recv() function. I call it twice in my program, once to read from the provider client and once to read from the requestor. The issue is that the server receives only one message and then freezes. It should print an update each time one of the provider threads runs. The problem seems to occur because the second recv() call blocks while it waits on the requestor. I tried to make the second recv() call non-blocking by passing the MSG_DONTWAIT flag, but that did not resolve my issue.

How do I write a TCP server that can handle write requests from two different clients and prevent them from blocking each other? My code is below.

Client 1- provider

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080

/* Capacity of the service table; also the exclusive upper bound of the
 * random service numbers. A constant, so it can size the arrays below. */
enum { SIZE = 100 };

/* Number of services added so far; index of the next free table slot. */
int counter = 0;

/* Turn flag shared by the two worker threads:
 *   1 -> createService may add a service,
 *   0 -> removeService may remove the most recent one. */
int semaphore = 1;

/* Service table that is sent to the server as one fixed-size message. */
struct values
{
    int serviceArray[SIZE]; /* service numbers; 0 marks an empty slot */
    int portArray[SIZE];    /* port for the service in the same slot */
} input;

/*
 * Connect to the server at 127.0.0.1:PORT, transmit the whole `input`
 * table as a single message, and close the connection again.
 *
 * Any failure is reported and makes the function return early, so nothing
 * is ever sent on a socket that was not created or not connected. The
 * socket is closed on every path; without that each call leaks a descriptor.
 */
void callServer(void)
{
    struct sockaddr_in serv_addr;
    const char *cursor = (const char *)&input;
    size_t remaining = sizeof(input);

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        printf("\n Socket creation error \n");
        return;
    }

    /* Fill with zero bytes, not with the character '0' (0x30). */
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(PORT);

    /* Convert the IPv4 address from text to binary form. */
    if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0)
    {
        printf("\nInvalid address/ Address not supported \n");
        close(sock);
        return;
    }

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
    {
        printf("\nConnection Failed \n");
        close(sock);
        return;
    }

    /* send() on a stream socket may accept fewer bytes than requested,
     * so keep going until the whole table has been handed to the kernel. */
    while (remaining > 0)
    {
        ssize_t sent = send(sock, cursor, remaining, 0);
        if (sent < 0)
        {
            perror("send");
            break;
        }
        cursor += sent;
        remaining -= (size_t)sent;
    }

    /* Closing tells the server that this message is complete. */
    close(sock);
}






/*
 * Scan all 100 slots of input.serviceArray for `number`.
 *
 * Returns 0 when the value is present and 1 when it is absent; note that
 * the sense is inverted compared with a usual boolean "found" result.
 */
int arraySearch(int number){
    const int *slot = input.serviceArray;
    const int *const end = input.serviceArray + 100;
    int absent = 1;

    while (slot != end)
    {
        if (*slot == number)
        {
            absent = 0; /* is found */
            break;
        }
        ++slot;
    }

    return absent;
}


/*
 * Thread routine: every 2 seconds, when it is this thread's turn
 * (semaphore == 1), pick a random service number that is not yet in the
 * table, store it together with its port (number + PORT) in the next free
 * slot, and send the updated table to the server.
 *
 * arg is unused. Always returns NULL.
 *
 * NOTE(review): `semaphore`, `counter` and `input` are plain shared
 * variables with no mutex or atomics; this hand-off is only a turn flag,
 * not real synchronization.
 */
void *createService(void *arg)
{
    (void)arg;

    while (counter < SIZE)
    {
        sleep(2);
        if (semaphore != 1)
        {
            continue; /* not our turn yet */
        }

        int randomValue = rand() % SIZE;
        if (arraySearch(randomValue) != 1)
        {
            continue; /* already in the table; try another value next round */
        }

        input.serviceArray[counter] = randomValue;
        input.portArray[counter] = randomValue + PORT;
        printf("Thread 1 is adding service number: %d and port number: %d\n", input.serviceArray[counter], input.portArray[counter]);
        counter = counter + 1;
        callServer();

        /* Hand the turn over last, after the slot is counted and the update
         * has been sent, so the remover never sees a stale `counter` and
         * never edits the table while it is being transmitted. */
        semaphore = 0;
    }

    return NULL;
}
/*
 * Thread routine: every 4 seconds, when it is this thread's turn
 * (semaphore == 0), clear the most recently added table slot
 * (index counter - 1) and send the updated table to the server.
 *
 * `counter` is not decremented, so the cleared slot is not reused.
 * arg is unused. Always returns NULL.
 */
void *removeService(void *arg)
{
    (void)arg;

    while (counter < SIZE)
    {
        sleep(4);

        /* Wait for our turn, and never index slot -1 on an empty table. */
        if (semaphore != 0 || counter < 1)
        {
            continue;
        }

        printf("Thread 2 is removing service number: %d and port number: %d\n", input.serviceArray[counter - 1], input.portArray[counter - 1]);
        input.serviceArray[counter - 1] = 0;
        input.portArray[counter - 1] = 0;
        callServer();

        /* Give the turn back only after the update has been sent, so the
         * adder cannot modify the table in the middle of the transmission. */
        semaphore = 1;
    }

    return NULL;
}



/*
 * Start the add-service and remove-service worker threads and wait for
 * both of them to finish.
 *
 * Returns EXIT_FAILURE if a thread cannot be created; joining a thread id
 * that was never filled in by pthread_create() would be undefined.
 */
int main(void)
{
    pthread_t thread_id1, thread_id2;
    int rc;

    rc = pthread_create(&thread_id1, NULL, createService, NULL);
    if (rc != 0)
    {
        /* pthread functions return the error number instead of setting errno. */
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }

    rc = pthread_create(&thread_id2, NULL, removeService, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }

    pthread_join(thread_id1, NULL);
    pthread_join(thread_id2, NULL);

    return 0;
}

Client 2- Requestor

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>
#define PORT 8080

/*
 * Connect to the server at 127.0.0.1:PORT, send `serviceNum` as one
 * int-sized message, and close the connection again.
 *
 * Any failure is reported and makes the function return early, so nothing
 * is sent on a socket that was not created or not connected.
 */
void callServer(int serviceNum)
{
    struct sockaddr_in serv_addr;
    const char *cursor = (const char *)&serviceNum;
    size_t remaining = sizeof(serviceNum);

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        printf("\n Socket creation error \n");
        return;
    }

    /* Fill with zero bytes, not with the character '0' (0x30). */
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(PORT);

    /* Convert the IPv4 address from text to binary form. */
    if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0)
    {
        printf("\nInvalid address/ Address not supported \n");
        close(sock);
        return;
    }

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
    {
        printf("\nConnection Failed \n");
        close(sock);
        return;
    }

    /* send() may accept fewer bytes than requested; loop until all are out. */
    while (remaining > 0)
    {
        ssize_t sent = send(sock, cursor, remaining, 0);
        if (sent < 0)
        {
            perror("send");
            break;
        }
        cursor += sent;
        remaining -= (size_t)sent;
    }

    /* Closing tells the server that this message is complete. */
    close(sock);
}


/*
 * Ask the user for a service number, echo it, and send it to the server.
 *
 * Returns EXIT_FAILURE without contacting the server when the input is not
 * an integer, because serviceNum would otherwise be sent uninitialized.
 */
int main(void){
    int serviceNum;

    printf("Which service would you like to run?\n");
    if (scanf("%d", &serviceNum) != 1)
    {
        fprintf(stderr, "Invalid service number\n");
        return EXIT_FAILURE;
    }

    printf("You entered: %d", serviceNum);
    callServer(serviceNum);

    return 0;
}

Server

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 /* Socket descriptor kept at file scope. */
int sockfd;

/* Service number read from a client by main(). */
int serviceNum;

/* Server and client address records kept at file scope. */
struct sockaddr_in servaddr;
struct sockaddr_in cliaddr;

/* Wire format of a provider update: 100 service numbers followed by the
 * 100 matching port numbers. */
struct values
{
    int serviceArray[100];
    int portArray[100];
};

/* Table that main() fills from the provider's messages. */
struct values input;






int main()
{

    int server_fd, server_fd2, new_socket, valread, valread2; 

    struct sockaddr_in address; 

    int opt = 1; 

    int addrlen = sizeof(address); 





    // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 


while(1){
   valread = recv( new_socket , &input, sizeof(input), 0); 




   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }

recv( new_socket , &serviceNum, sizeof(serviceNum), MSG_DONTWAIT)

    printf("The service number you passed is %d", serviceNum);

    }


} 

EDIT - This is an update to the server and is multithreaded. I still have blocking issues.

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 /* Socket descriptor kept at file scope. */
int sockfd;

/* Service number read from the requestor connection. */
int serviceNum;

/* Server and client address records kept at file scope. */
struct sockaddr_in servaddr;
struct sockaddr_in cliaddr;

/* Wire format of a provider update: 100 service numbers followed by the
 * 100 matching port numbers. */
struct values
{
    int serviceArray[100];
    int portArray[100];
};

/* Table filled from the provider's messages. */
struct values input;

/* Listening sockets, the accepted connection, and the read results. */
int server_fd;
int server_fd2;
int new_socket;
int valread;
int valread2;

/* Address used for bind() and accept(). */
struct sockaddr_in address;

/* Option value passed to setsockopt(). */
int opt = 1;

/* Size of `address`, passed to accept(). */
int addrlen = sizeof(address);



void *producer()
{
     // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 
    while(1)
    {
   valread = read( new_socket , &input, sizeof(input));   
   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }
}
}

/*
 * Thread routine: listen on port 8090 and print every service number sent
 * by a requestor client. arg is unused. The routine loops forever and only
 * leaves through exit() when the socket setup or accept() fails.
 *
 * Connections are accepted in a loop and each one is read once and closed,
 * so a client that reconnects for every query keeps being served. All
 * socket state is local to this thread instead of the file-scope
 * `new_socket`, `address` and `addrlen` that the producer thread also uses.
 *
 * NOTE(review): the requestor client shown in this file connects to PORT
 * (8080); it has to connect to 8090 to reach this thread.
 */
void *requestor(void *arg)
{
    struct sockaddr_in listen_addr;
    int reuse = 1;
    int listen_fd;

    (void)arg;

    /* socket() reports failure with -1; 0 is a valid descriptor. */
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0)
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    /* Allow quick restarts of the server on the same port. */
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)))
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    memset(&listen_addr, 0, sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(8090);

    if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    if (listen(listen_fd, 3) < 0)
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    while (1)
    {
        int requested = 0;
        size_t got = 0;

        int conn = accept(listen_fd, NULL, NULL);
        if (conn < 0)
        {
            perror("accept");
            exit(EXIT_FAILURE);
        }

        /* read() may return fewer bytes than asked for; collect the whole
         * int. A result of 0 (peer closed) or -1 (error) ends the message. */
        while (got < sizeof(requested))
        {
            ssize_t n = read(conn, (char *)&requested + got, sizeof(requested) - got);
            if (n <= 0)
            {
                break;
            }
            got += (size_t)n;
        }
        close(conn);

        if (got != sizeof(requested))
        {
            fprintf(stderr, "requestor: dropped incomplete request (%zu bytes)\n", got);
            continue;
        }

        serviceNum = requested;
        /* Newline added so the line is flushed instead of sitting in the
         * stdio buffer of a server that never exits. */
        printf("The service number you passed is %d\n", serviceNum);
    }
}


/*
 * Start one thread for provider updates and one for requestor queries, then
 * wait for both of them.
 *
 * Returns EXIT_FAILURE if a thread cannot be created; joining a thread id
 * that was never filled in by pthread_create() would be undefined.
 */
int main()
{
    pthread_t thread_id1, thread_id2;
    int rc;

    rc = pthread_create(&thread_id1, NULL, producer, NULL);
    if (rc != 0)
    {
        /* pthread functions return the error number instead of setting errno. */
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }

    rc = pthread_create(&thread_id2, NULL, requestor, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }

    pthread_join(thread_id1, NULL);
    pthread_join(thread_id2, NULL);

    return 0;
}

Upvotes: 0

Views: 60

Answers (1)

Remy Lebeau
Remy Lebeau

Reputation: 595847

By default, a socket operates in blocking mode. So, it makes sense that when you call recv() on one socket, it will block other sockets on the same thread while waiting for data to arrive.

For what you are attempting to do, you need to change your server code to either:

  • leave each accepted socket in blocking mode, and operate on them in their own individual worker threads or forked processes.

  • switch each accepted socket to non-blocking mode (fcntl(F_SETFL, O_NONBLOCK), ioctl(FIONBIO), etc), and then use select(), (e)poll(), or another similar mechanism to monitor the sockets together in one thread, and then react when it tells you that individual sockets have activity you need to handle (ie, don't read from a socket until there is actually something available to be read, etc).

  • (Windows only) use each accepted socket asynchronously by using Overlapped I/O operations. Let the OS notify you when there is activity on each socket.

Upvotes: 3

Related Questions