Reputation:
I'm using Boost's Interprocess library to share memory between multiple programs. Three or four other programs will be reading from and writing to the shared memory contents in cust_order. Access to the memory space needs to be serialized.
In the example program below, engine loops over the contents of risk_queue and, if it is populated, takes the first cust_order number and locates that order for processing.
In the objects.h header file, used by every program:
struct cust_order {
    int ID;
    char CLID[128];
    int CUST_ID;
    char ORDER_STATUS;
};
cust_order o;
boost::interprocess::managed_shared_memory
    cust_order_segment(boost::interprocess::open_or_create, "cust_order",
                       65536 * 100);
typedef int cust_order_KeyType;
typedef cust_order cust_order_MappedType;
typedef std::pair<const int, cust_order> cust_order_ValueType;
typedef boost::interprocess::allocator<
    cust_order_ValueType,
    boost::interprocess::managed_shared_memory::segment_manager>
    cust_order_ShmemAllocator;
typedef boost::interprocess::map<cust_order_KeyType, cust_order_MappedType,
                                 std::less<cust_order_KeyType>,
                                 cust_order_ShmemAllocator>
    cust_order_MySHMMap;
cust_order_MySHMMap::iterator cust_order_iter;
boost::interprocess::managed_shared_memory
    risk_queue_segment(boost::interprocess::open_or_create, "risk_queue",
                       65536 * 100);
typedef int risk_queue_KeyType;
typedef int risk_queue_MappedType;
typedef std::pair<const int, int> risk_queue_ValueType;
typedef boost::interprocess::allocator<
    risk_queue_ValueType,
    boost::interprocess::managed_shared_memory::segment_manager>
    risk_queue_ShmemAllocator;
typedef boost::interprocess::map<risk_queue_KeyType, risk_queue_MappedType,
                                 std::less<risk_queue_KeyType>,
                                 risk_queue_ShmemAllocator>
    risk_queue_MySHMMap;
risk_queue_MySHMMap::iterator risk_queue_iter;
In engine.cpp:
int main() {
    risk_queue_ShmemAllocator risk_queue_alloc_inst(
        risk_queue_segment.get_segment_manager());
    cust_order_ShmemAllocator cust_order_alloc_inst(
        cust_order_segment.get_segment_manager());
    for (; 0 < 1;) {
        boost::interprocess::offset_ptr<risk_queue_MySHMMap> risk_queue_m_pmap =
            risk_queue_segment.find<risk_queue_MySHMMap>("risk_queue").first;
        boost::interprocess::offset_ptr<cust_order_MySHMMap> cust_order_m_pmap =
            cust_order_segment.find<cust_order_MySHMMap>("cust_order").first;
        risk_queue_iter = risk_queue_m_pmap->begin();
        if (risk_queue_iter != risk_queue_m_pmap->end()) {
            ordid = risk_queue_iter->second;
            cust_order_m_pmap->find(ordid)->second = o;
            o.ORDER_STATUS = '0';
            o = cust_order_m_pmap->find(ordid)->second;
            risk_queue_m_pmap->erase(ordid);
        }
    };
    return 0;
}
This is the error I get after a few seconds of the programs running perfectly:
engine: /usr/include/boost/intrusive/bstree.hpp:1331: boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::iterator boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::insert_unique_commit(boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::reference, const insert_commit_data&) [with ValueTraits = boost::intrusive::bhtraits<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::rbtree_node_traits<boost::interprocess::offset_ptr<void>, true>, (boost::intrusive::link_mode_type)0, boost::intrusive::dft_tag, 3>; VoidOrKeyOfValue = void; VoidOrKeyComp = boost::container::value_to_node_compare<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::tree_value_compare<boost::interprocess::offset_ptr<std::pair<const int, event>, long int, long unsigned int, 0>, std::less<int>, boost::container::container_detail::select1st<int>, false> >; SizeType = long unsigned int; bool ConstantTimeSize = true; boost::intrusive::algo_types AlgoType = (boost::intrusive::algo_types)5; HeaderHolder = void; boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::iterator = boost::intrusive::tree_iterator<boost::intrusive::bhtraits<boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>, boost::intrusive::rbtree_node_traits<boost::interprocess::offset_ptr<void>, true>, (boost::intrusive::link_mode_type)0, boost::intrusive::dft_tag, 3>, false>; 
boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::reference = boost::container::container_detail::tree_node<std::pair<const int, event>, boost::interprocess::offset_ptr<void>, (boost::container::tree_type_enum)0, true>&; boost::intrusive::bstree_impl<ValueTraits, VoidOrKeyOfValue, VoidOrKeyComp, SizeType, ConstantTimeSize, AlgoType, HeaderHolder>::insert_commit_data = boost::intrusive::insert_commit_data_t<boost::interprocess::offset_ptr<boost::intrusive::compact_rbtree_node<boost::interprocess::offset_ptr<void> >, long int, long unsigned int, 0> >]: Assertion `( p == this->end() || !this->comp()(*p, value) )' failed.
Aborted (core dumped)
Could you please help me understand the error, and let me know if there is a better way to do what I need?
Upvotes: 1
Views: 720
Reputation: 393769
Okay, so I looked over your code. You said it yourself:
Access to the memory space needs to be serialized.
So, why don't you?
Simplifying the header:
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <functional> // std::less
struct cust_order {
    int ID;
    char CLID[128];
    int CUST_ID;
    char ORDER_STATUS;
};
namespace bip = boost::interprocess;
namespace Shared {
    using Segment = bip::managed_shared_memory;
    using Manager = Segment::segment_manager;
    template <typename T> using Alloc = bip::allocator<T, Manager>;
    template <typename K, typename V, typename Cmp = std::less<K> >
    using Map = bip::map<K, V, Cmp, Alloc<typename bip::map<K, V>::value_type> >;
}
using OrderTable = Shared::Map<int, cust_order>;
using RiskQueue = Shared::Map<int, int>;
Simplifying main:
int main() {
    Shared::Segment cust_order_segment(bip::open_or_create, "cust_order", 65536 * 100);
    Shared::Segment risk_queue_segment(bip::open_or_create, "risk_queue", 65536 * 100);
    auto risk_queue = risk_queue_segment.find_or_construct<RiskQueue>("risk_queue")(risk_queue_segment.get_segment_manager());
    auto order_table = cust_order_segment.find_or_construct<OrderTable>("cust_order")(cust_order_segment.get_segment_manager());
    while (true) {
        while (!risk_queue->empty()) {
            auto it = risk_queue->begin();
            auto ordid = it->second;
            order_table->at(ordid).ORDER_STATUS = '0';
            risk_queue->erase(ordid);
        }
    }
}
Most notable things:
Most interestingly, since RiskQueue is ordered by its key and treated as a queue, we must assume that the key type indicates risk/priority.
The mapped type clearly is an order ID, which refers to the customer orders table. So far so good.
But then, you do
risk_queue->erase(ordid);
This is completely baffling, because it treats the order ID as if it were the key of the "risk queue". Queues don't have keys, and we just concluded that the key in the risk queue indicates priority (so it's not an order ID).
Even guessing that it should have been order_table->erase(ordid) doesn't really make sense, because then what would be the purpose of setting the order status right before deleting the order?
I can only conclude that the line was a bug and needed to be
risk_queue->erase(it);
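To see why the two erase calls differ, here is a minimal sketch that uses std::map as a stand-in for the shared-memory map (an assumption made so the snippet runs without a shared segment; the key and iterator overloads of erase behave the same way). The function names pop_by_order_id and pop_front are hypothetical:

```cpp
#include <cstddef>
#include <map>

// Stand-in for the shared-memory RiskQueue: priority (key) -> order ID (mapped).
using RiskQueue = std::map<int, int>;

// Variant from the question: reads the order ID from the front element, then
// erases by *key* using that order ID. Returns how many elements were removed,
// which is 0 unless some entry happens to have a priority equal to the order ID.
// Precondition: q is not empty.
std::size_t pop_by_order_id(RiskQueue& q) {
    auto it = q.begin();
    const int ordid = it->second;
    return q.erase(ordid);
}

// Corrected variant: erases exactly the element that was just read and
// returns its order ID. Precondition: q is not empty.
int pop_front(RiskQueue& q) {
    auto it = q.begin();
    const int ordid = it->second;
    q.erase(it);
    return ordid;
}
```

With the entries {1 -> 42, 2 -> 43}, pop_by_order_id removes nothing because no entry has key 42, so a loop of the form while (!queue.empty()) would keep seeing the same front element. pop_front removes the entry that was just read.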
Of course, none of that matters until the synchronization is taken care of.
If you have many order-processing clients, I'd suggest using a semaphore instead. Everything will become a lot safer and also less wasteful of CPU power.
If you really MUST have lockless data access, make sure the data structures are in fact lock-free, and make sure that you maintain consistency (a hard problem, since you decided to split the data over two structures).
Upvotes: 1