Reputation: 1144
Consider the following piece of code.
#include <iostream>
#include <vector>
#include <map>
#include <cstdlib>
#include <pthread.h>
using namespace std;

map<pthread_t, vector<int>> map_vec;
vector<pair<pthread_t, int>> how_much_and_where;
pthread_cond_t CV = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* writer(void* args)
{
    while (*some condition*)
    {
        int howMuchPush = (rand() % 5) + 1;
        for (int i = 0; i < howMuchPush; ++i)
        {
            // WRITE
            map_vec[pthread_self()].push_back(rand() % 10);
        }
        how_much_and_where.push_back(make_pair(pthread_self(), howMuchPush));
        // Wake up the reader - there's something to read.
        pthread_cond_signal(&CV);
    }
    cout << "writer thread: " << pthread_self() << endl;
    return nullptr;
}

void* reader(void* args)
{
    pair<pthread_t, int> to_do;
    pthread_cond_wait(&CV, &mutex);
    while (*what condition??*)
    {
        to_do = how_much_and_where.front();
        how_much_and_where.erase(how_much_and_where.begin());
        // READ
        cout << to_do.first << " wrote " << endl;
        for (int i = 0; i < to_do.second; i++)
        {
            cout << map_vec[to_do.first][i] << endl;
        }
        // Done reading. Go to sleep.
        pthread_cond_wait(&CV, &mutex);
    }
    return nullptr;
}
//----------------------------------------------------------------------------//
int main()
{
    pthread_t threads[4];
    // Writers
    pthread_create(&threads[0], nullptr, writer, nullptr);
    pthread_create(&threads[1], nullptr, writer, nullptr);
    pthread_create(&threads[2], nullptr, writer, nullptr);
    // Reader
    pthread_create(&threads[3], nullptr, reader, nullptr);
    pthread_join(threads[0], nullptr);
    pthread_join(threads[1], nullptr);
    pthread_join(threads[2], nullptr);
    pthread_join(threads[3], nullptr);
    return 0;
}
Background
Every writer has its own container to which it writes data, and suppose there is a reader that knows when a writer has finished writing a chunk of data and what the size of that chunk is (the reader has a container to which the writers push pairs carrying this information).
Questions
There are two shared resources here: map_vec and how_much_and_where. But I don't understand what, in this case, is the efficient way to position locks on these resources (for example, locking map_vec before every push_back in the for loop? Or maybe locking it before the for loop? But isn't pushing to a queue a wasteful and long operation that may make the reader wait too much?) and the safe way to position locks in order to prevent deadlocks.
I also guess the reader should keep reading as long as how_much_and_where is not empty, but obviously a situation in which the reader empties how_much_and_where right before a writer adds a pair may occur.
Upvotes: 3
Views: 3435
Reputation: 3736
To simplify things we should decouple the implementation of the general-purpose/reusable producer-consumer queue (or simply "blocking queue" as I usually call it) from the implementation of the actual producers and the consumer (which aren't general-purpose/reusable: they are specific to your program). This will make the code much clearer and more manageable from a design perspective.
First you should implement a "blocking queue" that can manage multiple producers and a single consumer. This blocking queue will contain the code that handles multithreading/synchronization, and it can be used by a consumer thread to receive items from several producer threads. Such a blocking queue can be implemented in a lot of different ways (not only with the mutex+cond combo) depending on whether you have 1 or more consumers and 1 or more producers (sometimes it is possible to introduce different kinds of [platform specific] optimizations when you have only 1 consumer or 1 producer). The simplest queue implementation with a mutex+cond pair automatically handles multiple producers and multiple consumers if needed.
The queue has only an internal container (it can be a non-thread-safe std::queue, vector or list) that holds the items, and an associated mutex+cond pair that protects this container from concurrent access by multiple threads. The queue has to provide two operations:
produce(item): puts one item into the queue and returns immediately. The pseudo code looks like this:

    lock the mutex
    push the item into the container
    signal the condition variable (pthread_cond_signal)
    unlock the mutex

wait_and_get(): if there is at least one item in the queue then it removes the oldest one and returns immediately, otherwise it waits until someone puts an item into the queue with the produce(item) operation. Pseudo code:

    lock the mutex
    if the container is empty:
        wait on the condition variable (pthread_cond_wait)
    remove the oldest item from the container
    unlock the mutex
    return the removed item
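For concreteness, here is a minimal C++ sketch of such a blocking queue built on a pthread mutex+cond pair. The class and member names (BlockingQueue, produce, wait_and_get) are only illustrative, and the wait uses a while loop instead of the if from the pseudo code above, for the spurious-wakeup reason mentioned at the end of this answer:

    #include <pthread.h>
    #include <queue>

    // Minimal multi-producer / multi-consumer blocking queue sketch.
    // T is the item type, e.g. a pointer to a "job" object.
    template <typename T>
    class BlockingQueue
    {
    public:
        BlockingQueue()
        {
            pthread_mutex_init(&m_mutex, nullptr);
            pthread_cond_init(&m_cond, nullptr);
        }
        ~BlockingQueue()
        {
            pthread_cond_destroy(&m_cond);
            pthread_mutex_destroy(&m_mutex);
        }

        // produce(item): put one item into the queue and return immediately.
        void produce(const T& item)
        {
            pthread_mutex_lock(&m_mutex);
            m_container.push(item);
            pthread_cond_signal(&m_cond);   // wake up one waiting consumer
            pthread_mutex_unlock(&m_mutex);
        }

        // wait_and_get(): remove and return the oldest item,
        // blocking while the queue is empty.
        T wait_and_get()
        {
            pthread_mutex_lock(&m_mutex);
            // "while" instead of "if" guards against spurious wakeups.
            while (m_container.empty())
                pthread_cond_wait(&m_cond, &m_mutex);
            T item = m_container.front();
            m_container.pop();
            pthread_mutex_unlock(&m_mutex);
            return item;
        }

    private:
        std::queue<T> m_container;  // non-thread-safe container, protected by the mutex
        pthread_mutex_t m_mutex;
        pthread_cond_t m_cond;
    };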
Now that you have a reusable blocking queue to build on, we can implement the producers and the consumer along with the main thread that controls things.
The producer threads just throw a bunch of items into the queue (by calling the produce(item) operation of the blocking queue) and then they exit. If the production of items isn't computation heavy and doesn't require waiting for a lot of IO operations then this will finish very quickly in your example program. To simulate real-world scenarios where the threads do heavy work you could do the following: on each producer thread you put only X (let's say 5) items into the queue, but between items you wait for a random number of seconds, let's say between 1 and 3. Note that after some time your producer threads quit by themselves, when they have finished their job.
The consumer has an infinite loop in which it always gets the next item from the queue with wait_and_get() and processes it somehow. If the item is the special one that signals the end of processing then it breaks out of the infinite loop instead of processing it. Pseudo code:

    infinite loop:
        item = wait_and_get()
        if item is the special "end of processing" item:
            break out of the loop
        process the item
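A matching C++ sketch of the consumer, again assuming the BlockingQueue<int> and g_queue from the earlier sketches and using -1 as an arbitrary stand-in for the special "end of processing" item:

    #include <iostream>

    // Shared queue of ints, defined in the main() sketch further below.
    extern BlockingQueue<int> g_queue;

    void* consumer(void* /*args*/)
    {
        for (;;)
        {
            int item = g_queue.wait_and_get();
            if (item == -1)
                break;                      // special item: stop processing
            std::cout << "consumed " << item << std::endl;  // "process" the item
        }
        return nullptr;
    }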
On the main thread, first wait for all producer threads to finish (pthread_join() them).
Remember that the producers finish and quit by themselves after some time, without external stimuli. When you finish joining all producers it means that every producer has quit, so no one will call the produce(item) operation of the queue again. However the queue may still have unprocessed items and the consumer may still be working on crunching those.
Then put the last special "end of processing" item into the queue for the consumer.
When the consumer finishes processing the last item produced by the producers it will still ask the queue for the next item with wait_and_get(); this may result in a deadlock because it would be waiting for an item that never arrives. To avoid this, on the main thread we put a last special item into the queue that signals the end of processing for the consumer. Remember that our consumer implementation contains a check for this special item to find out when to finish processing. It is important that this special item is placed into the queue on the main thread only after the producers have finished (after joining them)!
If you have multiple consumers then it's easier to put multiple special "end of processing" items into the queue (one for each consumer) than to make the queue smart enough to handle multiple consumers with only one "end of processing" item. Since the main thread orchestrates the whole thing (thread creation, thread joining, etc.) it knows exactly how many consumers there are, so it's easy to put the same number of "end of processing" items into the queue.
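With a hypothetical num_consumers count, this step on the main thread is just a small loop (num_consumers and the -1 sentinel are illustrative, as in the earlier sketches):

    // One "end of processing" item per consumer.
    for (int i = 0; i < num_consumers; ++i)
        g_queue.produce(-1);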
Finally, wait for the consumer thread to terminate by joining it.
After putting the end-of-processing special item into the queue, we wait for the consumer thread to process the remaining items (produced by the producers) along with our last special item (produced by the main "coordinator" thread) that asks the consumer to finish. We do this waiting on the main thread by pthread_join()-ing the consumer thread.
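Putting the pieces together, a sketch of the main thread using the hypothetical BlockingQueue<int>, producer and consumer functions from the sketches above:

    #include <pthread.h>

    BlockingQueue<int> g_queue;  // the shared queue used by all sketches

    int main()
    {
        pthread_t producers[3];
        pthread_t consumer_thread;

        // Create the producers and the single consumer.
        for (int i = 0; i < 3; ++i)
            pthread_create(&producers[i], nullptr, producer, nullptr);
        pthread_create(&consumer_thread, nullptr, consumer, nullptr);

        // 1. Wait for all producers to finish: after this no one calls
        //    produce() again, but the queue may still hold unprocessed items.
        for (int i = 0; i < 3; ++i)
            pthread_join(producers[i], nullptr);

        // 2. Only now put the special "end of processing" item into the queue
        //    (one per consumer; here there is a single consumer).
        g_queue.produce(-1);

        // 3. Wait for the consumer to drain the queue and terminate.
        pthread_join(consumer_thread, nullptr);
        return 0;
    }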
A NULL job pointer is a common choice for the special "end of processing" item in queues of pointers. In your case you will have to find out what kind of special value you can use in the queue to signal the end of processing for the consumer.
The pthread_cond_wait documentation says that this function can wake up without actual signaling (although I've never seen a single bug caused by a spurious wakeup of this function in my life). To guard against this, the "if the container is empty: wait (pthread_cond_wait)" part of the pseudo code should be replaced with "while the container is empty: wait (pthread_cond_wait)". But again, this spurious wakeup thing is probably a Loch Ness monster that is present only on some architectures with specific Linux implementations of the threading primitives, so your code would probably work on desktop machines without caring about this problem.
Upvotes: 5