QenBau
QenBau

Reputation: 405

how to "reuse" a thread multiple times in C?

I need to create a manager-worker server where a worker only handles one request at a time. In the code I thought of, the manager stores the file descriptors in a queue; the thread retrieves the file descriptor and handles the request for it.

My problem is that, in the current code, N threads are created at the beginning which are waiting to handle N requests; but once the N requests are handled, the clientFun() function no longer runs as the initial threads have finished their work.

Server Code:

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include "utils.h"
#include "conn.h"

#define DIM_BUFFER 100
#define N_THREADS 1

struct nodo
{
    int fd;
    struct nodo *prossimoPtr;
};
typedef struct nodo Nodo;
typedef Nodo *NodoPtr;

static Nodo *testaPtr = NULL;
static Nodo *codaPtr = NULL;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emptyFd = PTHREAD_COND_INITIALIZER;

unsigned int updateMaxSelect(int maxFd, fd_set set);
static void run_server(int pipeW2M_Read);
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF);
static void *clientFun(void *pipeW2M_WriteF);
int pop(NodoPtr *lPtrF);
void gestioneCoda(int maxFdF, int fd, fd_set set);
void stampa(NodoPtr lPtrF);
void cleanup();

int main()
{
    cleanup();
    atexit(cleanup);

    int pipeW2M[2];
    SYSCALL(pipe(pipeW2M), "Errore: pipe(pipeW2M)")

    pthread_t threadFd[N_THREADS];
    for(int i = 0; i < N_THREADS; i++)
    {
        THREAD_CREATE(&threadFd[i], NULL, &clientFun, (void *) &pipeW2M[WRITE_END], "Thread setId")
    }

    run_server(pipeW2M[READ_END]);
    SYSCALL(close(pipeW2M[WRITE_END]), "Errore: close(pipeW2M[WRITE_END])")
    SYSCALL(close(pipeW2M[READ_END]), "Errore: close(pipeW2M[READ_END])")


    for(int i = 0; i < N_THREADS; i++)
    {
        THREAD_JOIN(threadFd[i], NULL, "Impossibile fare la join: seetId");
    }
}

static void run_server(int pipeW2M_Read)
{
    //Socket di connessione
    int fdSkt;
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, MAXBACKLOG);
    sckAddr.sun_family = AF_UNIX;
    SYSCALL(bind(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)), "Errore bind - fdSkt")
    SYSCALL(listen(fdSkt, SOMAXCONN), "Errore listen - fdSkt")

    //Massimo fd attivo
    int maxFd = fdSkt;

    //Inizializzazione set
    fd_set set, readSet;
    FD_ZERO(&set);
    FD_SET(fdSkt, &set); //FD_SET imposta a 1 il bit corrispondente a fdSkt
    FD_SET(pipeW2M_Read, &set);
    if(pipeW2M_Read > maxFd)
    {
        maxFd = pipeW2M_Read;
    }

    int fdSkt_accept;
    while(1)
    {
        readSet = set;
        SYSCALL(select(maxFd + 1, &readSet, NULL, NULL, NULL), "select(fd_num + 1, &rdset, NULL, NULL, NULL)")

        for(int i = 0; i <= maxFd; i++)
        {
            if (FD_ISSET(i, &readSet))
            {
                if (i == fdSkt)
                {
                    RETURN_SYSCALL(fdSkt_accept, accept(fdSkt, NULL, 0), "fdSkt_accept = accept(fdSkt, NULL, 0)")
                    FD_SET(fdSkt_accept, &set);
                    if (fdSkt_accept > maxFd) {
                        maxFd = fdSkt_accept;
                    }
                    continue;
                }

                if(i == pipeW2M_Read)
                {
                    int pipeFdSoccket;
                    SYSCALL(read(pipeW2M_Read, &pipeFdSoccket, sizeof(int)), "Errore")
                    printf("%d\n", pipeFdSoccket);

                    FD_SET(pipeFdSoccket, &set);
                    if(pipeFdSoccket > maxFd)
                        maxFd = pipeFdSoccket;

                    continue;
                }

                gestioneCoda(maxFd, i, set);
            }
        }
    }
    SYSCALL(close(fdSkt), "Errore close - fdSkt")
}

static void *clientFun(void *pipeW2M_WriteF)
{
    puts("Entro");
    int pipeW2M_Write = *((int *) pipeW2M_WriteF);

    LOCK(&mutex)
    while (testaPtr == NULL)
    {
        WAIT(&emptyFd, &mutex)
    }
    int fdAccept = pop(&testaPtr);
    printf("Fd in thread: %d\n", fdAccept);
    UNLOCK(&mutex)

    char buffer[DIM_BUFFER];
    memset(buffer, '\0', DIM_BUFFER);

    int lenghtRead;
    RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")

    if(lenghtRead == 0)
    {
        SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
        return NULL;
    }

    //lenghtRead comprende conta tutti i caretteri letti (compreso il '\0' se è presente)
    for(int i = 0; i < lenghtRead-1; i++)
    {
        buffer[i] = toupper((unsigned char) buffer[i]);
    }
    SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")

    SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
    puts("Esco");
    return NULL;

}

unsigned int updateMaxSelect(int maxFd, fd_set set)
{
    for(int i = maxFd - 1; i >= 0; i--)
    {
        if(FD_ISSET(i, &set))
        {
            return i;
        }
    }
    return -1;
}

void gestioneCoda(int maxFdF, int fd, fd_set set)
{
    LOCK(&mutex)

    push(&testaPtr, &codaPtr, fd);
    FD_CLR(fd, &set);
    if(fd == maxFdF)
        maxFdF = updateMaxSelect(fd, set);

    SIGNAL(&emptyFd)
    UNLOCK(&mutex)
}

void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF)
{
    NodoPtr nuovoPtr = NULL;
    RETURN_NULL_SYSCALL(nuovoPtr, malloc(sizeof(Nodo)), "nuovoPtr = malloc(sizeof(Nodo))")

    nuovoPtr->fd = fdF;
    nuovoPtr->prossimoPtr = NULL;

    if(*testaPtrF == NULL)
    {
        *testaPtrF = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
    else
    {
        (*codaPtrF)->prossimoPtr = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
}

int pop(NodoPtr *lPtrF)
{
    if(*lPtrF != NULL)
    {
        int value = (*lPtrF)->fd;
        NodoPtr tempPtr = *lPtrF;
        *lPtrF = (*lPtrF)->prossimoPtr;
        free(tempPtr);

        return value;
    }
    else
    {
        puts("la lista è vuota");
        exit(EXIT_FAILURE);
    }
}

void stampa(NodoPtr lPtrF)
{
    if(lPtrF != NULL)
    {
        printf("Parola: %d\n", lPtrF->fd);

        stampa(lPtrF->prossimoPtr);
    }
    else
        puts("NULL");
}

void cleanup()
{
    unlink(SOCKNAME);
}

Client code:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "utils.h"
#include "conn.h"
//Librerie per i socket:
#include <sys/socket.h>
#include <sys/un.h>

#define DIM_BUFFER 256


int main()
{
    int fdSkt;
    //Creazione socket - si usano (quasi) sempre questi parametri
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")


    //Connect
    //sckAddr deve essere uguale a quello del server
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, 108);
    sckAddr.sun_family = AF_UNIX;

    //il socket potrebbe non aver ancora fatto la listen (per via dello scheduler)
    //il prof nelle correzioni mette solamente: SYSCALL(connect(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)), "")
    while(connect(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)) == -1)
    {
        puts("Bloccato");
        if(errno != ENOENT)
        {
            perror("Errore connect - fdSkt");
            exit(EXIT_FAILURE);
        }
    }

    while (1)
    {
        char buffer[DIM_BUFFER];
        memset(buffer, '\0', DIM_BUFFER);
        SCANF_STRINGA(buffer);

        if(strncmp(buffer, "quit", strlen("quit")) == 0) {
            break;
        }

        int lenghtBuffer = strlen(buffer)+1;
        SYSCALL(writen(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer)+1)")

        SYSCALL(readn(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer)+1)")
        printf("%s\n", buffer);

    }

    SYSCALL(close(fdSkt), "Errore close - fdSkt")

    return 0;
}

Utils.h code:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

#define READ_END 0
#define WRITE_END 1

#define RETURN_SYSCALL(r,c,e) if((r=c)==-1) { perror(e);exit(errno); }
#define SYSCALL(c,e) if(c==-1) { perror(e);exit(errno);}
#define THREAD_CREATE(a, b, c, d, text) if(pthread_create(a, b, c, d) != 0) { perror(text);exit(EXIT_FAILURE);}
#define THREAD_JOIN(a, b, text) if(pthread_join(a, b) != 0) { perror(text);exit(EXIT_FAILURE);}
//usare con le funzioni che ritornano NULL quando falliscono e di cui si vuole memorizzare il valore di ritorno (es: fopen)
#define RETURN_NULL_SYSCALL(retrunVar, fun, text) if((retrunVar=fun) == NULL) { perror(text);exit(errno); }
//usare per le syscall che quando falliscono ritornano un valore != 0
#define SYSCALL_ZERO(syscall, text) if(syscall != 0) {perror(text);exit(errno);}

#define LOCK(l)                                         \
if (pthread_mutex_lock(l) != 0)                         \
{                                                       \
    fprintf(stderr, "ERRORE FATALE lock\n");            \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define UNLOCK(l)                                       \
if (pthread_mutex_unlock(l) != 0)                       \
{                                                       \
    fprintf(stderr, "ERRORE FATALE unlock\n");          \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define SIGNAL(c)                                       \
if (pthread_cond_signal(c) != 0)                        \
{                                                       \
    fprintf(stderr, "ERRORE FATALE signal\n");          \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define WAIT(c, l)                                      \
if (pthread_cond_wait(c,l) != 0)                        \
{                                                       \
    fprintf(stderr, "ERRORE FATALE wait\n");            \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define SCANF_STRINGA(stringa)                \
if(scanf("%s", stringa) == 0)                 \
{                                             \
    perror("Impossibile leggere la stringa"); \
    exit(EXIT_FAILURE);                       \
}

Conn.h code:

#if !defined(CONN_H)
#define CONN_H

#include <sys/types.h> 
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#define SOCKNAME "./cs_sock"
#define MAXBACKLOG 108

/** Evita letture parziali
 *
 *   \retval -1   errore (errno settato)
 *   \retval  0   se durante la lettura da fd leggo EOF
 *   \retval size se termina con successo
 */
static inline int readn(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=read((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;   // EOF
        left    -= r;
    bufptr  += r;
    }
    return size;
}

/** Evita scritture parziali
 *
 *   \retval -1   errore (errno settato)
 *   \retval  0   se durante la scrittura la write ritorna 0
 *   \retval  1   se la scrittura termina con successo
 */
static inline int writen(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=write((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;  
        left    -= r;
    bufptr  += r;
    }
    return 1;
}


#endif /* CONN_H */

Upvotes: 1

Views: 722

Answers (2)

Clifford
Clifford

Reputation: 93456

Threads do not normally run to completion; rather they wait in an executive loop for some trigger event such as arrival of a message or semaphore. The pattern of a typical thread function is:

void* func( void* arg )
{
    // Thread initialisation

    while( !terminated )
    {
        // Block waiting
        ...
 
        // Do stuff (handle event/message for example)
        ...
    }

    // Clean-up
    ...
}

The signature of a thread function depends on the threading library/OS; the above is normal for pthreads for example. Also a thread may run indefinitely with a while(1) or for(;;) loop. The exact termination mechanism (i.e. the terminate in my example is up to you; it is not required.

Thread-loops are often implementations of state-machines.

You can of course have a run-to-completion thread, but in that case you have to create a new thread for every event - it is rather inefficient to to that.

The blocking call could be as simple as a sleep() call for periodic tasks. It is possible to block at multiple places throughout a task, but to do so makes the design more complex to code and debug, and a state-machine pattern is normally a better solution.

Upvotes: 4

Armali
Armali

Reputation: 19375

Now that Clifford gave general information, I'll address concrete issues in your program and suggest appropriate changes.

  • As it is, your clientFun fails to handle not only multiple (consecutive) client connections, but also multiple individual messages from one connection (handling messages from one connection in multiple threads is not sensible since the client anyway is not designed to send parallel requests); as a first step we need a loop around the processing of one message, i. e. around the lines

        RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")
    …
        SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
    

    put a for (; ; ) { … } loop and replace the return NULL; therein with break;.

  • Now in order to handle multiple connections we need an outer for (; ; ) { … } loop around

        LOCK(&mutex)
    …
        SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
    
  • To work together with these changes, adjustments in run_server are needed. There's no more need for the server to watch the accepted socket descriptor, so change

                        FD_SET(fdSkt_accept, &set);
    

    to

                        gestioneCoda(maxFd, fdSkt_accept, set);
    

    and remove the later call of gestioneCoda, so the accepted socket is immediately passed to the worker thread. Finally change

                        FD_SET(pipeFdSoccket, &set);
    

    to

                        close(pipeFdSoccket);
    

    - when a connection is finished, it must not be added to the watch list, but rather closed to avoid socket descriptor leaking.

Upvotes: 0

Related Questions