Contango

Reputation: 80302

tbb::concurrent_unordered_map - ID for each unique element in a vector?

I'm wondering whether this code I wrote is thread safe. I want to assign a unique ID to each unique element in a vector:

It uses Intel TBB (Threading Building Blocks) for the concurrent_unordered_map.

C++ code:

#include <iostream>
#include <string>
#include <vector>
// Intel's Threading Building Blocks (TBB).
// Installing:
// - Intel compiler: tick the "TBB" box in the project config.
// - MSVC compiler: install vcpkg, then run: vcpkg install tbb:x64-windows.
#include "tbb/concurrent_unordered_map.h"

#include <atomic>

using namespace std;

std::atomic<int> id { 1 };

inline int GetUniqueId()
{
    return id++;
}

int main()
{
    // Imagine this has 10 million elements.
    vector<string> names {"tom", "bob", "harry", "harry", "harry", "harry", "peter"}; 

    // We want each name to have a unique ID.
    tbb::concurrent_unordered_map<string,int> nameToId;

    #pragma omp parallel for
    for(int i=0;i<names.size();i++) {
        if (nameToId.count(names[i]) == 0) {
            nameToId[names[i]] = GetUniqueId();
        }
    }

    for(auto& name : names) {
        cout << name << ": ID=" << nameToId[name] << "\n";
    }
}

Output:

tom: ID=1
bob: ID=2
harry: ID=3
harry: ID=3
harry: ID=3
harry: ID=3
peter: ID=4

Upvotes: 3

Views: 2295

Answers (3)

Alexey Kukanov

Reputation: 12784

Insertion methods of tbb::concurrent_unordered_map check whether the given key already exists and skip the insertion if it does. So your parallel loop could look like this (C++11 assumed):

#pragma omp parallel for
for(int i=0;i<names.size();i++) {
    nameToId.emplace( names[i], i ); // loop index used as the unique ID
}

This code has no races. Still, there is no guarantee that the IDs are contiguous: even if an atomic counter is used to get IDs, conflicts between threads inserting the same key may result in some IDs being thrown away. Also, without the check, the counter would increment on every iteration, essentially becoming a costly equivalent of the loop index.

Actually, operator[] also checks for existence of the given key; but it does not prevent a race between assignments if two threads work with the same key, whereas the insertion methods synchronize internally. The synchronization is lock-free, but comes at the expense of constructing the item to insert up front and throwing it away if the insertion fails. Be aware of that, especially if move semantics are used for insertions.

Also, the return value of most insertion methods lets you distinguish whether the insertion actually happened:

auto res = nameToId.emplace( names[i], i ); // std::pair of map iterator and bool
if( res.second ) {
    // insertion successful
} else {
    // the key already exists
}

So if contiguous IDs are not required, use insertion methods; if they are, follow the recommendations of @Anton and @Peter Cordes.

Upvotes: 3

Peter Cordes

Reputation: 364811

Since you don't mind holes in your unique-ID sequence, you can just use the vector index as the unique ID (nameToId[names[i]] = i;) instead of bottlenecking all your threads on an atomic increment of a shared counter.

No two strings share the same vector position. Or if it has to be unique wrt. something else, too, get a whole range of contiguous unique IDs and parcel them out to worker threads to increment starting from the base of their range. (e.g. nameToId[names[i]] = start_id + i; then when you're done, update start_id += names.size() or find the highest actually-used ID.)

Unless your strings are mostly duplicates, your current GetUniqueId() will perform very badly compared to a private counter. You definitely want to avoid an atomic RMW operation inside the loop: on x86 it is necessarily a full barrier (a lock xadd), and C++ mo_relaxed can't make it any cheaper in asm. That's especially true for a location that multiple threads are all contending to modify, but it's fairly slow even without contention, and it forces the CPU to wait for other stores to drain from the store buffer before it can even look at the next string and map entry. That destroys memory parallelism.


As @Anton points out, there is a race between check and set but that's ok because you don't mind holes in the sequence. When you're done, the nameToId map entry for a given string will hold an integer value of one of the vector indices that held that string. Often the first one any thread looked at, but sometimes a later one. It's definitely not the same index as any other distinct map key.


It's also worth trying to remove the check, so that access to the hash table is write-only, especially if duplicates are not very common. (Except internally it might still have to read to check for hash collisions.) Then the unique ID will be from the last appearance of the string that any thread got to.

Removing contention from the unique-ID allocation by skipping it for duplicates is not a factor here; we can already make that allocation free, since we don't need a contiguous range.


If you did need a contiguous range of unique IDs

You might consider running with non-contiguous IDs first (as above) to deduplicate, then looping over the table entries and replacing the values with ascending counters. There is one hash-map entry per unique string, so looping over the values and replacing them will produce contiguous IDs.

You could even parallelize this work, having one thread per range of the table. Maybe count how many used entries are in each range, then communicate between threads so each thread knows its start value to loop over the range again.

Hopefully those hash buckets will still be hot in per-core private cache so this goes fast, benefiting from the large aggregate bandwidth of private L2 caches, or at least not having all cores hitting DRAM. If your hash table is too big, do the read / communicate / assign-private-i++ steps incrementally, in chunks small enough that each worker touches somewhat less than half the L2 cache size (256k, or 1M on SKX) of table entries. (This assumes 1 thread per logical core with hyperthreading, so two threads compete for the L2 cache of a physical core. Leave some slack because other things consume L2 space as well. Getting more L2 hits helps a lot when you're touching each cache line twice.) So for example, if you have 10 physical SKX cores, assign contiguous unique IDs to the first 8MiB of hash table using 20 worker threads, with each worker having a working set of 400kiB of hash-table entries.

That might depend on more low-level access to the buckets than tbb::concurrent_unordered_map provides, IDK. Especially for this 2nd step where you're manually ensuring that only 1 thread is looking at any given entry so you don't need atomicity or anything.

(It's a similar parallelization strategy to prefix-sum where you do local prefix sum on sub-ranges, then have each thread go back and add the offset for that start point. Except here the first pass where each thread counts used table entries in a range of buckets is read-only.)

Upvotes: 2

Anton

Reputation: 6557

There is a data race between the check and the assignment. If there is a barrier or other synchronization between the assignment of the IDs and their first use (as in the OP's example), the race can be ignored, assuming holes are allowed in the ID sequence.

In case you want to use the "else" branch in the same parallel section (no barrier between ID assignment and usage), or holes are not allowed, a lock is needed anyway. Even if the assignment protocol could be implemented with an atomic CAS and an atomic increment, it would still need a busy loop, because a transaction over two atomic operations is one way or another equivalent to a spin lock (except when using TSX). Thus, please don't hesitate to use concurrent_hash_map with its locking mechanism. One trick to escape contention on the lock is a double-check pattern together with the hidden (but usable) internal_fast_find function:

// Can really use tbb::concurrent_hash_map directly if it is not a data analytics app with this function on the hot path
template<typename K>
struct fast_map : public tbb::concurrent_hash_map<K, int> {
  using base_t=tbb::concurrent_hash_map<K, int>;
  using base_t::concurrent_hash_map;

#if !WORKAROUND_BUG // TBB_INTERFACE_VERSION < 11007 && TBB_INTERFACE_VERSION > ???
  typename base_t::const_pointer fast_find(const typename base_t::key_type& k) {
    return this->internal_fast_find(k);
  }
#else
  // See https://github.com/anton-malakhov/nyctaxi/blob/master/group_by.h#L78-L97
#endif
};

template<typename K>
int allocateId(const K &k, fast_map<K> &m) {
    auto *x = m.fast_find(k);
    if(x && x->second >= 0)
        return x->second;
    else {
        typename fast_map<K>::accessor a;
    bool uniq = m.insert(a, std::make_pair((K)k, int(-1)));
        if (!uniq) {
            return a->second;
        } else {
            return a->second = GetUniqueId();
        }
    }
}

(Full code & run here: http://coliru.stacked-crooked.com/a/4c29a2c95883c945)

Upvotes: 3
