Eduard Rostomyan

Reputation: 6566

Signal from one process to another C++

I know the title is a bit broad, so let me elaborate.
I have two processes running: one writes into shared memory, the other reads from it.
To set up the shared memory I am using boost::interprocess (by the way, let me know if there are more convenient libraries).

So I implemented the following:

//Writer

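#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstdlib>    // system
#include <cstring>    // std::memset
#include <functional> // std::function
#include <iostream>
#include <memory>     // std::shared_ptr, std::make_shared
#include <string>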

namespace ip = boost::interprocess;
class SharedMemory
{
public:
    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
    {
    }

    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
    {
    }

    std::shared_ptr<ip::windows_shared_memory> getSM()
    {
        return sm_;
    }
private:
    std::function<void()> destroyer_;
    std::string name_;
    std::shared_ptr<ip::windows_shared_memory> sm_;
};


int main()
{
    SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
    ip::mapped_region region(*creator.getSM(), ip::read_write);
    std::memset(region.get_address(), 1, region.get_size());

    int status = system("reader.exe");
    std::cout << status << std::endl;
}

So I am creating shared memory, writing 1 to it, then calling the reader exe. (I skip the reader part as it's pretty much the same, except that it reads instead of writing.)

This code works fine: I write into memory and the other process reads it and prints my 1's.
But what if I have these two executables running at the same time, and I want to write into memory and then notify the other process that there is an update? How do I signal from one exe/process to another?

The scenario is that I am streaming some live data, writing into memory and then telling the other process that there is an update.

Upvotes: 3

Views: 1205

Answers (2)

Qaler

Reputation: 11

The error C2664 under MSVC mentioned above can be fixed by changing

data_.emplace_back(std::move(message));

to:

data_.emplace_back(std::move(message.data()));

Hope this helps someone.

Upvotes: 1

sehe

Reputation: 393799

I think there are more convenient approaches indeed.

In principle to synchronize between processes you use all the same approaches as synchronizing inside a process (between threads): using synchronization primitives (mutex/critical section, condition variable, semaphores, barriers etc.).
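
To make the thread analogy concrete, here is a minimal in-process sketch of "write, then notify the reader", using only the standard library. The class name UpdateChannel and its members are made up for illustration and are not part of the question or of Boost; the interprocess version has the same shape, but its mutex, condition variable and data have to live inside the shared segment.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical in-process analogue of "write, then notify the reader".
class UpdateChannel {
  public:
    // Writer side: store the new value, then wake any waiting reader.
    void publish(std::string value) {
        {
            std::lock_guard<std::mutex> lk(mx_);
            latest_ = std::move(value);
            ++version_;
        }
        cv_.notify_all();
    }

    // Reader side: block until a version newer than `seen` exists, then
    // return that version number together with a copy of the value.
    std::pair<std::uint64_t, std::string> wait_newer_than(std::uint64_t seen) {
        std::unique_lock<std::mutex> lk(mx_);
        assert(seen <= version_); // a reader cannot have seen the future
        cv_.wait(lk, [&] { return version_ > seen; });
        return {version_, latest_};
    }

  private:
    std::mutex              mx_;
    std::condition_variable cv_;
    std::uint64_t           version_ = 0;
    std::string             latest_;
};
```

A reader would start with seen = 0 and feed the returned version number back into the next call, so it only wakes up for updates it has not processed yet.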

In addition, you need to have a data structure that you synchronize. This is precisely the Achilles' heel at the moment. There is a total absence of data structure here.

Though you can do raw byte access with your own logic, I don't see the appeal of using a high-level library in doing so. Instead I'd use a managed memory segment, that lets you find or construct typed objects by name. This may include your synchronization primitives.

In fact, you can expedite the process by using a message_queue, which has all the synchronization already built in.

Manual Sync: Writer using Segment Manager

I'll provide portable code because I do not have a Windows machine. First let's think of a data structure. A simple example would be a queue of messages. Let's use a deque<string>.

Not exactly trivial data structures, but the great news is that Boost Interprocess comes with all the nuts and bolts to make things work (using interprocess allocators).

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = bc::deque<T, Alloc<T>>;
    using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;

    class Memory {
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        {
        }

        DataStructure&       get()       { return data_; } 
        DataStructure const& get() const { return data_; } 

      private:
        std::string    name_;
        Segment        sm_;
        DataStructure& data_;
    };

} // namespace Shared

There, now we can have the writer be something like:

int main()
{
    Shared::Memory creator("SharedMemory", 10*1024*1024);

    creator.get().emplace_back("Hello");
    creator.get().emplace_back("World");

    std::cout << "Total queued: " << creator.get().size() << "\n";
}

Which will print e.g.

Total queued: 2
Total queued: 4
Total queued: 6

Depending on the number of times you ran it.

The Reader side

Now let's do the reader side. In fact it's so much the same that we can put it in the same main program:

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);
    auto& data = mem.get();

    bool is_reader = argc > 1;

    if (not is_reader) {
        data.emplace_back("Hello");
        data.emplace_back("World");
        std::cout << "Total queued: " << data.size() << "\n";
    } else {
        std::cout << "Found entries: " << data.size() << "\n";
        while (!data.empty()) {
            std::cout << "Dequeued " << data.front() << "\n";
            data.pop_front();
        }
    }

}

Simple for a start. Now running e.g. test.exe READER will conversely print something like this (here after a single run of the writer):

Found entries: 2
Dequeued Hello
Dequeued World

Locking & Synchronization

The goal is to run writer and reader concurrently. That's not safe as it is now, because of a lack of locking and synchronization. Let's add it:

class Memory {
    static constexpr size_t max_capacity = 100;
  public:
    Memory(const char* name, size_t size)
        : name_(name)
        , sm_(ip::open_or_create, name, size)
        , mx_(*sm_.find_or_construct<Mutex>("mutex")())
        , cv_(*sm_.find_or_construct<Cond>("condition")())
        , data_(*sm_.find_or_construct<DataStructure>("data")(
              sm_.get_segment_manager()))
    { }

    // ... 

  private:
    std::string    name_;
    Segment        sm_;
    Mutex&         mx_;
    Cond&          cv_;
    DataStructure& data_;
};

Now let's be careful. Because we want all operations on the data_ queue to be synchronized, we shall not expose it as we did before (with the get() member function). Instead we expose the exact interface of operations we support:

size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue();             // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue

These all do the locking as required, simply as you'd expect:

size_t queue_length() const {
    ip::scoped_lock<Mutex> lk(mx_);
    return data_.size();
}

It gets more interesting on the potentially blocking operations. I chose to have a maximum capacity, so enqueue needs to wait for capacity:

// blocking when queue at max_capacity
void enqueue(std::string message) {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return data_.size() < max_capacity; });

    data_.emplace_back(std::move(message));
    cv_.notify_one();
}

Conversely, dequeue needs to wait for a message to become available:

// blocking dequeue
std::string dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return not data_.empty(); });

    return do_pop();
}

Alternatively, you could make it non-blocking, just optionally returning a value:

// non-blocking dequeue
std::optional<std::string> try_dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);

    if (data_.empty())
        return std::nullopt;
    return do_pop();
}

Now in main let's have three versions: writer, reader and continuous reader (where the latter demonstrates the blocking interface):

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>

#include <iostream>
#include <iomanip>
#include <optional>
#include <string>
#include <utility>

namespace ip = boost::interprocess;
namespace bc = boost::container;

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = ip::deque<T, Alloc<T>>;
    using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;
    using Mutex         = ip::interprocess_mutex;
    using Cond          = ip::interprocess_condition;

    class Memory {
        static constexpr size_t max_capacity = 100;
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , mx_(*sm_.find_or_construct<Mutex>("mutex")())
            , cv_(*sm_.find_or_construct<Cond>("condition")())
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        { }

        size_t queue_length() const {
            ip::scoped_lock<Mutex> lk(mx_);
            return data_.size(); // caution: racy by design!
        }

        // blocking when queue at max_capacity
        void enqueue(std::string message) {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return data_.size() < max_capacity; });

            data_.emplace_back(std::move(message));

            cv_.notify_one();
        }

        // blocking dequeue
        std::string dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return not data_.empty(); });

            return do_pop();
        }

        // non-blocking dequeue
        std::optional<std::string> try_dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);

            if (data_.empty())
                return std::nullopt;
            return do_pop();
        }

      private:
        std::string    name_;
        Segment        sm_;
        Mutex&         mx_;
        Cond&          cv_;
        DataStructure& data_;

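        // Assumes mx_ locked by current thread!
        std::string do_pop() {
            // Copy the text out before pop_front() destroys the element;
            // a reference to front() would dangle afterwards.
            auto const& front = data_.front();
            std::string result(front.begin(), front.end());
            data_.pop_front();
            cv_.notify_all(); // any of the waiters might be a/the writer
            return result;
        }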
    };

} // namespace Shared

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);

    switch (argc) {
    case 1:
        mem.enqueue("Hello");
        mem.enqueue("World");
        std::cout << "Total queued: " << mem.queue_length() << "\n";
        break;
    case 2:
        std::cout << "Found entries: " << mem.queue_length() << "\n";
        while (auto msg = mem.try_dequeue()) {
            std::cout << "Dequeued " << *msg << "\n";
        }
        break;
    case 3: 
        std::cout << "Continuous reader\n";
        while (true) {
            std::cout << "Dequeued " << mem.dequeue() << "\n";
        }
        break;
    }
}

Summary, Caution

Note there are some loose ends with the above. Notably, the absence of robust locks in Boost Interprocess needs some extra care for proper shutdown without holding the lock.
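
One way to limit the damage of a lock that is never released (for example because its owner crashed) is to never wait for it unconditionally. The helper below is only a sketch of that idea: lock_with_deadline is a made-up name, and it polls try_lock() so that it works with any mutex type offering try_lock()/unlock(), which includes std::mutex and ip::interprocess_mutex. Boost's interprocess mutexes also offer timed locking themselves, which is likely preferable where available.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical helper: try to acquire `m` for at most `timeout`, polling
// try_lock() so that a lock left behind by a dead owner cannot block the
// caller forever. On success the caller owns the mutex and must unlock it.
template <typename Mutex>
bool lock_with_deadline(Mutex& m, std::chrono::milliseconds timeout,
                        std::chrono::milliseconds poll = std::chrono::milliseconds(1)) {
    assert(poll.count() > 0);
    auto const deadline = std::chrono::steady_clock::now() + timeout;
    do {
        if (m.try_lock())
            return true;
        std::this_thread::sleep_for(poll);
    } while (std::chrono::steady_clock::now() < deadline);
    return false; // give up: the owner is slow, stuck, or gone
}
```

What to do when this returns false (log, retry, or recreate the segment) is an application decision; the sketch only makes the failure visible instead of hanging.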

I'd suggest contrasting this with ip::message_queue as well.

Upvotes: 6
