Daniel
Daniel

Reputation: 2592

Boost interprocess: string in a *not managed* shared memory?

I know that construction of a string in a shared memory needs an allocator.

That's fine, but I can't figure out how to do that, because all the examples use a managed shared memory segment, whose get_segment_manager() method has to be used to obtain the allocator (if I'm not wrong).

Let's see this example copied from here: https://www.boost.org/doc/libs/1_77_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.conditions.conditions_anonymous_example

doc_anonymous_condition_shared_data.hpp

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>

struct trace_queue
{
   //Capacity, in chars, of the message buffer
   enum { LineSize = 100 };

   //Protects access to the queue
   boost::interprocess::interprocess_mutex      mutex;

   //Waited on while the queue is empty
   boost::interprocess::interprocess_condition  cond_empty;

   //Waited on while the queue is full
   boost::interprocess::interprocess_condition  cond_full;

   //Storage for the message text
   char   items[LineSize];

   //True while a message is stored in items
   bool message_in = false;

   //The queue starts out without a message; items is left uninitialized
   trace_queue() {}
};

Main process

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

//Producer process: creates the shared memory, constructs a trace_queue in it
//and publishes NumMsg messages, the final one being "last message".
//Returns 0 on success, 1 if Boost.Interprocess reports an error.
int main ()
{

   //Erase previous shared memory and schedule erasure on exit
   struct shm_remove
   {
      shm_remove() { shared_memory_object::remove("MySharedMemory"); }
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   try{
      //Create a shared memory object. This is done inside the try block so
      //that a creation failure is reported by the handler below instead of
      //escaping from main.
      shared_memory_object shm
         (create_only               //only create
         ,"MySharedMemory"           //name
         ,read_write                //read-write mode
         );

      //Set size
      shm.truncate(sizeof(trace_queue));

      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Construct the shared structure in memory
      trace_queue * data = new (addr) trace_queue;

      const int NumMsg = 100;

      for(int i = 0; i < NumMsg; ++i){
         scoped_lock<interprocess_mutex> lock(data->mutex);

         //Wait until the previous message has been consumed. The flag is
         //re-checked in a loop because a wait may return spuriously, and
         //writing then would overwrite a message that was never read.
         while(data->message_in){
            data->cond_full.wait(lock);
         }

         //Bounded formatting: never write past the end of the items buffer
         if(i == (NumMsg-1))
            std::snprintf(data->items, sizeof(data->items), "%s", "last message");
         else
            std::snprintf(data->items, sizeof(data->items), "%s_%d", "my_trace", i);

         //Mark message buffer as full
         data->message_in = true;

         //Notify to the other process that there is a message
         data->cond_empty.notify_one();
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

Second process:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

//Consumer process: opens the shared memory created by the main process and
//prints every message until it receives "last message".
//Returns 0 on success, 1 if Boost.Interprocess reports an error.
int main ()
{
   try{
      //Open the existing shared memory object. This is done inside the try
      //block so that a failure to open it is reported by the handler below
      //instead of escaping from main.
      shared_memory_object shm
         (open_only                    //only open
         ,"MySharedMemory"              //name
         ,read_write                   //read-write mode
         );

      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Obtain a pointer to the shared structure
      trace_queue * data = static_cast<trace_queue*>(addr);

      //Print messages until the other process marks the end
      bool end_loop = false;
      do{
         scoped_lock<interprocess_mutex> lock(data->mutex);

         //Wait until a message is available. The flag is re-checked in a
         //loop because a wait may return spuriously, and reading then would
         //process a buffer that holds no new message.
         while(!data->message_in){
            data->cond_empty.wait(lock);
         }
         if(std::strcmp(data->items, "last message") == 0){
            end_loop = true;
         }
         else{
            //Print the message
            std::cout << data->items << std::endl;
            //Notify the other process that the buffer is empty
            data->message_in = false;
            data->cond_full.notify_one();
         }
      }
      while(!end_loop);
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

I'd like to replace char items[LineSize]; with a more convenient string in the trace_queue struct.

How can I do that without the Managed Shared Memory?

Or this is somewhat completely not recommended to do without the managed Boost libraries?

Upvotes: 1

Views: 313

Answers (1)

sehe
sehe

Reputation: 392893

Or this is somewhat completely not recommended to do without the managed Boost libraries?

I cannot recommend it. It's fine to do it unmanaged, but I'd 100% suggest the exact approach they gave with the fixed char array. What's wrong with that?

You cannot have your cake and eat it. You can't wish for "highlevel dynamic strings" and "no heap management overhead" magically at the same time.

That said, you may be able to find some trade-offs. Specifically, you might want to emulate something like a polymorphic memory resource in such a shared byte array. Then you could use std::pmr::string on top of that. Tragedy has it that memory_resource isn't shared-memory safe.

SIMPLIFY

However, I suppose all you need is some nice abstraction, where the interface is using C++ vocabulary types. Why not simplify the entire deal to that point?

Here's a quick draft:

// Single-slot message queue meant to be constructed inside a shared memory
// region. It holds at most one message at a time in a fixed-size buffer.
//
// One condition variable serves both directions and is signalled with
// notify_one(), so this is written for one sender and one receiver.
struct trace_queue {
  private:
    bip::interprocess_mutex     mutex;
    bip::interprocess_condition cond;

    std::array<char, 300> buffer{};
    bool message_in{false}; // Is there any message

    // Locks the mutex, blocks until message_in == state, and returns the
    // lock still held so the caller can keep working under it.
    auto wait(bool state) {
        bip::scoped_lock lock(mutex);
        // Explicit captures: the `[=, this]` spelling is only valid from C++20.
        cond.wait(lock, [this, state] { return message_in == state; });
        return lock;
    }

  public:
    // Blocks until the slot is free, then stores msg in it.
    // msg is silently truncated to buffer.size() characters.
    void send(std::string_view msg) {
        auto lock = wait(false); // !message_in

        auto const n = std::min(buffer.size(), msg.size());
        buffer.fill('\0'); // clear leftovers from a longer previous message
        std::copy_n(msg.data(), n, buffer.begin());

        message_in = true;
        cond.notify_one();
    }

    // Blocks until a message is present, then returns it and frees the slot.
    // The text ends at the first '\0', or at the end of the buffer when the
    // stored message filled it completely.
    std::string receive() {
        auto lock = wait(true); // message_in

        // std::find instead of the POSIX-only strnlen; bounded by the buffer.
        auto const last = std::find(buffer.begin(), buffer.end(), '\0');
        std::string msg(buffer.begin(), last);

        message_in = false;
        cond.notify_one();
        return msg;
    }
};

In my opinion the code is already easier to read. And it's certainly easier to use! The entire server side:

// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
                              bip::read_write);
shm.truncate(sizeof(trace_queue));

// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;

for (int i = 0; i < 99; ++i)
    data.send("my_trace_" + std::to_string(i));

data.send("TEARDOWN");

And the client side:

bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
                              bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());

while (true) {
    auto msg = data.receive();
    if (msg == "TEARDOWN")
        break;
    std::cout << msg << "\n";
};

See it Live On Coliru

#include <algorithm>
#include <array>
#include <iostream>
#include <string>
#include <string_view>

#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

namespace bip = boost::interprocess;

// Single-slot message queue meant to be constructed inside a shared memory
// region. It holds at most one message at a time in a fixed-size buffer.
//
// One condition variable serves both directions and is signalled with
// notify_one(), so this is written for one sender and one receiver.
struct trace_queue {
  private:
    bip::interprocess_mutex     mutex;
    bip::interprocess_condition cond;

    std::array<char, 300> buffer{};
    bool message_in{false}; // Is there any message

    // Locks the mutex, blocks until message_in == state, and returns the
    // lock still held so the caller can keep working under it.
    auto wait(bool state) {
        bip::scoped_lock lock(mutex);
        // Explicit captures: the `[=, this]` spelling is only valid from C++20.
        cond.wait(lock, [this, state] { return message_in == state; });
        return lock;
    }

  public:
    // Blocks until the slot is free, then stores msg in it.
    // msg is silently truncated to buffer.size() characters.
    void send(std::string_view msg) {
        auto lock = wait(false); // !message_in

        auto const n = std::min(buffer.size(), msg.size());
        buffer.fill('\0'); // clear leftovers from a longer previous message
        std::copy_n(msg.data(), n, buffer.begin());

        message_in = true;
        cond.notify_one();
    }

    // Blocks until a message is present, then returns it and frees the slot.
    // The text ends at the first '\0', or at the end of the buffer when the
    // stored message filled it completely.
    std::string receive() {
        auto lock = wait(true); // message_in

        // std::find instead of the POSIX-only strnlen; bounded by the buffer.
        auto const last = std::find(buffer.begin(), buffer.end(), '\0');
        std::string msg(buffer.begin(), last);

        message_in = false;
        cond.notify_one();
        return msg;
    }
};

// Runs as the sender when started without arguments and as the receiver when
// at least one argument is given. Returns 1 if an exception is caught.
int main(int argc, char**) {
    bool const receiver = !(argc < 2);

    try {
        if (receiver) {
            // Attach to the shared memory object the sender created.
            bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
                                          bip::read_write);
            bip::mapped_region region(shm, bip::read_write);
            auto* queue = static_cast<trace_queue*>(region.get_address());

            // Print every message until the sentinel arrives.
            for (auto msg = queue->receive(); msg != "TEARDOWN"; msg = queue->receive()) {
                std::cout << msg << "\n";
            }
        } else {
            // Remove any previous shared memory now, and again when leaving this scope.
            struct shm_remove {
                shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
                ~shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
            } remover;

            // Create the shared memory object and size it for one trace_queue.
            bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
                                          bip::read_write);
            shm.truncate(sizeof(trace_queue));

            // Map all of it into this process and construct the queue in place.
            bip::mapped_region region(shm, bip::read_write);
            auto* queue = new (region.get_address()) trace_queue;

            for (int i = 0; i != 99; ++i) {
                queue->send("my_trace_" + std::to_string(i));
            }
            queue->send("TEARDOWN");
        }
    } catch (std::exception const& ex) {
        std::cout << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

Output, as expected:

enter image description here

Upvotes: 1

Related Questions