BowPark

Reputation: 1460

Communication between two pthreads

In a C program, some threads (pthread1, pthread2, ...) generate messages, and the produced messages are handled by an additional thread (pthreadprint), which prints them.

The messages could "amass" before being handled: more than one message may be waiting as input to pthreadprint at any given time, depending on how long pthreadprint takes to print. I obviously don't know the maximum number of messages that could be waiting to be processed in the worst case.

What would be the best way to send these messages (from pthread1, pthread2, ..., which "produce" them) to pthreadprint, which prints them? They should not only be transmitted, but also "stockpiled". I know about mutexes and condition variables, but they don't form a queue by themselves: is it possible to use a FIFO queue which is accessible by all the threads?

Upvotes: 0

Views: 1333

Answers (2)

Nominal Animal

Reputation: 39316

One simple and robust solution to these kinds of producer-consumer problems is using a singly linked list of messages protected by a mutex. Using C99 and pthreads:

#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>

struct message {
    struct message *next;
    /* Payload is irrelevant, here just as an example: */
    size_t          size;
    char            data[];
};

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  more;
    struct message *newest;
    struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }

/* Nonblocking variant */
struct message *try_dequeue(queue *const q);

/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);

/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);

The implementation is straightforward; a minimal sketch of the three operations follows the list below.

  • The struct message singly linked list has oldest messages first, with new messages appended to the end.
  • An empty queue has both newest == NULL and oldest == NULL.
  • All queue operations (enqueue(), dequeue(), try_dequeue()) take the queue lock mutex before examining the pointers. (To reduce contention in heavy use, keep the lock duration as short as possible; in other words, construct the message fully first before taking the lock.)
  • A blocking dequeue call can wait for a new message by waiting on the more condition variable (when the queue is empty).
  • When the first message is enqueued, both newest and oldest point to it.
  • When enqueuing the first message, the condition variable more is signaled, in case there is a blocking dequeue waiting for a new message.
  • Enqueuing further messages sets newest->next to point to the new message first, then sets newest to point to the new message.
  • Dequeuing detaches the oldest member from the list, updating oldest to point to oldest->next. If oldest becomes NULL (meaning newest and oldest both pointed to the same message, the only one in the queue), newest is also set to NULL, since the queue is now empty.
  • Enqueuing a message can only fail if locking the lock mutex fails (which typically happens only when the C library detects a deadlock situation), or if you have checks that notice the queue structure is in an inconsistent state (e.g. one, but not both, of newest and oldest is NULL). The logic in the above prototype is that it returns 0 on success and an errno error code (EINVAL, EDEADLK) otherwise. I also like to set errno to that error code, for symmetry with dequeuing.
  • Dequeuing a message can fail for the same reasons as enqueuing, plus, in the non-blocking try_dequeue(), when the queue is empty (EWOULDBLOCK/EAGAIN). In these cases, the function returns NULL with errno set.
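
For reference, here is a minimal sketch of the three operations following the points above, assuming the struct message and queue definitions from the earlier listing (error checking kept to the essentials):

int enqueue(queue *const q, struct message *const m)
{
    int err;

    if (!q || !m)
        return errno = EINVAL;

    m->next = NULL;

    err = pthread_mutex_lock(&q->lock);
    if (err)
        return errno = err;

    if (!q->oldest) {
        /* Queue was empty: new message is both oldest and newest. */
        q->oldest = m;
        q->newest = m;
        /* Wake up one dequeue() possibly waiting for a message. */
        pthread_cond_signal(&q->more);
    } else {
        /* Append to the end of the list. */
        q->newest->next = m;
        q->newest = m;
    }

    pthread_mutex_unlock(&q->lock);
    return 0;
}

struct message *dequeue(queue *const q)
{
    struct message *m;
    int err;

    if (!q) {
        errno = EINVAL;
        return NULL;
    }

    err = pthread_mutex_lock(&q->lock);
    if (err) {
        errno = err;
        return NULL;
    }

    /* Blocking variant: wait until at least one message is available. */
    while (!q->oldest)
        pthread_cond_wait(&q->more, &q->lock);

    m = q->oldest;
    q->oldest = m->next;
    if (!q->oldest)
        q->newest = NULL;   /* That was the only message: queue is now empty. */

    pthread_mutex_unlock(&q->lock);

    m->next = NULL;
    return m;
}

struct message *try_dequeue(queue *const q)
{
    struct message *m;
    int err;

    if (!q) {
        errno = EINVAL;
        return NULL;
    }

    err = pthread_mutex_lock(&q->lock);
    if (err) {
        errno = err;
        return NULL;
    }

    m = q->oldest;
    if (!m) {
        /* Non-blocking variant: an empty queue is reported, not waited on. */
        pthread_mutex_unlock(&q->lock);
        errno = EWOULDBLOCK;
        return NULL;
    }

    q->oldest = m->next;
    if (!q->oldest)
        q->newest = NULL;

    pthread_mutex_unlock(&q->lock);

    m->next = NULL;
    return m;
}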

As you can see, both enqueue and dequeue are O(1) operations, i.e. take constant time; there is no need to traverse the entire list at any point.

An enqueue/dequeue operation can en/dequeue more than one message at once, just by repeating the above. However, it's quite rare to need to do that in practice. (For dequeuing, the main reason is that if you grab more than one message at a time, and you have a failure or error with one message, you have to deal with the error and the yet unhandled but dequeued messages, too; bug-prone. Easier to do things one by one. Besides, if message order is not critical, you could always have more than one consumer working in parallel, if they dequeue messages one by one.)
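
As a usage sketch for the question's scenario, the printing thread can simply loop on the blocking dequeue(). The printer_thread name and the zero-size "stop" marker below are illustrative assumptions, not part of the interface above; messages are assumed to be heap-allocated by the producer (as a queue_printf() implementation presumably would do):

void *printer_thread(void *arg)
{
    queue *const q = arg;

    while (1) {
        struct message *m = dequeue(q);
        if (!m)
            break;                      /* Locking failed; give up. */
        if (!m->size) {
            free(m);
            break;                      /* Assumed "stop printing" marker. */
        }
        fwrite(m->data, 1, m->size, stdout);
        fflush(stdout);
        free(m);
    }

    return NULL;
}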


Advanced notes:

If relying on the C99 standard, you can use the same code for any structure types that begin with struct message *next;. By C99 rules, such structures are compatible (for that shared initial part), and that is the only part accessed by the queue operations.

In other words, if you have multiple message types, each stored in their own queue, you only need one implementation of enqueue()/dequeue()/try_dequeue() for all different message types, as long as the message structures all begin with struct message *next;.
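
For example, a hypothetical yourtype_message and its matching queue type (names purely illustrative) only need to keep the next pointer as the first member, with the queue structure laid out like queue above:

struct yourtype_message {
    struct message *next;    /* Must be the first member. */
    /* Type-specific payload: */
    int             priority;
    char            text[64];
};

typedef struct {
    pthread_mutex_t           lock;
    pthread_cond_t            more;
    struct yourtype_message  *newest;
    struct yourtype_message  *oldest;
} yourtype_queue;
#define YOURTYPE_QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }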

For type safety, you need trivial wrapper functions:

static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
    return enqueue((queue *)q, (struct message *)m);
}

static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
    return (struct yourtype_message *) dequeue((queue *)q);
}

static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
    return (struct yourtype_message *) try_dequeue((queue *)q);
}

that, when defined in the header file, should not actually add any overhead -- in fact, should not generate any additional code whatsoever, unless you take the address of one for some reason (in which case one non-inlined version must be emitted in each compilation unit that does so). They do, however, provide type checking at compile time, which is often useful.

Upvotes: 1

Xxxo

Reputation: 1931

This seems to be a typical first-year assignment on the "producer-consumer" problem in interprocess communication (maybe from an OS class?).

What you need is a way to pass information from one process to another. There are several ways to do it, the simplest being "common memory", i.e. a chunk of memory that all processes can access.

From my point of view, I would suggest implementing a queue. Then all your processes should have access to this queue, but ONLY when allowed (i.e. while holding the mutex). You can also augment the queue with functionality that checks its size (e.g. int isQueueFull(void);), given that each queue node should also have an index, e.g. struct { queueNode *nextNode; unsigned int index; void *data; }.
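
A rough sketch of the node layout and size check this answer suggests; the fixed capacity and the explicit queue parameter (instead of a global) are illustrative assumptions:

#include <pthread.h>

#define QUEUE_CAPACITY 128              /* Illustrative fixed limit. */

typedef struct queueNode {
    struct queueNode *nextNode;
    unsigned int      index;
    void             *data;
} queueNode;

typedef struct {
    pthread_mutex_t   lock;
    queueNode        *head;
    queueNode        *tail;
    unsigned int      count;            /* Number of nodes currently queued. */
} boundedQueue;

/* Call only while holding q->lock. */
int isQueueFull(const boundedQueue *q)
{
    return q->count >= QUEUE_CAPACITY;
}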

If you have all of these in the same initial process and then start threads from it, all your "sub-processes" (threads) will have access to the same memory space as the parent process.

Upvotes: 0
