Reputation: 2371
I have an array-backed object that implements the following interface:
public interface PairSupplier<Q, E> {
public int size();
public Pair<Q, E> get(int index);
}
I would like to create a specific iterator over it:
public boolean hasNext(){
return true;
}
public Pair<Q, E> next(){
//some magic
}
In next() I would like to return some element from the PairSupplier.
This element should be unique to the calling thread: no other thread should hold it at the same time.
Since PairSupplier has a finite size, this is not always possible, but I would like to get as close to it as I can.
The order of elements doesn't matter, and a thread can take the same element again at a different time.
Example: 2 threads, 5 elements {1,2,3,4,5}
Thread 1 | Thread 2
1        | 2
3        | 4
5        | 1
3        | 2
4        | 5
My solution:
I create an AtomicInteger index, which I increment on every call to next().
PairSupplier pairs;
AtomicInteger index;
public boolean hasNext(){
return true;
}
public Pair<Q, E> next(){
int position = index.incrementAndGet() % pairs.size();
if (position < 0) {
// index has overflowed: map the negative remainder back into [0, size)
position *= -1;
position = pairs.size() - position;
}
return pairs.get(position);
}
pairs and index are shared among all threads.
I find this solution does not scale (because all threads contend on the same increment). Does anyone have a better idea?
This iterator will be used by 50-1000 threads.
Upvotes: 12
Views: 1943
Reputation: 65793
Your question details are ambiguous: your example suggests that two threads can be handed the same Pair, but you say otherwise in the description.
Since that is the harder case to achieve, I will offer an Iterable<Pair<Q,E>> that will deliver Pairs one per thread until the supplier cycles, and then it will repeat.
public interface Supplier<T> {
public int size();
public T get(int index);
}
public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}
public class IterableSupplier<T> implements Iterable<T> {
// The common supplier to use across all threads.
final Supplier<T> supplier;
// The atomic counter.
final AtomicInteger i = new AtomicInteger();
public IterableSupplier(Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public Iterator<T> iterator() {
/**
* You may create a NEW iterator for each thread. They all share the supplier
* and the counter, so each Pair is distributed to a different thread.
*
* You may also share the same iterator across multiple threads.
*
* No two threads will get the same pair twice unless the sequence cycles.
*/
return new ThreadSafeIterator();
}
private class ThreadSafeIterator implements Iterator<T> {
@Override
public boolean hasNext() {
/**
* Always true.
*/
return true;
}
private int pickNext() {
// Just grab one atomically.
int pick = i.incrementAndGet();
// Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
int actual = pick % supplier.size();
if (pick != actual) {
// So long as someone has a success before we overflow int we're good.
i.compareAndSet(pick, actual);
}
return actual;
}
@Override
public T next() {
return supplier.get(pickNext());
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported.");
}
}
}
NB: I have adjusted the code a little to accommodate both scenarios. You can take an Iterator per thread or share a single Iterator across threads.
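One caveat with pickNext() above: if no compareAndSet succeeds before the int counter overflows, pick % supplier.size() can become negative. The following is a minimal sketch of an alternative index computation, assuming Java 8+ for Math.floorMod. The class name WrappingCounter is hypothetical and is not part of the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (an assumption for illustration, not the answer's code).
final class WrappingCounter {
    private final AtomicInteger counter = new AtomicInteger();
    private final int size;

    WrappingCounter(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Returns the next index in [0, size). Math.floorMod keeps the result
    // non-negative even after the underlying int counter has overflowed.
    int next() {
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

This drops the reset step entirely, so there is only one atomic operation per call. The trade-off is that the sequence skips or repeats a few indexes at the moment of overflow unless size is a power of two, and all threads still contend on the single counter.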
Upvotes: 4
Reputation: 12257
I prefer a lock-and-release process.
When a thread asks for a Pair, that Pair is removed from the supplier. Before the thread asks for a new Pair, the 'old' Pair is added to the supplier again.
You can take from the front and put back at the end.
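The take-and-return idea could be sketched roughly as follows, assuming the items are first copied into a ConcurrentLinkedQueue. The class BorrowPool and its method names are hypothetical, chosen only for this illustration.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical pool: an item belongs to exactly one thread between
// borrow() and giveBack(), because it is not in the queue during that time.
final class BorrowPool<T> {
    private final ConcurrentLinkedQueue<T> free;

    BorrowPool(List<T> items) {
        this.free = new ConcurrentLinkedQueue<>(items);
    }

    // Takes an item from the front, or returns null if every item is borrowed.
    T borrow() {
        return free.poll();
    }

    // Puts a previously borrowed item back at the end of the queue.
    void giveBack(T item) {
        free.offer(item);
    }
}
```

A caller would borrow, use the item, and give it back in a finally block. Note that the head and tail of the queue are still shared between all threads, so this guarantees exclusivity but does not by itself remove contention.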
Upvotes: 0
Reputation: 2143
This is a standard Java semaphore usage problem. The Semaphore Javadoc gives an example that is almost the same as your problem: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html
If you need more help, let me know.
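As a rough sketch of how a Semaphore could guard a fixed set of items (this is not the Javadoc's own example; SemaphorePool and its methods are hypothetical names), one permit can stand for one free item:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical pool: the semaphore counts free items, the queue holds them.
final class SemaphorePool<T> {
    private final Semaphore permits;
    private final ConcurrentLinkedQueue<T> free;

    SemaphorePool(List<T> items) {
        this.permits = new Semaphore(items.size());
        this.free = new ConcurrentLinkedQueue<>(items);
    }

    // Blocks until some item is free, then hands it to the caller exclusively.
    T take() {
        permits.acquireUninterruptibly();
        // A held permit implies at least one item is in the queue,
        // because release() offers the item before releasing the permit.
        return free.poll();
    }

    // Returns the item first, then releases the permit so a waiter can proceed.
    void release(T item) {
        free.offer(item);
        permits.release();
    }

    // Number of items currently free.
    int available() {
        return permits.availablePermits();
    }
}
```

Unlike a plain queue that returns null when empty, a thread calling take() here waits when all items are in use, which may or may not be what the question wants given that hasNext() always returns true.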
Upvotes: 0
Reputation: 97
I'm having some trouble understanding what problem you are trying to solve.
Does each thread process the whole collection?
Is the concern that no two threads can work on the same Pair at the same time? But each thread needs to process each Pair in the collection?
Or do you want the collection processed once by using all of the threads?
Upvotes: 1
Reputation: 1129
The easiest approach I can see is to create a HashSet or Map and give every thread a unique hash. After that, just do a simple get by that hash code.
Upvotes: 0
Reputation: 1781
There is one key thing that is unclear in your example. What exactly is the meaning of this?
The order of elements doesn't matter, thread can take same element at a different time.
"different time" means what? Within N milliseconds of each other? Does it mean that absolutely two threads will never be touching the same Pair at the same time? I will assume that.
If you want to decrease the probability that threads will block on each other contending for the same Pair, and there is a backing array of Pairs, try this:
- Divide the array into numPairs / threadCount sub-arrays (you don't have to actually create sub-arrays, just start at different offsets, but it's easier to think about as sub-arrays)
- Lock each Pair with synchronized(pair) (synchronize on the instance, not the type!). There may occasionally be blocking, but you're never blocking all threads on a single thing, as with the AtomicInteger. Threads can only block each other because they are really trying to touch the same object.
Note this is not guaranteed never to block. For that, all threads would have to run at exactly the same speed, processing every Pair object would have to take exactly the same amount of time, and the OS's thread scheduler would have to never steal time from one thread but not another. You cannot assume any of those things. What this gives you is a higher probability of better concurrency, by dividing the areas to work in and making the smallest unit of shared state be the lock.
But this is the usual pattern for getting more concurrency on a data structure - partition the data between threads so that they rarely are touching the same lock at the same time.
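A minimal sketch of the offset idea, assuming each thread is given an index between 0 and threadCount - 1 and gets one starting region. OffsetCursor is a hypothetical name, and each thread is assumed to own its own instance.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-thread cursor: one instance per thread, never shared.
final class OffsetCursor<T> {
    private final List<T> items; // shared backing list, only read here
    private int position;        // private to the owning thread, so no atomics

    OffsetCursor(List<T> items, int threadIndex, int threadCount) {
        this.items = items;
        // Start each thread at the beginning of its own region.
        this.position = (int) ((long) threadIndex * items.size() / threadCount);
    }

    // Runs the action on the next item while holding that item's own monitor,
    // then advances, wrapping around at the end of the list.
    void processNext(Consumer<T> action) {
        T item = items.get(position);
        position = (position + 1) % items.size();
        synchronized (item) {
            action.accept(item);
        }
    }
}
```

Two threads block each other here only when they reach the same item at the same moment. The lock is held for the duration of the action, so the design assumes the work on each item is done inside that callback.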
Upvotes: 0
Reputation: 24473
You have a piece of information ("has anyone taken this Pair already?") that must be shared between all threads. So for the general case, you're stuck. However, if you have an idea about the size of your array and the number of threads, you could use buckets to make it less painful.
Let's suppose we know that there will be 1,000,000 array elements and 1,000 threads. Assign each thread a range (thread #1 gets elements 0-999, etc). Now instead of 1,000 threads contending for one AtomicInteger, you can have no contention at all!
That works if you can be sure that all your threads will run at about the same pace. If you need to handle the case where sometimes thread #1 is busy doing other things while thread #2 is idle, you can modify your bucket pattern slightly: each bucket has an AtomicInteger. Now threads will generally only contend with themselves, but if their bucket is empty, they can move on to the next bucket.
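The bucket variant with one AtomicInteger per bucket might look something like the sketch below. It assumes each element is handed out once per pass, and the names BucketedIndexes and claim are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical index allocator: each bucket covers a contiguous range of
// the array and has its own counter, so threads mostly touch different counters.
final class BucketedIndexes {
    private final int size;
    private final int bucketCount;
    private final AtomicInteger[] next; // next unclaimed index in each bucket

    BucketedIndexes(int size, int bucketCount) {
        this.size = size;
        this.bucketCount = bucketCount;
        this.next = new AtomicInteger[bucketCount];
        for (int b = 0; b < bucketCount; b++) {
            next[b] = new AtomicInteger(start(b));
        }
    }

    // First index of the given bucket; start(bucketCount) equals size.
    private int start(int bucket) {
        return (int) ((long) bucket * size / bucketCount);
    }

    // Claims an index, preferring the caller's home bucket and moving on to
    // the following buckets when it is empty. Returns -1 when all are empty.
    int claim(int homeBucket) {
        for (int k = 0; k < bucketCount; k++) {
            int b = (homeBucket + k) % bucketCount;
            int end = start(b + 1);
            // Plain read first, so an exhausted bucket is not incremented forever.
            if (next[b].get() < end) {
                int index = next[b].getAndIncrement();
                if (index < end) {
                    return index;
                }
            }
        }
        return -1;
    }
}
```

With one bucket per thread, a thread contends with others only after its own range is used up and it starts taking from a neighbor's bucket.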
Upvotes: 4