Reputation: 15
I am trying to build an application that has multiple processes. These processes need to write concurrently to the same message queue. On the other side, there will be just one process reading from that queue.
Is that possible using Boost, or do I have to implement the mutual exclusion myself?
I took a look at the example source code but it is not working properly for my needs. I don't know if I'm missing something.
This is the code on the client:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
#include <unistd.h>
using namespace boost::interprocess;
int main ()
{
try{
//Erase previous message queue
//message_queue::remove("message_queue");
//Create a message_queue.
message_queue mq
(open_or_create //only create
,"message_queue" //name
,100 //max message number
,sizeof(int) //max message size
);
//Send 100 numbers
for(int i = 0; i < 100; ++i){
printf("Sending: %d\n", i);
usleep(1000000);
mq.send(&i, sizeof(i), 0);
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
And server code:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
using namespace std;
using namespace boost::interprocess;
int main ()
{
try{
//message_queue::remove("message_queue");
//Open a message queue.
message_queue mq
(open_only //only create
,"message_queue" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
//Receive 100 numbers
for(int i = 0; i < 100; ++i){
int number;
mq.receive(&number, sizeof(number), recvd_size, priority);
if(number != i || recvd_size != sizeof(number))
return 1;
cout << number << endl;
}
}
catch(interprocess_exception &ex){
message_queue::remove("message_queue");
std::cout << ex.what() << std::endl;
return 1;
}
//message_queue::remove("message_queue");
return 0;
}
Thanks in advance.
Upvotes: 1
Views: 3096
Reputation: 503795
The given examples for boost::interprocess::message_queue work for me. These classes are already thread-safe, so intra-process threads are not a problem.
Here's a full example of a shared message queue. Let me know if you have trouble using it.
shared_mq.hpp:
#include <boost/interprocess/ipc/message_queue.hpp>
// Thin wrapper around boost::interprocess::message_queue that transports
// single `int` values.
// could easily be made a template; make sure T is a POD!
class shared_mq {
public:
    // Creates a new queue called `name` that holds at most `max_queue_size`
    // messages of sizeof(int) bytes. Any queue already registered under
    // `name` is removed first.
    shared_mq(const char* const name,
              const unsigned max_queue_size) :
        shared_mq{ name, max_queue_size, delete_queue(name) }
    {}

    // Opens an already existing queue called `name`.
    shared_mq(const char* const name) :
        mq_{ boost::interprocess::open_only, name }
    {}

    // Sends `i` with priority 0.
    void send(int i) {
        mq_.send(&i, sizeof(i), 0 /* priority */);
    }

    // Receives one message and returns it as an int.
    // Throws boost::interprocess::interprocess_exception when the received
    // message is not exactly sizeof(int) bytes, instead of returning a
    // partially written value.
    int receive() {
        int result = 0;
        boost::interprocess::message_queue::size_type recvsize = 0;
        unsigned recvpriority = 0;
        mq_.receive(&result, sizeof(result), recvsize, recvpriority);
        if (recvsize != sizeof(result)) {
            throw boost::interprocess::interprocess_exception(
                "shared_mq::receive: unexpected message size");
        }
        return result;
    }

private:
    // Tag type: its only purpose is to select the private constructor and to
    // force delete_queue() to run before mq_ is constructed.
    struct did_delete_t {};

    // Static because it runs inside a constructor's mem-initializer, before
    // the object exists, and it touches no member state.
    static did_delete_t delete_queue(const char* const name) {
        boost::interprocess::message_queue::remove(name);
        return did_delete_t{};
    }

    // Target of the public creating constructor; by the time it runs the old
    // queue has already been removed.
    shared_mq(const char* const name,
              const unsigned max_queue_size,
              did_delete_t) :
        mq_{ boost::interprocess::create_only, name, max_queue_size, sizeof(int) }
    {}

    boost::interprocess::message_queue mq_;
};
client.cpp:
#include <iostream>
#include <random>
#include <thread>
#include "shared_mq.hpp"
// Pushes `count` random integers, each drawn uniformly from [0, 10000],
// onto `mq`. The generator is seeded from std::random_device on every call.
void send_ints(shared_mq& mq, const unsigned count) {
    std::random_device seed_source;
    std::mt19937 engine{ seed_source() };
    std::uniform_int_distribution<int> pick{0, 10000};

    unsigned remaining = count;
    while (remaining != 0) {
        const int value = pick(engine);
        mq.send(value);
        --remaining;
    }
}
int main ()
{
std::cout << "Starting client." << std::endl;
try {
std::cout << "Creating queue..." << std::endl;
constexpr unsigned kQueueSize = 100;
shared_mq mq{ "my_queue", kQueueSize };
std::cout << "Sending ints..." << std::endl;
std::thread t1{ send_ints, std::ref(mq), 25};
std::thread t2{ send_ints, std::ref(mq), 25};
t1.join();
t2.join();
mq.send(-1); // magic sentinel value
}
catch (boost::interprocess::interprocess_exception& ex) {
std::cerr << ex.what() << std::endl;
return 1;
}
std::cout << "Finished client." << std::endl;
return 0;
}
server.cpp:
#include <iostream>
#include "shared_mq.hpp"
int main ()
{
std::cout << "Starting server." << std::endl;
try {
std::cout << "Opening queue..." << std::endl;
shared_mq mq{ "my_queue" };
std::cout << "Receiving ints..." << std::endl;
for (;;) {
const int x = mq.receive();
if (x == -1) {
// magic sentinel value
break;
}
std::cout << "Received: " << x << std::endl;
}
}
catch (boost::interprocess::interprocess_exception& ex) {
std::cerr << ex.what() << std::endl;
return 1;
}
std::cout << "Finished server." << std::endl;
return 0;
}
Upvotes: 1