Reputation: 1460
In a C program, some threads (pthread1
, pthread2
, ...) generate a message and the produced messages are handled by an additional thread (pthreadprint
), which prints them.
The messages could "amass" before being handled: more than one message can stack at a certain time as input to pthreadprint
, according to the time required to pthreadprint
to print. I obviously don't know the maximum number of messages that can be waiting for being processed in the worst scenario.
What could it be the best way to send these message (from pthread1
, pthread2
, ... that "produce" them) to the pthreadprint
that prints them? They should not only be transmitted, but also "stockpiled". I know about mutexes and condition variables, but they don't represent a queue: is it possible to use a FIFO queue which is accessible by all the threads?
Upvotes: 0
Views: 1333
Reputation: 39316
One simple and robust solution to these kinds of producer-consumer problems is using a singly linked list of messages protected by a mutex. Using C99 and pthreads:
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
struct message {
struct message *next;
/* Payload is irrelevant, here just as an example: */
size_t size;
char data[];
};
typedef struct {
pthread_mutex_t lock;
pthread_cond_t more;
struct message *newest;
struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }
/* Nonblocking variant */
struct message *try_dequeue(queue *const q);
/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);
/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);
The implementation is straightforward.
struct message
singly linked list has oldest messages first, with new messages appended to the end.newest == NULL
and oldest == NULL
.enqueue()
, dequeue()
, try_dequeue()
) take the queue lock
mutex before examining the pointers. (To reduce contention in heavy use, keep the lock duration as short as possible; in other words, construct the message fully first before taking the lock.)more
condition variable (when the queue is empty).newest
and oldest
point to it.more
is signaled, in case there is a blocking dequeue waiting for a new message.newest->next
to point to the new message first, then sets newest
to point to the new message.oldest
member from the list, updating oldest
to point to oldest->next
. If oldest
becomes NULL
(then both newest
and oldest
pointed to the same message, the only message in the queue), also newest
is set to NULL
since the queue becomes empty.lock
mutex fails (and that typically only fails if the C library detects a deadlock situation), or if you have checks that notice the queue structure is in inconsistent state (e.g. one, but not both, of newest
and oldest
is NULL
, for example).
The logic in the above prototype is that it returns 0
if success, and an errno error code otherwise (EINVAL
, EDEADLK
). I also like to set errno
to that error code, too, for symmetry with dequeueing.EWOULDBLOCK
/EAGAIN
). In these cases, the function could return NULL
with errno
set.As you can see, both enqueue and dequeue are O(1) operations, i.e. take constant time; there is no need to traverse the entire list at any point.
An enqueue/dequeue operation can en/dequeue more than one message at once, just by repeating the above. However, it's quite rare to need to do that in practice. (For dequeuing, the main reason is that if you grab more than one message at a time, and you have a failure or error with one message, you have to deal with the error and the yet unhandled but dequeued messages, too; bug-prone. Easier to do things one by one. Besides, if message order is not critical, you could always have more than one consumer working in parallel, if they dequeue messages one by one.)
Advanced notes:
If relying on the C99 standard, you can use the same code for any structure types that begin with struct message *next;
. By C99 rules, such structures are compatible (for that shared initial part), and that is the only part accessed by the queue operations.
In other words, if you have multiple message types, each stored in their own queue, you only need one implementation of enqueue()
/dequeue()
/try_dequeue()
for all different message types, as long as the message structures all begin with struct message *next;
.
For type safety, you need trivial wrapper functions:
static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
return enqueue((queue *const)q, (struct message *const)m);
}
static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
return dequeue((queue *const)q);
}
static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
return try_dequeue((queue *const)q);
}
that when defined in the header file, should not actually generate any overhead -- in fact, should not generate any additional code whatsoever, unless you take the address of one for some reason (in which case one non-inlined version must be emitted in each compilation unit taking the address of one). They do, however, provide type checking at compile time, which is often useful.
Upvotes: 1
Reputation: 1931
This seems to be a typical first year assignment on "producer-consumer" problem in interprocess communications (maybe in an OS class? )
What you need is a way to pass information from a process to another process. There are some ways to do it, with the simpler being the "common memory", i.e. a chunk of memory that all processes can access.
In my point of view, I would suggest to implement a queue. Then, all your process should have access to this queue, but ONLY when it is allowed (i.e. result from mutex). You can also augment the queue by adding functionality that checks the size (e.g. int isQueueFull(void);
given that each queue node should also have an index, e.g. struct{ queueNode* nextNode; unsigned int index; void* data;}
).
If you have all these in the same initial process and then you start threads from it, then all your "sub-processes" would have access to the same memory space as the parent process.
Upvotes: 0