I'm using Boost's Interprocess library to share memory between multiple programs. Three or four other programs will be reading from and writing to the shared memory contents in cust_order. Access to the memory space needs to be serialized.
In the example program below, the engine loops over the contents of risk_queue and, if it is populated, takes the first cust_order number and locates that order for processing.
In the objects.h header file, used by every program:
struct cust_order {
    int ID;
    char CLID[128];
    int CUST_ID;
    char ORDER_STATUS;
};
cust_order o;

boost::interprocess::managed_shared_memory
    cust_order_segment(boost::interprocess::open_or_create, "cust_order",
                       65536 * 100);
typedef int cust_order_KeyType;
typedef cust_order cust_order_MappedType;
typedef std::pair<const int, cust_order> cust_order_ValueType;
typedef boost::interprocess::allocator<
    cust_order_ValueType,
    boost::interprocess::managed_shared_memory::segment_manager>
    cust_order_ShmemAllocator;
typedef boost::interprocess::map<cust_order_KeyType, cust_order_MappedType,
                                 std::less<cust_order_KeyType>,
                                 cust_order_ShmemAllocator>
    cust_order_MySHMMap;
cust_order_MySHMMap::iterator cust_order_iter;

boost::interprocess::managed_shared_memory
    risk_queue_segment(boost::interprocess::open_or_create, "risk_queue",
                       65536 * 100);
typedef int risk_queue_KeyType;
typedef int risk_queue_MappedType;
typedef std::pair<const int, int> risk_queue_ValueType;
typedef boost::interprocess::allocator<
    risk_queue_ValueType,
    boost::interprocess::managed_shared_memory::segment_manager>
    risk_queue_ShmemAllocator;
typedef boost::interprocess::map<risk_queue_KeyType, risk_queue_MappedType,
                                 std::less<risk_queue_KeyType>,
                                 risk_queue_ShmemAllocator>
    risk_queue_MySHMMap;
risk_queue_MySHMMap::iterator risk_queue_iter;
In engine.cpp:
int main() {
    risk_queue_ShmemAllocator risk_queue_alloc_inst(
        risk_queue_segment.get_segment_manager());
    cust_order_ShmemAllocator cust_order_alloc_inst(
        cust_order_segment.get_segment_manager());
    for (; 0 < 1;) {
        boost::interprocess::offset_ptr<risk_queue_MySHMMap> risk_queue_m_pmap =
            risk_queue_segment.find<risk_queue_MySHMMap>("risk_queue").first;
        boost::interprocess::offset_ptr<cust_order_MySHMMap> cust_order_m_pmap =
            cust_order_segment.find<cust_order_MySHMMap>("cust_order").first;
        risk_queue_iter = risk_queue_m_pmap->begin();
        if (risk_queue_iter != risk_queue_m_pmap->end()) {
            ordid = risk_queue_iter->second;
            cust_order_m_pmap->find(ordid)->second = o;
            o.ORDER_STATUS = '0';
            o = cust_order_m_pmap->find(ordid)->second;
            risk_queue_m_pmap->erase(ordid);
        }
    };
    return 0;
}
This is the error I get after the programs have been running perfectly for a few seconds:
engine: /usr/include/boost/intrusive/bstree.hpp:1331: boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::iterator boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::insert_unique_commit(boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::reference, const insert_commit_data&) [with ValueTraits = boost::intrusive::bhtraits<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::rbtree_node_traits<boost::interprocess::offset_ptr<void>, true>, (boost::intrusive::link_mode_type)0, boost::intrusive::dft_tag, 3>; VoidOrKeyOfValue = void; VoidOrKeyComp = boost::container::value_to_node_compare<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::tree_value_compare<boost::interprocess::offset_ptr<std::pair<const int, event>, long int, long unsigned int, 0>, std::less<int>, boost::container::container_detail::select1st<int>, false> >; SizeType = long unsigned int; bool ConstantTimeSize = true; boost::intrusive::algo_types AlgoType = (boost::intrusive::algo_types)5; HeaderHolder = void; boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::iterator = boost::intrusive::tree_iterator<boost::intrusive::bhtraits<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::rbtree_node_traits<boost::interprocess::offset_ptr<void>, true>, (boost::intrusive::link_mode_type)0, boost::intrusive::dft_tag, 3>, false>; 
boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::reference = boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>&; boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::insert_commit_data = boost::intrusive::insert_commit_data_t<boost::interprocess::offset_ptr<boost::intrusive::compact_rbtree_node<boost::interprocess::offset_ptr<void> >, long int, long unsigned int, 0> >]: Assertion `( p == this->end() || !this->comp()(*p, value) )' failed.
Aborted (core dumped)
Could you please help me understand the error, and let me know if there is a better way to do what I need?
Upvotes: 1
Views: 928
Reputation: 392843
but to make this work i must allocate memory within the loop
I don't see why that would be true. Your shared memory capacity is limited to 64 KiB (65536 bytes), so that's a no-brainer: allocate it once, upfront.
My application is low latency oriented, and I wonder if there is a way to read updated objects in the memory without constantly reallocating in every loop iteration?
Yes. Index into a region of shared memory mapped into your process address space, without ever allocating.
Below is the code that I must place inside the loop in order to receive updates from the other programs accessing the same memory space.
The code you show doesn't allocate a thing. It locates an already-allocated (and constructed) object inside a managed segment and returns a pointer to it.
If I place that outside the loop, I will not receive updates when other programs manipulate or add data in the shared memory space
Ahhh! You don't tell us what X_MySHMMap is like, but I can start to guess it might contain something like std::array<char, N> or another container that you loop over.
If you access that object from multiple processes without synchronization, you have a data race. The compiler has a memory model (see that same page) which allows it to reorder memory operations and eliminate load/store cycles. E.g. if you write
#include <iostream>

static bool ready = false;

int main() {
    while (!ready) {
        std::cout << "waiting...\n";
    }
}
The compiler sees this as a guaranteed infinite loop. If ready were in shared memory, it would make no difference to the compiler at all. You'd have to add synchronization (mutual exclusion, a mutex plus condition variable, atomics or, in simplistic cases, marking the object volatile).
My guess is that when you find the object /again/ each time in the loop, you accidentally synchronize between the processes (because the segment manager uses mutual exclusion to synchronize access to the segment metadata), and that's what makes the compiler emit new reads.
You don't supply enough code for me to show an actual fix. I think I can tell that managed_shared_memory might be overcomplicating things a bit. You could probably use a simple shared_memory_object (which doesn't have the overhead of a segment manager), and just make sure to let the compiler know that your data is volatile/concurrently accessed from another "thread"¹. This keeps it from optimizing away loads that you require.
Simplistic example:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <algorithm>   // std::fill
#include <array>
#include <atomic>
#include <type_traits> // std::is_pod

using SharedBuffer = std::array<std::atomic_char, 65536>;
static_assert(std::is_pod<SharedBuffer>{}, "assumed POD"); // Warning: trait is not correct on some versions of MSVC

namespace bip = boost::interprocess;

int main() {
    bip::shared_memory_object smo(bip::open_or_create, "yoho", bip::mode_t::read_write);
    smo.truncate(sizeof(SharedBuffer));

    // map the whole buffer into this process' address space
    bip::mapped_region region(smo, bip::mode_t::read_write, 0, sizeof(SharedBuffer));
    auto& buffer = *reinterpret_cast<SharedBuffer*>(region.get_address());

    // do something with that buffer
    std::fill(buffer.begin(), buffer.end(), 0); // start with all zeroes

    // loop over it or something
}
¹ (it makes no difference that the thread is in another process)
² Coliru doesn't allow shared memory access
Upvotes: 1