Reputation: 1135
#include "cs241.c"
#define THREAD_COUNT 10
int list_s;
int connections[THREAD_COUNT];
char space[THREAD_COUNT];
int done = 0;
pthread_mutex_t muxlock = PTHREAD_MUTEX_INITIALIZER;
int *numbers;
int numbers_count;
void *listener(void *arg) {
int n = *(int *) arg;
FILE *f = fdopen(connections[n], "r");
if (f == NULL)
printf("Could not open file\n");
char *line = NULL;
size_t *len = malloc(sizeof(int));
while(getline(&line, len, f) != -1) {
printf("%s", line);
if (strcmp("END", line) == 0) {
pthread_mutex_lock(&muxlock);
done = 1;
pthread_mutex_unlock(&muxlock);
}
}
space[n] = 't';
fclose(f);
free(len);
close(connections[n]);
return NULL;
}
void initialize() {
int n;
for (n = 0; n < THREAD_COUNT; n++) {
space[n] = 't';
}
}
int check() {
int index;
for (index = 0; index < THREAD_COUNT; index++) {
if (space[index] == 't') {
space[index] = 'f';
return index;
}
}
return 0;
}
int main(int argc, char *argv[]) {
int port = 0;
int binder;
int lis;
int i = 0;
int *j = malloc(sizeof(int));
initialize();
pthread_t threads[THREAD_COUNT];
if ((argc != 2) || (sscanf(argv[1], "%d", &port) != 1)) {
fprintf(stderr, "Usage: %s [PORT]\n", argv[0]);
exit(1);
}
if (port < 1024) {
fprintf(stderr, "Port must be greater than 1024\n");
exit(1);
}
// set the initial conditions for the numbers array.
numbers = malloc(sizeof(int));
numbers_count = 0;
struct sockaddr_in servaddr; // socket address structure
// set all bytes in socket address structure to zero, and fill in the relevant data members
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
list_s = socket(AF_INET, SOCK_STREAM, 0);
if (list_s < 0) {
printf("Could not create socket\n");
exit(EXIT_FAILURE);
}
binder = bind(list_s, (struct sockaddr *) &servaddr, sizeof(servaddr));
if (binder < 0) {
printf("Could not bind socket\n");
exit(EXIT_FAILURE);
}
lis = listen(list_s, SOMAXCONN);
if (lis < 0) {
printf("Could not listen on socket\n");
exit(EXIT_FAILURE);
}
SET_NON_BLOCKING(list_s);
while (done != 1) {
connections[i] = accept(list_s, NULL, NULL);
if (connections[i] < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
continue;
printf("Could not accept connection\n");
exit(EXIT_FAILURE);
}
i = check();
*j = i;
SET_BLOCKING(list_s);
pthread_create(&threads[i], NULL, listener, j);
}
// Verify the array.
//verify(numbers, numbers_count);
free(j);
close(list_s);
exit(EXIT_SUCCESS);
}
So I have a while loop in my main(), which should exit when 'done' = 1. This is set by listener(), when it receives 'END'. First problem is that when I send 'END' on the first iteration, the while loop does not quit, only after another 'END' is sent.
I have two macros SET_NON_BLOCKING and SET_BLOCKING in another file for unblocking and blocking sockets so it does wait for a connection if there is none. Next problem is when I do not use those macros, getline() in listener() does not manage to read everything that is outputted from the stream. When I do use them, it cannot open the stream at all.
I think some of the problems lie in passing 'j' to the threads in that when a second thread starts, it overwrites 'j' before the first thread can read from it. However I've been at it for over a few days and can't get anywhere. Any help would be greatly appreciated. Thanks!
Also I would like to ask if my socket blocking and thread locking are in the right place?
Upvotes: 0
Views: 1823
Reputation: 340208
There seem to be quite a few things wrong with how you're managing the connections[]
and space[]
arrays:
connections[i]
, but the i
you pass to the listener
thread is the one returned by check()
- which could be a completely different i
.j
is allocated only once, and that same allocation is used for every thread creation. If two or more threads get started at nearly the same time, you'll lose the index(s) that should get passed to some of them.listener
marks a connection as being available (I assume that's what the intentions of space[n] = 't';
is), but then uses connections[n]
to close the connection. None of this is performed with any synchronization.Some other comments:
len
?done
in main()
's while
loop should be protected by a mutexgetline()
I assume it's a set of lines terminated with newlines. In that case, keep in mind this bit of information from the getline()
docs: "The buffer is null-terminated and includes the newline character, if one was found". If the "END"
line includes a newline, strcmp("END",line)
won't return 0
.Upvotes: 1
Reputation: 1344
You probably have to send END twice because your main thread blocks on accept()
immediately after spawning the listener thread. Since its blocked on accept()
it cannot see that done == 1
.
You can fix this by staying in non-blocking mode. You probably want to sleep to avoid spinning in a tight loop if you do this. Another alternative would be to send a signal to wake up accept causing it to set EINTR.
In terms of passing the connection index to the listener thread, you are probably correct that you have a race between the threads overwriting the value. Since an int
will require the same or fewer bytes than a void *
, you can actually just pass the int
directly. For example:
pthread_create(&threads[i], NULL, listener, (void *)i);
And then in listener:
int n = (int)arg;
This is kind of a hack. A more complete solution would be to allocate a separate arguments structure for each thread using malloc. The listener thread would then be responsible for free'ing the arguments.
struct params *p = malloc(sizeof(struct params));
p.index = i;
pthread_create(&threads[i], NULL, listener, p);
Then in listener:
struct params *p = args;
if (p == NULL) {
// handle error
return NULL;
}
int n = p->index;
free(p);
Upvotes: 1