Reputation: 1706
I made a circular buffer with multiple clients writing messages of different lengths into it. The server reads them out. I based the code on the producer/consumer problem. The problem is that when the buffer is full and the server removes all the data from it, the client is signaled to resume its write, but instead another client (in another thread) starts writing its message into the buffer. I want the client that was already writing before the buffer filled up to resume, so that the message doesn't arrive out of order.
This is my code (I removed a lot of test code):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#define BUFFER_SIZE 8
#define NUM_THREADS 4
struct cBuf{
char *buf;
int size;
int start;
int end;
pthread_mutex_t mutex;
pthread_cond_t buffer_full;
pthread_cond_t buffer_empty;
};
struct cBuf cb;
void buf_Init(struct cBuf *cb, int size) {
int i;
cb->size = size + 1;
cb->start = 0;
cb->end = 0;
cb->buf = (char *)calloc(cb->size, sizeof(char));
for (i=0;i<size;i++) cb->buf[i]='_';
}
void buf_Free(struct cBuf *cb) {
free(cb->buf);
}
int buf_IsFull(struct cBuf *cb) {
return (cb->end + 1) % cb->size == cb->start;
}
int buf_IsEmpty(struct cBuf *cb) {
return cb->end == cb->start;
}
int buf_Insert(struct cBuf *cb, char *elem) {
int i,j;
pthread_mutex_lock(&(cb->mutex));
for (i=0; i < strlen(elem); ++ i){
if (buf_IsFull(cb)==1) printf("\nProducer (buf_Insert) is waiting because of full buffer");
while(buf_IsFull(cb)){
pthread_cond_signal(&(cb->buffer_full));
pthread_cond_wait(&(cb->buffer_empty),&(cb->mutex));
}
cb->buf[cb->end] = elem[i];
cb->end = (cb->end + 1) % cb->size;
printf("%c [INPUT]",elem[i]);
}
pthread_cond_signal(&(cb->buffer_full));
pthread_mutex_unlock(&(cb->mutex));
return 0;
}
int buf_Read(struct cBuf *cb, char *out) {
int i,j;
pthread_mutex_lock(&(cb->mutex));
if (buf_IsEmpty(cb))printf("\nConsumer (buf_Read) is waiting because of empty buffer\n");
while(buf_IsEmpty(cb)){
pthread_cond_wait(&(cb->buffer_full),&(cb->mutex));
}
for (i=0;i<BUFFER_SIZE-1;i++){
printf("\n");
if (cb->start == cb->end) break;
out[i] = cb->buf[cb->start];
cb->buf[cb->start] = '_';
cb->start = (cb->start + 1) % cb->size;
printf("%c [OUTPUT]",out[i]);
}
pthread_cond_signal(&(cb->buffer_empty));
pthread_mutex_unlock(&(cb->mutex));
return 0;
}
void * client(void *cb){
pthread_detach(pthread_self());
struct cBuf *myData;
myData = (struct cBuf*) cb;
char input[]="Hello World!";
if (buf_Insert(myData, input)){
//buf_Insert returns 0 on success, so this branch runs only on failure
printf("\n");
}
return 0;
}
int main(void) {
char out[60];
pthread_t thread;
int i;
/* Initialise the mutex and the condition variables */
pthread_mutex_init(&(cb.mutex),NULL);
pthread_cond_init(&(cb.buffer_full),NULL);
pthread_cond_init(&(cb.buffer_empty),NULL);
buf_Init(&cb, BUFFER_SIZE);
for (i = 0; i<NUM_THREADS; i++){
if(pthread_create (&thread,NULL, client, (void *) &cb) !=0){
} else {
}
}
while (1){
if (buf_Read(&cb,out)){
}
}
//empty the buffer; free the allocated memory
buf_Free(&cb);
return 0;
}
Upvotes: 3
Views: 1033
Reputation: 76236
I already explained this in the comments on "Producer/consumer seems to be in deadlock when buffer is smaller than input from producer", but those are comments, so here it is as an answer:
You should never, ever have a partial message in the queue. Make sure you never write one.
You can check whether there is enough space before starting to write the message and wait on buffer_empty straight away if there isn't. Or you can change the queue to carry pointers to allocated data (either passing ownership to the consumer or using reference counting), so that each message takes up only one slot in the queue and the rest lives in allocated memory. Which is best depends on the exact nature of your messages. Anything will do as long as there are no partial messages.
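The first option (wait until the whole message fits, then write it in one go) might look roughly like the sketch below. This is not the asker's code: `struct ring`, `ring_put_message`, `ring_drain` and `RING_CAPACITY` are hypothetical names, and the sketch tracks a byte count instead of a start/end pair to keep the free-space check simple. It also assumes no message is longer than the buffer.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define RING_CAPACITY 16 /* hypothetical; must be >= the longest message */

struct ring {
    char data[RING_CAPACITY];
    size_t start;               /* index of the oldest stored byte */
    size_t used;                /* number of bytes currently stored */
    pthread_mutex_t mutex;
    pthread_cond_t not_empty;   /* signaled after a message is written */
    pthread_cond_t space_freed; /* broadcast after the reader removes data */
};

void ring_init(struct ring *r) {
    r->start = 0;
    r->used = 0;
    pthread_mutex_init(&r->mutex, NULL);
    pthread_cond_init(&r->not_empty, NULL);
    pthread_cond_init(&r->space_freed, NULL);
}

/* Write the whole message or wait; never write part of it.
 * Returns -1 if the message could never fit, 0 otherwise. */
int ring_put_message(struct ring *r, const char *msg) {
    size_t len = strlen(msg);
    size_t i;

    if (len > RING_CAPACITY)
        return -1;
    pthread_mutex_lock(&r->mutex);
    /* Wait before the first byte is written, not in the middle. */
    while (RING_CAPACITY - r->used < len)
        pthread_cond_wait(&r->space_freed, &r->mutex);
    for (i = 0; i < len; ++i)
        r->data[(r->start + r->used + i) % RING_CAPACITY] = msg[i];
    r->used += len;
    pthread_cond_signal(&r->not_empty);
    pthread_mutex_unlock(&r->mutex);
    return 0;
}

/* Block until data is available, then copy out everything stored.
 * Returns the number of bytes copied. */
size_t ring_drain(struct ring *r, char *out, size_t out_size) {
    size_t i, n;

    /* The output buffer must hold a full ring so no message is split. */
    assert(out_size >= RING_CAPACITY);
    pthread_mutex_lock(&r->mutex);
    while (r->used == 0)
        pthread_cond_wait(&r->not_empty, &r->mutex);
    n = r->used;
    for (i = 0; i < n; ++i)
        out[i] = r->data[(r->start + i) % RING_CAPACITY];
    r->start = (r->start + n) % RING_CAPACITY;
    r->used = 0;
    /* Wake every waiting writer: each may need a different amount of space. */
    pthread_cond_broadcast(&r->space_freed);
    pthread_mutex_unlock(&r->mutex);
    return n;
}
```

Each client thread would call `ring_put_message(&r, "Hello World!")` and the server would loop on `ring_drain`. Because a writer only starts copying once the whole message fits, and holds the mutex while it copies, bytes from two messages cannot interleave. The broadcast (rather than a single signal) is a deliberate choice here: a woken writer whose message still does not fit simply goes back to waiting.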
While it would be possible to record which particular writer needs to finish a message and wake just that one, it would be awfully complicated. Synchronization is hard as it is; don't make it harder by placing additional requirements on it.
In fact, unless this is homework (in the sense that you are doing it to learn how synchronization works), just look for a ready-made message queue. SysV IPC message queues or Unix-domain sockets in datagram mode are two options that come to mind, or look for a library that provides one.
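As a rough illustration of the datagram option, the sketch below uses `socketpair` with `AF_UNIX` and `SOCK_DGRAM`, so each `send` is delivered as one whole message by one `recv`. The function name `datagram_demo` and the two message strings are made up for the example, and using a socket pair inside a single process is an assumption; separate processes would more likely use bound socket addresses.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Send two messages of different lengths over a Unix-domain datagram
 * socket pair and read them back. Each output buffer must hold at least
 * 32 bytes. Returns 0 on success, -1 on any error. */
int datagram_demo(char *first, char *second) {
    const char *a = "Hello World!";
    const char *b = "second message";
    int fds[2];
    ssize_t n1, n2;

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) != 0)
        return -1;

    /* Each send() queues exactly one datagram; the kernel keeps the
     * message boundaries, so no framing code is needed. */
    if (send(fds[0], a, strlen(a), 0) < 0 ||
        send(fds[0], b, strlen(b), 0) < 0) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    /* Each recv() returns one whole datagram, in the order sent. */
    n1 = recv(fds[1], first, 31, 0);
    n2 = recv(fds[1], second, 31, 0);
    close(fds[0]);
    close(fds[1]);
    if (n1 < 0 || n2 < 0)
        return -1;

    first[n1] = '\0';
    second[n2] = '\0';
    return 0;
}
```

In the multi-client setup from the question, every client thread would `send` on the writing end and the server would `recv` in a loop. The queueing and the blocking when the socket buffer is full are then handled by the kernel instead of by hand-written condition variables.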
Upvotes: 3