Reputation: 43
Hi folks, I need some help. I have to implement a server/client project for transferring files from clients to a server. The server opens N sockets, with one server process per socket (created with fork()), and each socket must serve more than one client at a time (using threads). Each client sends a file to the server, which has to back it up and show the transfer speed and byte count. I solved the transfer part for many server processes, but when I try to use threads for concurrency on the same socket it does not work: messages and files are not received.
The server takes 1 parameter: number_of_sockets
The client takes 3 parameters: server_ip server_port file_name
server.c
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<string.h>
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<fcntl.h>
#include<errno.h>
#include<sys/resource.h>
#include<sys/wait.h>
#include<signal.h>
#include<unistd.h>
#include<time.h>
// Loop flag tested by each child's accept loop in main(): connections are
// accepted while it is non-zero. Nothing in the visible code ever clears it.
short int aceptar_conexiones = 1;
// Per-connection data handed to connection_handler(): the connected socket,
// the peer address filled in by accept(), and the length of that address.
typedef struct{
int fd_socket;
struct sockaddr_in address;
socklen_t sock_length;
}thread_args;
// Pairs a listening socket with the pid of the child process forked to serve it.
typedef struct{
int fd_socket;
pid_t pid_proceso;
}fd_socket_proceso;
// Number of listening sockets the server opens; set in main() from argv[1].
unsigned int max_sockets;
// Number of child processes forked so far; finaliza_procesos() calls wait()
// this many times.
unsigned int cant_procesos;
// Heap-allocated array with one entry per listening socket (max_sockets
// entries); allocated in main() and released by finaliza_sockets().
fd_socket_proceso *fdsp;
// Mutex declared for synchronising client threads. It is neither initialised
// nor locked anywhere in the visible code.
pthread_mutex_t mutex_thread;
// Size in bytes of the receive buffer and of the file-name buffer used by
// connection_handler().
#define TAM_BUFFER 1024
// Per-connection handler. The commented-out prototype below is the
// thread-entry variant (returning void *), currently disabled.
//void *connection_handler(void *);
void connection_handler(void *);
// Closes every listening socket in fdsp and frees the array.
void finaliza_sockets();
// Waits for the forked server processes.
void finaliza_procesos();
// Reports msg through perror() and terminates the process.
void error(const char *);
int main(int argc, char *argv[]) {
int fd_listen_socket, fd_communication_socket, i;
max_sockets = atoi(argv[1]);
struct sockaddr_in listen_address, connection_address;
socklen_t con_addr_len = sizeof(connection_address);
//allocating memory for dynamic array
fdsp = (fd_socket_proceso *)malloc(sizeof(fd_socket_proceso)*max_sockets);
//create and open sockets, grabbing it on array, it starts on 1024 and on, max 5 clients per socket
for(i=0 ; i<max_sockets ; i++){
fd_listen_socket = socket(AF_INET, SOCK_STREAM, 0);
if(fd_listen_socket < 0) error("No se pudo crear el socket.\n");
bzero(&listen_address, sizeof(struct sockaddr_in));
listen_address.sin_family = AF_INET;
listen_address.sin_port = htons(1024+i); // Puede utilizarse cualquier puerto
listen_address.sin_addr.s_addr = htonl(INADDR_ANY); // Cualquier direccion propia
if(bind(fd_listen_socket, (struct sockaddr *)&listen_address, sizeof(struct sockaddr)) < 0) error("No se puede enlazar el socket.\n");
printf("Servidor escuchando en puerto: %d\n",ntohs(listen_address.sin_port));
listen(fd_listen_socket, 5);
fdsp[i].fd_socket = fd_listen_socket;
}
printf("Comenzamos a escuchar conexiones...\n");
fflush(stdout);
//fork per socket
for(i=0 ; i<max_sockets ; i++ , cant_procesos++){
if(!(fdsp[i].pid_proceso=fork())){
while(aceptar_conexiones){
bzero(&connection_address, sizeof(connection_address));
if((fd_communication_socket = accept(fdsp[i].fd_socket, (struct sockaddr *)&connection_address, &con_addr_len))==-1){
finaliza_sockets();
finaliza_procesos();
perror("Error en la comunicacion con el socket");
exit(EXIT_FAILURE);
}
pthread_t thread_cliente;
pthread_attr_t thread_attr;
thread_args t_args;
pthread_attr_init(&thread_attr);
pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
t_args.address = connection_address;
t_args.fd_socket = fd_communication_socket;
t_args.sock_length = con_addr_len;
//pthread_create(&thread_cliente, NULL, connection_handler, (void *)&t_args);
connection_handler((void *)&t_args);
}
}
}
finaliza_sockets();
finaliza_procesos();
printf("Server offline.\n");
}
//void *connection_handler(void *t_args)
/* Returns the current wall-clock time in seconds, or 0.0 if it is unavailable. */
static double ahora_en_segundos(void)
{
    struct timespec marca;

    if (timespec_get(&marca, TIME_UTC) != TIME_UTC)
        return 0.0;
    return (double)marca.tv_sec + (double)marca.tv_nsec / 1e9;
}

/*
 * Serves one client connection.
 *
 * Reads the header "<size>°<name>", creates "<name>.bkp" in the current
 * directory and copies up to <size> bytes from the socket into it, printing
 * progress and the average transfer speed after every chunk.
 *
 * t_args must point to a thread_args: fd_socket is the connected socket and
 * address is the peer address. The socket is closed on every path before
 * returning. Failures are reported and the function returns; the server
 * process is not terminated.
 *
 * NOTE: the header has no terminator after the file name, so parsing relies
 * on the header arriving by itself in the first recv(). TCP does not
 * guarantee that; fixing it requires a protocol change on both ends.
 */
void connection_handler(void *t_args)
{
    /* Same literal the client places between the size and the name. */
    static const char separador[] = "°";
    const size_t largo_separador = sizeof(separador) - 1;
    thread_args *t = (thread_args *)t_args;
    char buffer[TAM_BUFFER];
    char file_name[TAM_BUFFER];
    char ip_cliente[INET_ADDRSTRLEN];
    char *marca, *nombre, *fin_numero;
    FILE *fp = NULL;
    int port_cliente = ntohs(t->address.sin_port);
    int largo_nombre;
    long int file_size, bytes_restantes;
    ssize_t recv_size;
    double t_inicio, segundos, velocidad;

    /* inet_ntop writes into a caller-owned buffer, unlike inet_ntoa whose
     * static buffer is shared by every caller. */
    if (inet_ntop(AF_INET, &t->address.sin_addr, ip_cliente, sizeof(ip_cliente)) == NULL)
        snprintf(ip_cliente, sizeof(ip_cliente), "?");

    /* First message: file size and file name joined by the separator. One
     * byte is kept free so the buffer can always be NUL-terminated. */
    do {
        recv_size = recv(t->fd_socket, buffer, sizeof(buffer) - 1, 0);
    } while (recv_size < 0 && errno == EINTR);
    if (recv_size <= 0) {
        fprintf(stderr, "\nError al recibir la cabecera del cliente %s:%d.\n", ip_cliente, port_cliente);
        goto fin;
    }
    buffer[recv_size] = '\0';

    /* Split on the first separator; both halves must be non-empty. */
    marca = strstr(buffer, separador);
    if (marca == NULL || marca == buffer || marca[largo_separador] == '\0') {
        fprintf(stderr, "\nCabecera invalida recibida del cliente %s:%d.\n", ip_cliente, port_cliente);
        goto fin;
    }
    *marca = '\0';
    nombre = marca + largo_separador;

    errno = 0;
    file_size = strtol(buffer, &fin_numero, 10);
    if (errno != 0 || fin_numero == buffer || *fin_numero != '\0' || file_size < 0) {
        fprintf(stderr, "\nTamano de fichero invalido recibido del cliente %s:%d.\n", ip_cliente, port_cliente);
        goto fin;
    }

    /* The name comes from the network: never let it leave the working directory. */
    if (strchr(nombre, '/') != NULL) {
        fprintf(stderr, "\nNombre de fichero invalido recibido del cliente %s:%d.\n", ip_cliente, port_cliente);
        goto fin;
    }

    /* Backup name, built with a bounded write. */
    largo_nombre = snprintf(file_name, sizeof(file_name), "%s.bkp", nombre);
    if (largo_nombre < 0 || (size_t)largo_nombre >= sizeof(file_name)) {
        fprintf(stderr, "\nNombre de fichero demasiado largo recibido del cliente %s:%d.\n", ip_cliente, port_cliente);
        goto fin;
    }

    /* "wx" creates the file only if it does not exist yet, so the existence
     * check and the creation cannot be separated by another client. */
    fp = fopen(file_name, "wx");
    if (fp == NULL) {
        if (errno == EEXIST)
            printf("\nError, el fichero %s provisto por el cliente %s ya existe! Este sera omitido por el servidor.\n", file_name, ip_cliente);
        else
            printf("\nError en la creacion del fichero %s provisto por el cliente %s.\n", file_name, ip_cliente);
        fflush(stdout);
        goto fin;
    }

    bytes_restantes = file_size;
    t_inicio = ahora_en_segundos();

    while (bytes_restantes > 0) {
        /* Never read past the announced size. */
        size_t pedir = sizeof(buffer);
        if ((unsigned long)bytes_restantes < pedir)
            pedir = (size_t)bytes_restantes;

        recv_size = recv(t->fd_socket, buffer, pedir, 0);
        if (recv_size < 0) {
            if (errno == EINTR) continue;
            perror("Error en la recepcion del fichero");
            break;
        }
        if (recv_size == 0)
            break; /* the client closed the connection early */

        if (fwrite(buffer, sizeof(char), (size_t)recv_size, fp) != (size_t)recv_size) {
            perror("Error en la escritura del fichero");
            break;
        }
        bytes_restantes -= recv_size;

        /* Average speed since the transfer started, in bytes per second. */
        segundos = ahora_en_segundos() - t_inicio;
        velocidad = segundos > 0.0 ? (double)(file_size - bytes_restantes) / segundos : 0.0;

        printf("Cliente:%s:%d, Fichero:%s, Bytes recibidos:%*ld, Bytes restantes:%*ld, Velocidad TX:%.0f (bytes/seg)\r",
               ip_cliente, port_cliente, file_name,
               (int)sizeof(long int), file_size - bytes_restantes,
               (int)sizeof(long int), bytes_restantes, velocidad);
        fflush(stdout);
    }

    if (bytes_restantes == 0)
        printf("\nCliente:%s:%d\tFichero:%s\tTransferencia finalizada.\n", ip_cliente, port_cliente, file_name);
    else
        printf("\nCliente:%s:%d\tFichero:%s\tTransferencia incompleta, faltan %ld bytes.\n",
               ip_cliente, port_cliente, file_name, bytes_restantes);
    fflush(stdout);

fin:
    /* fclose() flushes the stream, so a failure here means lost data. */
    if (fp != NULL && fclose(fp) != 0)
        perror("Error al cerrar el fichero");
    close(t->fd_socket);
    //pthread_exit(0);
}
/*
 * Closes the listening socket stored in each of the max_sockets entries of
 * fdsp, in order, and then frees the array itself.
 */
void finaliza_sockets(){
    unsigned int indice = 0;

    while (indice < max_sockets) {
        close(fdsp[indice].fd_socket);
        indice++;
    }
    free(fdsp);
}
/*
 * Calls wait() once per forked server process (cant_procesos times),
 * discarding the exit status of each child.
 */
void finaliza_procesos(){
    unsigned int esperados = 0;

    while (esperados < cant_procesos) {
        wait(NULL);
        esperados++;
    }
}
/*
 * Prints msg followed by the description of the current errno on stderr and
 * terminates the process.
 *
 * The exit status is the errno value seen on entry, or EXIT_FAILURE when
 * errno is 0, so a failure is never reported as success. errno is saved
 * before perror() runs because perror() itself may change it.
 */
void error(const char *msg){
    int codigo = errno;

    perror(msg);
    exit(codigo != 0 ? codigo : EXIT_FAILURE);
}
client.c
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<signal.h>
#include<time.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/sendfile.h>
#define TAM_BUFFER 1024
/*
 * Client entry point.
 *
 * Usage: client server_ip server_port file_name
 *
 * Opens file_name, connects to the server over TCP, sends the header
 * "<size>°<name>" (name is the last component of the path) and then streams
 * the file contents with sendfile() in chunks of at most TAM_BUFFER bytes,
 * printing progress after each chunk.
 *
 * Returns 0 when the whole file was sent, EXIT_FAILURE otherwise.
 */
int main(int argc, char *argv[])
{
    int fd_socket;
    int fd_file;
    int largo;
    long puerto;
    char *fin_numero;
    const char *path, *nombre;
    char linea[TAM_BUFFER];
    size_t enviados_cabecera;
    ssize_t send_bytes;
    off_t offset;
    long int remain_data;
    struct stat file_stat;
    struct sockaddr_in server_address;

    if (argc != 4) {
        fprintf(stderr, "Uso: %s ip_servidor puerto_servidor fichero\n", argc > 0 ? argv[0] : "client");
        return EXIT_FAILURE;
    }

    path = argv[3];
    if ((fd_file = open(path, O_RDONLY)) == -1) {
        perror("error en la apertura del archivo");
        exit(EXIT_FAILURE);
    }
    if (fstat(fd_file, &file_stat) < 0) {
        perror("error fstat");
        exit(EXIT_FAILURE);
    }
    /* open() also succeeds on directories; only regular files can be sent. */
    if (!S_ISREG(file_stat.st_mode)) {
        fprintf(stderr, "%s no es un fichero regular\n", path);
        exit(EXIT_FAILURE);
    }

    /* Server address: both fields are validated instead of being trusted. */
    errno = 0;
    puerto = strtol(argv[2], &fin_numero, 10);
    if (errno != 0 || fin_numero == argv[2] || *fin_numero != '\0' || puerto < 1 || puerto > 65535) {
        fprintf(stderr, "Puerto invalido: %s\n", argv[2]);
        exit(EXIT_FAILURE);
    }
    memset(&server_address, 0, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons((unsigned short)puerto);
    if (inet_pton(AF_INET, argv[1], &server_address.sin_addr) != 1) {
        fprintf(stderr, "Direccion IP invalida: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }

    fd_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (fd_socket < 0) {
        perror("error creating socket");
        exit(EXIT_FAILURE);
    }
    signal(SIGINT, SIG_DFL);

    /* connect() returns -1 on failure. */
    if (connect(fd_socket, (struct sockaddr *)&server_address, sizeof(server_address)) < 0) {
        perror("error on connection with server");
        exit(EXIT_FAILURE);
    }

    /* File name = last path component, found without modifying argv. */
    nombre = strrchr(path, '/');
    nombre = (nombre != NULL) ? nombre + 1 : path;
    if (*nombre == '\0') {
        fprintf(stderr, "Nombre de fichero vacio en la ruta: %s\n", path);
        exit(EXIT_FAILURE);
    }

    /* Header: file size, separator and file name, built with a bounded write. */
    largo = snprintf(linea, sizeof(linea), "%ld°%s", (long)file_stat.st_size, nombre);
    if (largo < 0 || (size_t)largo >= sizeof(linea)) {
        fprintf(stderr, "Nombre de fichero demasiado largo: %s\n", nombre);
        exit(EXIT_FAILURE);
    }

    /* send() may accept fewer bytes than requested, so loop until done. */
    enviados_cabecera = 0;
    while (enviados_cabecera < (size_t)largo) {
        send_bytes = send(fd_socket, linea + enviados_cabecera, (size_t)largo - enviados_cabecera, 0);
        if (send_bytes < 0) {
            if (errno == EINTR) continue;
            perror("error sending header");
            exit(EXIT_FAILURE);
        }
        enviados_cabecera += (size_t)send_bytes;
    }

    /* File contents. sendfile() advances offset by the bytes it sent. */
    offset = 0;
    remain_data = (long int)file_stat.st_size;
    while (remain_data > 0) {
        size_t trozo = TAM_BUFFER;
        if ((unsigned long)remain_data < trozo)
            trozo = (size_t)remain_data;

        send_bytes = sendfile(fd_socket, fd_file, &offset, trozo);
        if (send_bytes < 0) {
            if (errno == EINTR) continue;
            perror("error sending file");
            break;
        }
        if (send_bytes == 0)
            break; /* the file became shorter than fstat() reported */

        remain_data -= send_bytes;
        printf("enviados: %*ld bytes\toffset: %*ld\tbytes restantes: %ld\r",
               (int)sizeof(long), (long)send_bytes, (int)sizeof(long), (long)offset, remain_data);
        fflush(stdout);
    }

    close(fd_file);
    close(fd_socket);
    return remain_data == 0 ? 0 : EXIT_FAILURE;
}
As I said, my problem is that I need concurrency on each socket. How do I manage and synchronize the threads? I tried some basic things but they did not work, so I commented out the pthread_create and pthread_exit calls.
Thanks for your time.-
Upvotes: 2
Views: 2819
Reputation: 552
You can use fork() to create a child process for each client. The parent process keeps listening; when a client connects, the parent hands that client over to a child process and goes back to listening for other clients. When another client connects, the parent calls fork() again to create another child process to handle it. This way you can handle clients concurrently.
pid = fork()
When pid == 0 you are in the child process, when pid > 0 you are in the parent process, and when pid < 0 the fork failed.
Upvotes: 1
Reputation: 2559
I see nothing wrong with the main structure. You're listening on several ports, spawning a process for each port. Each process (should) spawn a new thread for each client.
You do have a race condition in the server with thread_args.address (could get overwritten if two clients connect in quick succession), but it's easy to avoid. Don't use recvfrom on TCP sockets -- it doesn't really make sense. Use read(), or even better fread(), which avoids EINTR issues. Read one byte at a time in the beginning until you encounter the 1st delimiter (though it would be better to change the protocol to send a 32-bit word instead of an ascii representation).
But the main problem is that you have a large program and you simply say "it doesn't work". How about posting some diagnostics so long as you want us to debug it for you? Do the server processes get created? What gets printed? Does it fail for the first client, 2nd client or what?
Upvotes: 2