Kevin Lee

Reputation: 1135

POSIX Threads with Sockets in C

#include "cs241.c"

#define THREAD_COUNT 10

int list_s;
int connections[THREAD_COUNT];
char space[THREAD_COUNT];

int done = 0;

pthread_mutex_t muxlock = PTHREAD_MUTEX_INITIALIZER;

int *numbers;
int numbers_count;

void *listener(void *arg) {
    int n = *(int *) arg;

    FILE *f = fdopen(connections[n], "r");
    if (f == NULL)
        printf("Could not open file\n");
    char *line = NULL;
    size_t *len = malloc(sizeof(int));
    while(getline(&line, len, f) != -1) {
        printf("%s", line);
        if (strcmp("END", line) == 0) {
            pthread_mutex_lock(&muxlock);
            done = 1;
            pthread_mutex_unlock(&muxlock);
        }
    }

    space[n] = 't';
    fclose(f);
    free(len);
    close(connections[n]);
    return NULL;
}

void initialize() {
    int n;
    for (n = 0; n < THREAD_COUNT; n++) {
        space[n] = 't';
    }
}

int check() {
    int index;
    for (index = 0; index < THREAD_COUNT; index++) {
        if (space[index] == 't') {
            space[index] = 'f';
            return index;
        }
    }
    return 0;
}   

int main(int argc, char *argv[]) {
    int port = 0;
    int binder;
    int lis;
    int i = 0;
    int *j = malloc(sizeof(int));
    initialize();
    pthread_t threads[THREAD_COUNT];

    if ((argc != 2) || (sscanf(argv[1], "%d", &port) != 1)) {
        fprintf(stderr, "Usage: %s [PORT]\n", argv[0]);
        exit(1);
    }

    if (port < 1024) {
        fprintf(stderr, "Port must be greater than 1024\n");
        exit(1);
    }

    // set the initial conditions for the numbers array.
    numbers = malloc(sizeof(int));
    numbers_count = 0;

    struct sockaddr_in servaddr; // socket address structure
    // set all bytes in socket address structure to zero, and fill in the relevant data members
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    list_s = socket(AF_INET, SOCK_STREAM, 0);
    if (list_s < 0) {
        printf("Could not create socket\n");
        exit(EXIT_FAILURE);
    }
    binder = bind(list_s, (struct sockaddr *) &servaddr, sizeof(servaddr));
    if (binder < 0) {
        printf("Could not bind socket\n");
        exit(EXIT_FAILURE);
    }
    lis = listen(list_s, SOMAXCONN);
    if (lis < 0) {
        printf("Could not listen on socket\n");
        exit(EXIT_FAILURE);
    }
    SET_NON_BLOCKING(list_s);
    while (done != 1) {
        connections[i] = accept(list_s, NULL, NULL);
        if (connections[i] < 0) {
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                continue;
            printf("Could not accept connection\n");
            exit(EXIT_FAILURE);
        }
        i = check();
        *j = i;
        SET_BLOCKING(list_s);
        pthread_create(&threads[i], NULL, listener, j);
    }

    // Verify the array.
    //verify(numbers, numbers_count);

    free(j);
    close(list_s);

    exit(EXIT_SUCCESS);

}

I have a while loop in main() that should exit when done == 1. The flag is set by listener() when it receives "END". The first problem is that when I send "END" on the first iteration, the while loop does not exit; it only exits after another "END" is sent.

I have two macros, SET_NON_BLOCKING and SET_BLOCKING, defined in another file, which switch a socket between non-blocking and blocking mode so that accept() does not wait for a connection if there is none. The next problem is that when I do not use those macros, getline() in listener() does not read everything that is sent over the stream. When I do use them, it cannot open the stream at all.

I think some of the problems lie in passing 'j' to the threads: when a second thread starts, main() overwrites 'j' before the first thread can read it. However, I've been at it for a few days and can't get anywhere. Any help would be greatly appreciated. Thanks!

I would also like to ask whether my socket blocking and thread locking are in the right places.

Upvotes: 0

Views: 1823

Answers (2)

Michael Burr

Reputation: 340208

There seem to be quite a few things wrong with how you're managing the connections[] and space[] arrays:

  • you accept a connection into connections[i], but the i you pass to the listener thread is the one returned by check() - which could be a completely different i.
  • each thread gets passed the same address for a thread parameter - j is allocated only once, and that same allocation is used for every thread creation. If two or more threads get started at nearly the same time, you'll lose the indexes that should get passed to some of them.
  • when it's finishing, listener marks a connection slot as being available (I assume that's the intent of space[n] = 't';), but only afterwards uses connections[n] to close the connection, so main() may already have reused the slot. None of this is performed with any synchronization.
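A minimal sketch of one way to address these points, assuming a single mutex guards both arrays. The names slot_lock, in_use, claim_slot, slot_fd and release_slot are hypothetical and not from the question. The idea is to accept() into a local variable first, then claim a slot and store the descriptor in that same slot under the lock. Unlike check(), which returns 0 both for slot 0 and when no slot is free, claim_slot reports a full table as -1.

```c
#include <pthread.h>

#define THREAD_COUNT 10

/* Hypothetical names: one lock guards both arrays. */
static pthread_mutex_t slot_lock = PTHREAD_MUTEX_INITIALIZER;
static int connections[THREAD_COUNT];
static char in_use[THREAD_COUNT];      /* 0 = free, 1 = taken */

/* Claim a free slot and store fd in that same slot.
 * Returns the slot index, or -1 if every slot is taken. */
int claim_slot(int fd) {
    int slot = -1;
    pthread_mutex_lock(&slot_lock);
    for (int i = 0; i < THREAD_COUNT; i++) {
        if (!in_use[i]) {
            in_use[i] = 1;
            connections[i] = fd;
            slot = i;
            break;
        }
    }
    pthread_mutex_unlock(&slot_lock);
    return slot;
}

/* Read the descriptor stored in a slot. */
int slot_fd(int slot) {
    pthread_mutex_lock(&slot_lock);
    int fd = connections[slot];
    pthread_mutex_unlock(&slot_lock);
    return fd;
}

/* Mark a slot free again. Call this only after the descriptor
 * has been closed, so the slot cannot be reused too early. */
void release_slot(int slot) {
    pthread_mutex_lock(&slot_lock);
    connections[slot] = -1;
    in_use[slot] = 0;
    pthread_mutex_unlock(&slot_lock);
}
```

In main() this would look like fd = accept(...); slot = claim_slot(fd); with the connection closed or refused when slot is -1. In listener(), fclose(f) already closes the descriptor obtained through fdopen(), so release_slot(n) would go after fclose(f) and the extra close(connections[n]) would be dropped.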

Some other comments:

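  • why in the world are you dynamically allocating len? A plain size_t len = 0; passed as &len would do, and malloc(sizeof(int)) is too small for a size_t on typical 64-bit platforms.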
  • the check of done in main()'s while loop should be protected by a mutex
  • I'm not sure exactly what the protocol is for the data being transferred, but since you're using getline() I assume it's a set of lines terminated with newlines. In that case, keep in mind this bit of information from the getline() docs: "The buffer is null-terminated and includes the newline character, if one was found". If the "END" line includes a newline, strcmp("END",line) won't return 0.
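The last two points on len and the trailing newline can be sketched as follows. This is only an illustration, assuming the sender terminates each line with "\n" or "\r\n"; the helper names is_end_line and read_until_end are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

/* Strip a trailing "\n" or "\r\n" in place, then compare with "END". */
int is_end_line(char *line) {
    line[strcspn(line, "\r\n")] = '\0';
    return strcmp(line, "END") == 0;
}

/* Read lines until EOF or an "END" line.
 * Returns 1 if "END" was seen, 0 otherwise. */
int read_until_end(FILE *f) {
    char *line = NULL;
    size_t len = 0;      /* ordinary stack variable; getline() updates it */
    int saw_end = 0;

    while (getline(&line, &len, f) != -1) {
        if (is_end_line(line)) {
            saw_end = 1;
            break;
        }
    }
    free(line);          /* getline() allocated the buffer */
    return saw_end;
}
```

Setting line to NULL and len to 0 lets getline() allocate and grow the buffer itself, so only line needs to be freed.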

Upvotes: 1

Ben Kelly

Reputation: 1344

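You probably have to send END twice because SET_BLOCKING(list_s) puts the listening socket back into blocking mode, so your main thread blocks on accept() immediately after spawning the listener thread. Since it's blocked in accept(), it cannot see that done == 1 until another connection arrives.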

You can fix this by staying in non-blocking mode. You probably want to sleep between attempts to avoid spinning in a tight loop if you do this. Another alternative would be to send a signal to interrupt accept(), causing it to fail with errno set to EINTR.
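A rough sketch of the non-blocking approach, with one substitution: instead of sleeping, it waits in poll() with a timeout, which also avoids the tight loop and wakes up as soon as a connection is pending. The names done_lock, set_done, is_done, wait_readable, accept_loop and the on_client callback are hypothetical, and the 100 ms timeout is an arbitrary choice.

```c
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/socket.h>

static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
static int done = 0;

/* Called by a listener thread when it sees "END". */
void set_done(void) {
    pthread_mutex_lock(&done_lock);
    done = 1;
    pthread_mutex_unlock(&done_lock);
}

/* Read the flag under the same mutex that protects writes to it. */
int is_done(void) {
    pthread_mutex_lock(&done_lock);
    int d = done;
    pthread_mutex_unlock(&done_lock);
    return d;
}

/* Wait up to timeout_ms for fd to report an event.
 * Returns 1 if ready, 0 on timeout or EINTR, -1 on error. */
int wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int r = poll(&pfd, 1, timeout_ms);
    if (r < 0)
        return (errno == EINTR) ? 0 : -1;
    return r > 0;
}

/* Accept clients until is_done() becomes true.
 * listen_fd is assumed to be a non-blocking listening socket. */
int accept_loop(int listen_fd, void (*on_client)(int fd)) {
    while (!is_done()) {
        int r = wait_readable(listen_fd, 100);
        if (r < 0)
            return -1;
        if (r == 0)
            continue;                 /* timeout: re-check done */

        int fd = accept(listen_fd, NULL, NULL);
        if (fd < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                continue;
            return -1;
        }
        on_client(fd);                /* e.g. start a listener thread */
    }
    return 0;
}
```

With this shape the main thread notices done within roughly one timeout period, without a second "END" or a second connection.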

In terms of passing the connection index to the listener thread, you are probably correct that you have a race between the threads overwriting the value. Since an int will require the same or fewer bytes than a void *, you can actually just pass the int directly. For example:

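/* intptr_t comes from <stdint.h>; casting through it avoids a size-mismatch warning where int and void * differ in width */
pthread_create(&threads[i], NULL, listener, (void *)(intptr_t)i);

And then in listener:

int n = (int)(intptr_t)arg;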

This is kind of a hack. A more complete solution would be to allocate a separate arguments structure for each thread using malloc. The listener thread would then be responsible for freeing the arguments.

struct params *p = malloc(sizeof(struct params));
p->index = i;

pthread_create(&threads[i], NULL, listener, p);

Then in listener:

struct params *p = arg;
if (p == NULL) {
  // handle error
  return NULL;
}
int n = p->index;
free(p);
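Putting the two fragments together, a self-contained sketch might look like this. It assumes struct params holds only the index; the seen array and the start_listener helper are hypothetical stand-ins so the example can be checked without sockets.

```c
#include <pthread.h>
#include <stdlib.h>

#define THREAD_COUNT 10

/* Assumed layout: the answer only mentions an index member. */
struct params {
    int index;
};

/* Each thread writes only its own element, so no lock is needed here. */
static int seen[THREAD_COUNT];

static void *listener(void *arg) {
    struct params *p = arg;
    if (p == NULL)
        return NULL;
    int n = p->index;
    free(p);              /* the thread owns its argument block */

    seen[n] = 1;          /* stand-in for the real per-connection work */
    return NULL;
}

/* Allocate a private argument block and start one listener thread.
 * Returns 0 on success, -1 on failure. */
int start_listener(pthread_t *thread, int index) {
    struct params *p = malloc(sizeof *p);
    if (p == NULL)
        return -1;
    p->index = index;

    if (pthread_create(thread, NULL, listener, p) != 0) {
        free(p);          /* the thread never started, so free here */
        return -1;
    }
    return 0;
}
```

Because every thread gets its own allocation, main() can start the next thread immediately without overwriting an index that an earlier thread has not read yet.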

Upvotes: 1
