Ersin 101

Reputation: 83

Concurrent Array Checking

I am an expert in neither C++ nor concurrent programming. However, I am implementing a simple reasoning algorithm that requires checking many independent models. The number of possible models is huge, so I want to check them in parallel.

To keep things as simple as possible, I have reduced my original problem to a very simple one: how to determine whether an array contains a non-zero value. A simple, sequential solution would look like this:

bool containsNonZero (int* arr, int len) {
    for (int i = 0; i < len; ++i)
        if (arr[i]) return true;
    return false;
}

(Note: In reality, len does not fit into an int, but in my original problem there is no array, just many combinations that I generate but do not store.)

However, I need a parallel (and efficient) implementation. There are t = std::thread::hardware_concurrency() threads to search the array (note that t << len; if len % t != 0, the last thread can simply handle the remaining values). The first thread searches the indices from 0 to len/t, the second thread searches the indices from len/t to (2*len)/t, and so on. The last thread searches the indices from ((t-1)*len)/t to len. If a thread finds a non-zero value, all threads stop and true is returned. Otherwise, each thread waits for the others to finish, and false is returned only if all of the threads agree on it.
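To make the partitioning concrete, the index range for each thread could be computed roughly as in the sketch below. The helper name chunkBounds is hypothetical, and the sketch uses fixed-size chunks of len / t with the leftover items given to the last thread, rather than the (k*len)/t formula above, to avoid overflow in k*len for very large len.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical helper: half-open index range [first, last) searched by
// thread k (0-based) out of t threads over len items.
std::pair<std::size_t, std::size_t> chunkBounds(std::size_t len, std::size_t t, std::size_t k)
{
    assert(t > 0 && k < t);
    const std::size_t chunk = len / t;
    const std::size_t first = k * chunk;
    // The last thread also takes the len % t leftover items.
    const std::size_t last = (k + 1 == t) ? len : first + chunk;
    return {first, last};
}
```

For example, with len = 10 and t = 3 the three threads would get [0, 3), [3, 6) and [6, 10).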

It seems very easy, but I couldn't find any answer on the web. Any C++ version is welcome, but I do not want to depend on any third-party library.

Upvotes: 3

Views: 725

Answers (2)

R2RT

Reputation: 2146

I've tried to extend Davide Spataro's solution to address the problem of synchronizing on atomic<bool> by using atomic_flag instead, which, "unlike all specializations of std::atomic, is guaranteed to be lock-free": http://en.cppreference.com/w/cpp/atomic/atomic_flag

EDIT: Not directly relevant to the original question, but I benchmarked both methods and, to my surprise, atomic<bool> is around 100 times faster than atomic_flag.

Benchmark result:

num_threads:2
400000001 iterations flag
401386195 iterations flag
atomic_flag : it took 24.1202 seconds. Result: 1
400000001 iterations bool
375842699 iterations bool
atomic<bool>: it took 0.334785 seconds. Result: 1
num_threads:3
229922451 iterations flag
229712046 iterations flag
233333335 iterations flag
atomic_flag : it took 21.5974 seconds. Result: 1
219564626 iterations bool
233333335 iterations bool
196877803 iterations bool
atomic<bool>: it took 0.200942 seconds. Result: 1
num_threads:4
151745683 iterations flag
150000001 iterations flag
148849108 iterations flag
148933269 iterations flag
atomic_flag : it took 18.6651 seconds. Result: 1
150000001 iterations bool
112825220 iterations bool
151838008 iterations bool
112857688 iterations bool
atomic<bool>: it took 0.167048 seconds. Result: 1

Benchmark code:

#include <thread>
#include <atomic>
#include <chrono>
#include <vector>
#include <iostream>
#include <algorithm>



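// Scans [begin, end) and stops early once another thread has cleared the flag.
// The flag starts out set; "cleared" means a non-zero value has been found.
template<typename Iterator>
static void any_of_flag(Iterator begin, Iterator end, std::atomic_flag & result)
{
    int counter = 0;
    for (auto it = begin; it != end; ++it)
    {
        counter++;
        // test_and_set() returns the previous state: false means already cleared.
        if (!result.test_and_set() || (*it) != 0)
        {
            result.clear();
            std::cout << counter << " iterations flag\n";
            return;
        }
    }
}

// Same scan, but the shared state is an atomic<bool> that is true once found.
template<typename Iterator>
static void any_of_atomic(Iterator begin, Iterator end, std::atomic<bool> & result)
{
    int counter = 0;
    for (auto it = begin; it != end; ++it)
    {
        counter++;
        if (result || (*it) != 0)
        {
            result = true;
            std::cout << counter << " iterations bool\n";
            return;
        }
    }
}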

void test_atomic_flag(std::vector<int>& input, int num_threads)
{

    using namespace std::chrono;

    high_resolution_clock::time_point t1 = high_resolution_clock::now();


    size_t chunk_size = input.size() / num_threads;
    std::atomic_flag result = ATOMIC_FLAG_INIT;
    result.test_and_set();

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    {
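        // Copy the iterators into the lambda: capturing loop-local variables
        // by reference would leave the thread with dangling references.
        auto begin = input.begin() + i * chunk_size;
        auto end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([begin, end, &result] { any_of_flag(begin, end, result); });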

    }

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = !result.test_and_set();


    high_resolution_clock::time_point t2 = high_resolution_clock::now();

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1);

    std::cout << "atomic_flag : it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl;
}



void test_atomic_bool(std::vector<int>& input, int num_threads)
{

    using namespace std::chrono;

    high_resolution_clock::time_point t1 = high_resolution_clock::now();


    size_t chunk_size = input.size() / num_threads;
    std::atomic<bool> result(false);

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    {
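        // Copy the iterators into the lambda: capturing loop-local variables
        // by reference would leave the thread with dangling references.
        auto begin = input.begin() + i * chunk_size;
        auto end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([begin, end, &result] { any_of_atomic(begin, end, result); });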

    }

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = result;


    high_resolution_clock::time_point t2 = high_resolution_clock::now();

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1);

    std::cout << "atomic<bool>: it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl;
}

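int main()
{
    const size_t size = 1000000000;   // 1e9 elements
    std::vector<int> input(size, 0);
    input[size - size / 10] = 1;      // single non-zero value at index 9e8
    for (int num_threads : {2, 3, 4})
    {
        std::cout << "num_threads:" << num_threads << std::endl;
        test_atomic_flag(input, num_threads);
        test_atomic_bool(input, num_threads);
    }

    // Wait for input so the console window stays open.
    int q;
    std::cin >> q;
    return 0;
}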

OLD POST: I had some problems with the constness of the iterators and with emplacing the threads, but the core change, the use of atomic_flag, seems to work. It will not stop all threads instantly; in the worst case only one thread stops per iteration (since only one thread per iteration will learn, from the cleared flag, that it should stop).

#include <thread>
#include <atomic>
#include <vector>
#include <iostream>
#include <algorithm>

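// The flag starts out set; a cleared flag means a non-zero value has been found.
template<typename Iterator>
static void any_of(Iterator begin, Iterator end, std::atomic_flag & result)
{
    for (auto it = begin; it != end; ++it)
    {
        // test_and_set() returns the previous state: false means already cleared.
        if (!result.test_and_set() || (*it) != 0)
        {
            result.clear();
            return;
        }
    }
}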

int main()
{
    int num_threads = 3;
    std::vector<int> input = { 0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0, 1,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0};
    size_t chunk_size = input.size() / num_threads;
    std::atomic_flag result = ATOMIC_FLAG_INIT;
    result.test_and_set();

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    {
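        // Copy the iterators into the lambda: capturing loop-local variables
        // by reference would leave the thread with dangling references.
        auto begin = input.begin() + i * chunk_size;
        auto end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([begin, end, &result] { any_of(begin, end, result); });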

    }

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = !result.test_and_set();
    return 0;
}

Upvotes: 3

Davide Spataro

Reputation: 7482

What about something like the following?

Each worker checks whether an element in its range is non-zero, or whether an atomic flag is set (meaning some other thread has already found one).

The following is the function executed by each thread (each of them with a different range assigned):

template<typename Iterator>
static void any_of(Iterator begin, Iterator end, std::atomic<bool> & result)
{
    for (auto it = begin; it != end; ++it)
    {
        // Stop if another thread already found a match or this element is non-zero.
        if (result || (*it) != 0)
        {
            result = true;
            return;
        }
    }
}

You can call it as follows

size_t chunk_size = input.size() / num_threads;
std::atomic<bool> result(false);
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
{
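    const auto begin = input.begin() + i * chunk_size;
    const auto end = input.begin() + std::min((i + 1) * chunk_size, input.size());
    // A lambda avoids having to name the template instantiation for std::thread
    // and lets result be captured by reference.
    threads.emplace_back([begin, end, &result] { any_of(begin, end, result); });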
}

for (auto & thread : threads)
    thread.join();

After this point you can safely read result to retrieve the outcome.
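Putting the two pieces together, a self-contained version might look like the sketch below. The names scan_range and contains_non_zero are hypothetical (a distinct worker name also avoids any confusion with std::any_of), and, as an addition to the snippet above, the last thread is given whatever elements remain when the size is not a multiple of num_threads.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Worker: scans [begin, end) and stops early once any thread has set `found`.
template <typename Iterator>
void scan_range(Iterator begin, Iterator end, std::atomic<bool>& found)
{
    for (auto it = begin; it != end; ++it)
    {
        if (found.load() || *it != 0)
        {
            found = true;
            return;
        }
    }
}

// Hypothetical wrapper: splits `input` into num_threads chunks and scans them in parallel.
bool contains_non_zero(const std::vector<int>& input, std::size_t num_threads)
{
    assert(num_threads > 0);
    const std::size_t chunk_size = input.size() / num_threads;
    std::atomic<bool> found(false);
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (std::size_t i = 0; i < num_threads; ++i)
    {
        auto begin = input.begin() + static_cast<std::ptrdiff_t>(i * chunk_size);
        // The last thread also covers the leftover elements.
        auto end = (i + 1 == num_threads)
                       ? input.end()
                       : begin + static_cast<std::ptrdiff_t>(chunk_size);
        // Iterators are copied into the lambda; only the flag is shared by reference.
        threads.emplace_back([begin, end, &found] { scan_range(begin, end, found); });
    }
    for (auto& thread : threads)
        thread.join();
    return found.load();
}
```

Calling contains_non_zero(input, std::thread::hardware_concurrency()) would then play the role of the sequential containsNonZero from the question, provided hardware_concurrency() does not return 0 on the target platform.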

Note that this approach is easily extendable by passing a unary predicate function to the worker to make it more general.

template<typename Iterator, typename Predicate>
static void any_of(Iterator begin, Iterator end, Predicate pred, std::atomic<bool> & result)
{
    for (auto it = begin; it != end; ++it)
    {
        // Stop if another thread already found a match or pred accepts this element.
        if (result || pred(*it))
        {
            result = true;
            return;
        }
    }
}
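Since the question mentions that the real problem has no array, only combinations that are generated on the fly, the same idea can be applied to a range of indices instead of iterators. The following is only a sketch under that assumption: parallel_any_index is a hypothetical name, the predicate receives the index of the candidate to generate and check, and it must be safe to call from several threads at once.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical helper: returns true if pred(i) holds for some i in [0, count).
template <typename Predicate>
bool parallel_any_index(std::uint64_t count, unsigned num_threads, Predicate pred)
{
    assert(num_threads > 0);
    const std::uint64_t chunk = count / num_threads;
    std::atomic<bool> found(false);
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (unsigned k = 0; k < num_threads; ++k)
    {
        const std::uint64_t first = k * chunk;
        // The last thread also takes the count % num_threads leftover indices.
        const std::uint64_t last = (k + 1 == num_threads) ? count : first + chunk;
        threads.emplace_back([first, last, pred, &found] {
            for (std::uint64_t i = first; i < last; ++i)
            {
                // Stop as soon as any thread has reported a match.
                if (found.load() || pred(i))
                {
                    found = true;
                    return;
                }
            }
        });
    }
    for (auto& thread : threads)
        thread.join();
    return found.load();
}
```

A call such as parallel_any_index(count, t, [](std::uint64_t i) { return checkModel(i); }) would then search all candidates, where checkModel stands in for whatever model check the real problem needs.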

Upvotes: 2
