Reputation: 189626
I have an immutable Iterable<X> with a large number of elements. (It happens to be a List<>, but never mind that.)
What I would like to do is start a few parallel / asynchronous tasks to iterate over the Iterable<> with the same iterator, and I'm wondering what interface I should use.
Here's a sample implementation with the to-be-determined interface QuasiIteratorInterface:
public void process(Iterable<X> iterable)
{
    QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
    for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
    {
        SomeWorkerClass worker = new SomeWorkerClass(qit);
        worker.start();
    }
}
class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
    final private Iterator<T> iterator;
    final private Object lock = new Object();
    private ParallelIteratorWrapper(Iterator<T> iterator) {
        this.iterator = iterator;
    }
    static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
    {
        // explicit type argument avoids a raw-type (unchecked) warning
        return new ParallelIteratorWrapper<T>(iterable.iterator());
    }
    private T getNextItem()
    {
        // hasNext() and next() are checked and called under one lock
        synchronized(lock)
        {
            if (this.iterator.hasNext())
                return this.iterator.next();
            else
                return null;
        }
    }
    /* QuasiIteratorInterface methods here */
}
Here's my problem:
It doesn't make sense to use Iterator directly, since hasNext() and next() have a synchronization problem: hasNext() is useless if someone else calls next() before you do.
I'd love to use Queue, but the only method I need is poll().
I'd love to use ConcurrentLinkedQueue to hold my large number of elements... except I may have to iterate through the elements more than once, so I can't use that.
Any suggestions?
Upvotes: 4
Views: 1578
Reputation: 4706
Create your own Producer interface with the poll() method or equivalent (Guava's Supplier, for instance). There are many implementation options, but if you have an immutable random-access list then you can simply maintain a thread-safe monotonic counter (an AtomicInteger, for instance) and call list.get(int), e.g.:
class ListSupplier<T> implements Supplier<T> {
    private final AtomicInteger next = new AtomicInteger();
    private final List<T> elements; // ctor injected
    …
    public T get() {
        // real impl more complicated due to bounds checks
        // and what to do when exhausted
        return elements.get(next.getAndIncrement());
    }
}
That is thread-safe, but you'd probably want to either return an Option style thing or null when exhausted.
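One way the bounds check and the "exhausted" case might be handled is sketched below. It is only an illustration under some assumptions: it uses java.util.Optional and java.util.function.Supplier (Java 8+) rather than Guava's Supplier, the class name BoundedListSupplier is made up for this example, and the list is assumed to contain no null elements.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical class: hands out each element of an immutable list at most once.
final class BoundedListSupplier<T> implements Supplier<Optional<T>> {
    private final AtomicInteger next = new AtomicInteger();
    private final List<T> elements; // assumed immutable and free of nulls

    BoundedListSupplier(List<T> elements) {
        this.elements = elements;
    }

    @Override
    public Optional<T> get() {
        // Atomically claim an index; stop advancing at the end so the
        // counter cannot overflow after many calls on an exhausted supplier.
        int index = next.getAndUpdate(i -> i < elements.size() ? i + 1 : i);
        return index < elements.size()
                ? Optional.of(elements.get(index))
                : Optional.empty();
    }
}
```

Since the list itself is never modified, a second pass over the elements only needs a fresh BoundedListSupplier wrapped around the same list.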
Upvotes: 1
Reputation: 80330
Have one dispatcher thread that iterates over the Iterable and dispatches elements to multiple worker threads that perform the work on the elements. You can use ThreadPoolExecutor to automate this.
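A rough sketch of that idea, assuming Java 8+: the Dispatcher class, the processAll method and its Consumer parameter are names invented for this example. Executors.newFixedThreadPool is used as a convenient way to get a ThreadPoolExecutor; its work queue is unbounded, so with a very large Iterable you may prefer to construct a ThreadPoolExecutor with a bounded queue instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class Dispatcher {
    // Hypothetical helper: only the calling thread touches the iterator,
    // so hasNext()/next() never race; workers just receive elements.
    static <T> void processAll(Iterable<T> items, int workers, Consumer<T> work) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (T item : items) {
                pool.execute(() -> work.accept(item));
            }
        } finally {
            pool.shutdown(); // accept no new tasks; queued ones still run
        }
        try {
            // Block until every submitted task has finished.
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

Because the Iterable is only read by the dispatching thread, it can be iterated again later by calling processAll a second time.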
Upvotes: 0