Reputation: 25535
I have a task in Java that is currently single threaded and may or may not produce output. I need to run this task until I have 100 pieces of output from it. Here is the single threaded (greatly simplified) example version of this:
import java.security.SecureRandom;
/**
 * Single-threaded reference version: keeps invoking a task that may or may not
 * produce output until 100 pieces of output have been printed.
 */
public class Test {
    /** Source of randomness that decides whether a given task invocation yields output. */
    private static SecureRandom rand = new SecureRandom();

    /**
     * Runs one unit of work.
     *
     * @return {@code "output"} when the random draw exceeds 0.5, otherwise {@code null}
     */
    public static String task() {
        if (rand.nextDouble() > 0.5) {
            return "output";
        }
        return null;
    }

    /**
     * Calls {@link #task()} repeatedly and prints every non-null result until
     * 100 results have been printed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // The loop counter only advances when a task actually produced something.
        for (int produced = 0; produced < 100; ) {
            String result = task();
            if (result == null) {
                continue;
            }
            produced++;
            System.out.println(result);
        }
    }
}
I would like to run this task multi-threaded in four (4) threads. How do I implement it?
I've looked into using the Executor interfaces but they appear to be geared at running a task exactly n times, not until needed.
Some additional notes:
Upvotes: 1
Views: 1372
Reputation: 25535
The simplest approach:
1. Use invokeAll to submit a list of Callable objects and wait until they all finish.
2. Declare SecureRandom rand and String task() as members of each Callable.
3. In the call() method, have each thread complete tasks until there are no more tasks to complete.
Here is the implementation:
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Runs a task that may or may not produce output on {@link #THREADS} worker
 * threads until exactly {@link #TASKS} pieces of output have been printed.
 */
public class Test {
    /** Number of outputs to print before the workers stop. */
    private static final int TASKS = 100;
    /** Number of worker threads, and of identical workers handed to the pool. */
    private static final int THREADS = 4;

    /**
     * Starts the workers, blocks until every one of them has returned, then
     * prints {@code done}.
     *
     * @param args ignored
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the workers in {@code invokeAll}
     */
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger outputCount = new AtomicInteger(0);
        ExecutorService threadPool = Executors.newFixedThreadPool(THREADS);
        try {
            Collection<Callable<Object>> tasks = new ArrayList<>(THREADS);
            for (int t = 0; t < THREADS; t++) {
                tasks.add(new Callable<Object>() {
                    // One generator per worker so the workers never contend on it.
                    private final SecureRandom rand = new SecureRandom();

                    /** Returns the current thread's name about half of the time, else null. */
                    private String task() {
                        return rand.nextDouble() > 0.5 ? Thread.currentThread().getName() : null;
                    }

                    @Override
                    public Object call() {
                        while (outputCount.get() < TASKS) {
                            String output = task();
                            if (output != null) {
                                // incrementAndGet hands every output a unique 1-based
                                // sequence number. Numbers 1..TASKS are printed (the
                                // comparison must be <=, otherwise output number TASKS
                                // is silently dropped); anything above TASKS was
                                // produced by a worker racing past the limit and is
                                // discarded.
                                int sequence = outputCount.incrementAndGet();
                                if (sequence <= TASKS) {
                                    System.out.println(output + ": " + sequence);
                                }
                            }
                        }
                        return null;
                    }
                });
            }
            threadPool.invokeAll(tasks);
        } finally {
            // Release the pool threads even if invokeAll is interrupted.
            threadPool.shutdownNow();
        }
        System.out.println("done");
    }
}
Upvotes: 1
Reputation: 45586
I think in your case you can use just an AtomicInteger that is shared between your tasks.
/**
 * Worker that repeatedly runs {@link #task()} and prints its output, sharing
 * one counter with its sibling workers so that together they print exactly
 * {@link #LIMIT} outputs.
 */
public class MyTask implements Runnable {
    /** Total number of outputs all workers print between them. */
    private static final int LIMIT = 100;

    /** Shared generator; {@code SecureRandom} may be used from several threads. */
    private static final SecureRandom rand = new SecureRandom();

    /** Number of output slots claimed so far, shared by every worker. */
    private final AtomicInteger counter;

    /**
     * @param counter counter shared by all cooperating workers
     */
    public MyTask(AtomicInteger counter) {
        this.counter = counter;
    }

    /**
     * Runs tasks until the shared counter reaches {@link #LIMIT}. A result is
     * printed only if this worker claimed one of the first {@code LIMIT} slots,
     * so workers that race past the limit discard their result instead of
     * printing an extra line.
     */
    @Override
    public void run() {
        while (counter.get() < LIMIT) {
            String output = task();
            // getAndIncrement gives every non-null result a unique 0-based slot.
            if (output != null && counter.getAndIncrement() < LIMIT) {
                System.out.println(output);
            }
        }
    }

    /**
     * Runs one unit of work.
     *
     * @return {@code "output"} when the random draw exceeds 0.5, otherwise {@code null}
     */
    public static String task() {
        return rand.nextDouble() > 0.5 ? "output" : null;
    }

    /**
     * Starts four workers on a fixed pool and waits for them to finish.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for the pool
     */
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; ++i) {
            pool.execute(new MyTask(counter));
        }
        // Simplified shutdown, do not use this in production
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
Upvotes: 2
Reputation: 650
If the operation is not thread-safe, you can make it thread-safe by synchronizing on the expensive object that produces the output:
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Worker that appends {@code "output"} to a shared list and counts a shared
 * latch down once per appended element, until the latch reaches zero.
 */
public class CountDownTask implements Runnable {
    /** Remaining number of outputs to produce, shared by all workers. */
    private final CountDownLatch latch;
    /** Shared result list; also used as the lock that guards "append + count down". */
    private final List<String> output;
    /** Shared generator, accessed only while holding its own monitor. */
    private final SecureRandom random;

    /**
     * @param latch  latch initialised to the number of outputs wanted
     * @param output list that receives the outputs; must be the same instance for all workers
     * @param random generator shared by all workers
     */
    public CountDownTask(CountDownLatch latch, List<String> output, SecureRandom random) {
        this.latch = latch;
        this.output = output;
        this.random = random;
    }

    /**
     * Produces outputs until the latch reaches zero. The latch is re-checked
     * under the list's monitor before appending, so a worker that passed the
     * outer check just before the final count-down cannot append an extra
     * element: the list ends up with exactly as many elements as the latch's
     * initial count.
     */
    @Override
    public void run() {
        while (latch.getCount() > 0) {
            if (getNextValueInAThreadSafeWay() > 0.5) {
                synchronized (output) {
                    if (latch.getCount() > 0) {
                        output.add("output");
                        latch.countDown();
                    }
                }
            }
        }
    }

    /** Draws the next random double while holding the generator's monitor. */
    private double getNextValueInAThreadSafeWay() {
        synchronized (random) {
            return random.nextDouble();
        }
    }

    /**
     * Runs four workers until 100 outputs were collected (waiting at most one
     * minute) and prints the size of the result list.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch theLatch = new CountDownLatch(100);
        List<String> output = Collections.synchronizedList(new ArrayList<String>());
        SecureRandom random = new SecureRandom();
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i++) {
            service.execute(new CountDownTask(theLatch, output, random));
        }
        theLatch.await(1, TimeUnit.MINUTES);
        service.shutdown();
        System.out.println(output.size());
    }
}
Upvotes: 1
Reputation: 47269
You could still use an Executor service but use some longer-living Runnables to run the execution and pull the reusable objects from some shared map of objects. Here is a quick example with comments that I think could work for you. EDIT: I updated my previous example to meet the requirement of at least 100 pieces of output, but possibly more:
import java.security.SecureRandom;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Runs {@link #task()} on four long-lived workers that borrow a reusable
 * object from a shared queue for each invocation. Produces at least 100
 * outputs, possibly a few more: every worker keeps going until it has itself
 * drawn a counter value of 100 or above.
 */
public class Example {
    /** Number of outputs produced so far across all workers. */
    private static AtomicInteger outputCount = new AtomicInteger(0);
    /** Shared generator that decides whether a task invocation yields output. */
    private static SecureRandom rand = new SecureRandom();

    /**
     * Fills the object pool, starts the workers, waits up to five seconds for
     * them to finish and prints the final counter value.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while filling the queue or
     *         waiting for the executor
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> expensiveObjects = new LinkedBlockingQueue<>();
        // Random objects put in
        for (int i = 0; i < 4; i++) {
            expensiveObjects.put(new Object());
        }

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executorService.execute(new MyRunnable(expensiveObjects));
        }

        // Arbitrary wait in this example.
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Final: " + outputCount.get());
    }

    /**
     * Runs one unit of work.
     *
     * @return {@code "output"} when the random draw exceeds 0.5, otherwise {@code null}
     */
    public static String task() {
        return rand.nextDouble() > 0.5 ? "output" : null;
    }

    /**
     * Runnable that keeps executing until it draws a counter value of 100 or
     * more, or until its thread is interrupted.
     */
    private static class MyRunnable implements Runnable {
        private final BlockingQueue<Object> expensiveObjects;

        public MyRunnable(final BlockingQueue<Object> expensiveObjects) {
            this.expensiveObjects = expensiveObjects;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Object expensiveObject = expensiveObjects.take();
                    // Use the expensive object that you talked about and put it back
                    // for reuse when done. Not needed in this example but leaving it
                    // for now. The finally block returns it even if task() throws.
                    String output;
                    try {
                        output = task();
                    } finally {
                        expensiveObjects.put(expensiveObject);
                    }
                    if (output != null) {
                        int counter = outputCount.getAndIncrement();
                        System.out.println(counter);
                        if (counter >= 100) {
                            break;
                        }
                    }
                }
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: report it, restore the flag for
                // the executor, and leave the loop instead of spinning on.
                System.out.println("Error!");
                Thread.currentThread().interrupt();
            }
        }
    }
}
Upvotes: 1
Reputation: 9245
Well, here's my shot at it. Worked for me:
/**
 * Runs {@link #task()} on a fixed pool of worker threads until exactly
 * {@link #TARGET_OUTPUTS} outputs have been printed.
 */
public class MakeItConcurrent {
    /** Number of outputs to print in total. */
    private static final int TARGET_OUTPUTS = 100;
    /** Number of workers submitted to the pool. */
    private static final int WORKERS = 4;

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(WORKERS);
    /** Number of outputs printed (or about to be printed) so far. */
    private static final AtomicInteger outputCount = new AtomicInteger();
    /** One generator per thread, created lazily on that thread's first use. */
    private static final ThreadLocal<SecureRandom> threadLocalRand = new ThreadLocal<SecureRandom>() {
        @Override
        protected SecureRandom initialValue() {
            return new SecureRandom();
        }
    };

    /**
     * Runs one unit of work using the calling thread's own generator.
     *
     * @return {@code "output"} when the random draw exceeds 0.5, otherwise {@code null}
     */
    public static String task() {
        return threadLocalRand.get().nextDouble() > 0.5 ? "output" : null;
    }

    /**
     * Runs tasks on the calling thread until the shared counter reaches
     * {@link #TARGET_OUTPUTS}. A result is printed only after a counter slot
     * was claimed atomically, so the workers together print exactly
     * {@code TARGET_OUTPUTS} lines and the counter never moves backwards.
     * This method does not shut the pool down; {@link #main} owns its
     * lifecycle, so a fast worker cannot cause later submissions to be rejected.
     */
    public static void doManyTasks() {
        while (outputCount.get() < TARGET_OUTPUTS) {
            String output = task();
            if (output != null && tryClaimOutputSlot()) {
                System.out.println(output);
            }
        }
    }

    /**
     * Atomically increments the counter unless it already reached the target.
     *
     * @return {@code true} if a slot was claimed, {@code false} if the target was already met
     */
    private static boolean tryClaimOutputSlot() {
        while (true) {
            int current = outputCount.get();
            if (current >= TARGET_OUTPUTS) {
                return false;
            }
            // Retry if another worker changed the counter between get and CAS.
            if (outputCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /**
     * Submits the workers, waits until all of them have finished and prints
     * the final counter value.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                doManyTasks();
            }
        };
        try {
            for (int i = 0; i < WORKERS; i++) {
                threadPool.submit(runnable);
            }
            // No further submissions; already-submitted workers run to completion.
            threadPool.shutdown();
            // Wait for actual termination rather than polling isShutdown(), which
            // turns true while workers may still be running.
            threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } finally {
            threadPool.shutdownNow();
        }
        System.out.println(outputCount);
    }
}
Upvotes: -1