avernus
avernus

Reputation: 326

Race condition in tbb::concurrent_queue

I am having a data race problem on tbb::concurrent_queue. Here is the code, producer and consumer functions are running on separate threads:

tbb::concurrent_queue<std::string> valid_json_queue;

Consumer:

...
std::string valid_json;
while(1) {    
    while(valid_json_queue.try_pop(valid_json)) {
        json_manager_.parseJson(valid_json);
    }
    
    json_manager_.persist();
    std::this_thread::sleep_for(std::chrono::seconds(5));
}
...

Producer:

...read data from socket via boost async_read_some...

for(const auto& valid_json : valid_jsons) {
    valid_json_queue.push(valid_json);
}

...

I have ran the code with -fsanitizer=thread. Here is the relevant part from the output:

WARNING: ThreadSanitizer: data race (pid=21801)

...
WARNING: ThreadSanitizer: data race (pid=21801)
  Write of size 8 at 0x7f861e405c00 by main thread:
    #0 tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page::padded_page() /usr/local/include/oneapi/tbb/detail/_concurrent_queue_base.h:80 (dropcopy_mysql_adapter+0x4b77c1)
    #1 decltype (::new ((void*)(0)) tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page()) std::construct_at<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>(tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page*) /usr/local/include/c++/10.1.0/bits/stl_construct.h:97 (dropcopy_mysql_adapter+0x4b6c76)
    #2 std::enable_if<std::__and_<std::__and_<std::__not_<std::allocator_traits<tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page> >::__construct_helper<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>::type>, std::is_constructible<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page> > >::value, void>::type std::allocator_traits<tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page> >::_S_construct<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>(tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>&, tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page*) /usr/local/include/c++/10.1.0/bits/alloc_traits.h:259 (dropcopy_mysql_adapter+0x4b6cac)
    #3 decltype (_S_construct({parm#1}, {parm#2})) std::allocator_traits<tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page> >::construct<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>(tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>&, tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page*) /usr/local/include/c++/10.1.0/bits/alloc_traits.h:360 (dropcopy_mysql_adapter+0x4b5ab3)
    #4 tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::prepare_page(unsigned long, tbb::detail::d2::concurrent_queue_rep<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >&, tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page>, tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page*&) /usr/local/include/oneapi/tbb/detail/_concurrent_queue_base.h:123 (dropcopy_mysql_adapter+0x4b4466)
    #5 void tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::push<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(unsigned long, tbb::detail::d2::concurrent_queue_rep<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >&, tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::concurrent_queue_rep<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) /usr/local/include/oneapi/tbb/detail/_concurrent_queue_base.h:149 (dropcopy_mysql_adapter+0x4b3123)
    #6 void tbb::detail::d2::concurrent_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::internal_push<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) /usr/local/include/oneapi/tbb/concurrent_queue.h:177 (dropcopy_mysql_adapter+0x4b1b71)
    #7 tbb::detail::d2::concurrent_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::push(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) /usr/local/include/oneapi/tbb/concurrent_queue.h:119 (dropcopy_mysql_adapter+0x4b0447)
    #8 operator() /home/zhft_003/aydin_space/tools-dropcopy_mysql_adapter/client.cpp:81 (dropcopy_mysql_adapter+0x49e351)
    #9 operator() /usr/local/include/boost/asio/detail/bind_handler.hpp:182 (dropcopy_mysql_adapter+0x4a3318)
    #10 asio_handler_invoke<boost::asio::detail::binder2<Client::do_read()::<lambda(boost::system::error_code, size_t)>, boost::system::error_code, long unsigned int> > /usr/local/include/boost/asio/handler_invoke_hook.hpp:88 (dropcopy_mysql_adapter+0x4a2be0)
    #11 invoke<boost::asio::detail::binder2<Client::do_read()::<lambda(boost::system::error_code, size_t)>, boost::system::error_code, long unsigned int>, Client::do_read()::<lambda(boost::system::error_code, size_t)> > /usr/local/include/boost/asio/detail/handler_invoke_helpers.hpp:54 (dropcopy_mysql_adapter+0x4a26ca)
    #12 complete<boost::asio::detail::binder2<Client::do_read()::<lambda(boost::system::error_code, size_t)>, boost::system::error_code, long unsigned int> > /usr/local/include/boost/asio/detail/handler_work.hpp:501 (dropcopy_mysql_adapter+0x4a20d0)
    #13 do_complete /usr/local/include/boost/asio/detail/reactive_socket_recv_op.hpp:145 (dropcopy_mysql_adapter+0x4a1447)
    #14 boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) /usr/local/include/boost/asio/detail/scheduler_operation.hpp:40 (dropcopy_mysql_adapter+0x41040a)
    #15 boost::asio::detail::epoll_reactor::descriptor_state::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) /usr/local/include/boost/asio/detail/impl/epoll_reactor.ipp:776 (dropcopy_mysql_adapter+0x4aae53)
    #16 boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) /usr/local/include/boost/asio/detail/scheduler_operation.hpp:40 (dropcopy_mysql_adapter+0x41040a)
    #17 boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) /usr/local/include/boost/asio/detail/impl/scheduler.ipp:486 (dropcopy_mysql_adapter+0x413afc)
    #18 boost::asio::detail::scheduler::run(boost::system::error_code&) /usr/local/include/boost/asio/detail/impl/scheduler.ipp:204 (dropcopy_mysql_adapter+0x413511)
    #19 boost::asio::io_context::run() <null> (dropcopy_mysql_adapter+0x414000)
    #20 main /home/zhft_003/aydin_space/tools-dropcopy_mysql_adapter/main.cpp:59 (dropcopy_mysql_adapter+0x40ace6)


  Previous read of size 8 at 0x7f861e405c00 by thread T1:
    #0 tbb::detail::d2::micro_queue_pop_finalizer<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::padded_page> >::~micro_queue_pop_finalizer() /usr/local/include/oneapi/tbb/detail/_concurrent_queue_base.h:382 (dropcopy_mysql_adapter+0x4b4b6b)
    #1 tbb::detail::d2::micro_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::pop(void*, unsigned long, tbb::detail::d2::concurrent_queue_rep<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >&, tbb::detail::d1::cache_aligned_allocator<tbb::detail::d2::concurrent_queue_rep<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&) /usr/local/include/oneapi/tbb/detail/_concurrent_queue_base.h:188 (dropcopy_mysql_adapter+0x4b3479)
    #2 tbb::detail::d2::concurrent_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::internal_try_pop(void*) /usr/local/include/oneapi/tbb/concurrent_queue.h:193 (dropcopy_mysql_adapter+0x4b1f26)
    #3 tbb::detail::d2::concurrent_queue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, tbb::detail::d1::cache_aligned_allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::try_pop(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /usr/local/include/oneapi/tbb/concurrent_queue.h:135 (dropcopy_mysql_adapter+0x4b0629)
    #4 Client::jsonParseLoop() /home/zhft_003/aydin_space/tools-dropcopy_mysql_adapter/client.cpp:124 (dropcopy_mysql_adapter+0x49e875)
    #5 void std::__invoke_impl<void, void (Client::*)(), Client*>(std::__invoke_memfun_deref, void (Client::*&&)(), Client*&&) /usr/local/include/c++/10.1.0/bits/invoke.h:73 (dropcopy_mysql_adapter+0x4b941e)
    #6 std::__invoke_result<void (Client::*)(), Client*>::type std::__invoke<void (Client::*)(), Client*>(void (Client::*&&)(), Client*&&) /usr/local/include/c++/10.1.0/bits/invoke.h:95 (dropcopy_mysql_adapter+0x4b92e2)
    #7 void std::thread::_Invoker<std::tuple<void (Client::*)(), Client*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/local/include/c++/10.1.0/thread:264 (dropcopy_mysql_adapter+0x4b91f2)
    #8 std::thread::_Invoker<std::tuple<void (Client::*)(), Client*> >::operator()() /usr/local/include/c++/10.1.0/thread:271 (dropcopy_mysql_adapter+0x4b9182)
    #9 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (Client::*)(), Client*> > >::_M_run() /usr/local/include/c++/10.1.0/thread:215 (dropcopy_mysql_adapter+0x4b910c)
    #10 execute_native_thread_routine ../../../.././libstdc++-v3/src/c++11/thread.cc:80 (libstdc++.so.6+0xcdd5f)


  Thread T1 (tid=21803, running) created by main thread at:
    #0 pthread_create ../../.././libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x58b22)
    #1 __gthread_create /home/zhft_003/downloads/gcc/x86_64-pc-linux-gnu/libstdc++-v3/include/x86_64-pc-linux-gnu/bits/gthr-default.h:663 (libstdc++.so.6+0xcdfd4)
    #2 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) ../../../.././libstdc++-v3/src/c++11/thread.cc:135 (libstdc++.so.6+0xcdfd4)
    #3 std::thread std::jthread::_S_create<void (Client::*)(), Client*>(std::stop_source&, void (Client::*&&)(), Client*&&) /usr/local/include/c++/10.1.0/thread:557 (dropcopy_mysql_adapter+0x4b1852)
    #4 std::jthread::jthread<void (Client::*)(), Client*, void>(void (Client::*&&)(), Client*&&) /usr/local/include/c++/10.1.0/thread:455 (dropcopy_mysql_adapter+0x4b0031)
    #5 Client::Client(boost::asio::io_context&, boost::asio::ip::basic_resolver_results<boost::asio::ip::tcp> const&) /home/zhft_003/aydin_space/tools-dropcopy_mysql_adapter/client.cpp:18 (dropcopy_mysql_adapter+0x49d8f8)
    #6 main /home/zhft_003/aydin_space/tools-dropcopy_mysql_adapter/main.cpp:57 (dropcopy_mysql_adapter+0x40acd7)

client.cpp:124 is the line with while(valid_json_queue.try_pop(valid_json)), whereas client.cpp:81 is the line with valid_json_queue.push(valid_json);.

I thought I was safe using tbb::concurrent_queue in a multithreaded context but apparently I am doing something wrong here.

g++ version is 10.1.0

Upvotes: 0

Views: 293

Answers (1)

Noorjahan - Intel
Noorjahan - Intel

Reputation: 99

We observe data races when two or more threads in a single process access the same memory location concurrently.

Concurrent_queue doesn't create any parallelism at all: it just allows multiple threads to safely access the same queue. It ensures there are no race conditions.

We can use a mutex to synchronize the threads and to get rid of data race problems.

Refer to the below links for more details:

https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html https://spec.oneapi.io/versions/latest/elements/oneTBB/source/mutual_exclusion.html

Thanks & Regards, Noorjahan.

Upvotes: 0

Related Questions