Reputation: 22474
My goal is to limit memory usage when processing a big file. To do that I'm using a thread pool implementation which is supposed to make it so that it will be impossible to load more data from the file then it is processed at a given time.
try (CSVParser parser = new CSVParser(new File("...."))) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
}
});
for (Item item; (item = parser.nextItem()) != null;) {
executor.submit(new ItemsProcessor(item));
}
executor.shutdown();
executor.awaitTermination(12, TimeUnit.HOURS);
} catch (Exception e) {
e.printStackTrace();
}
My understanding is that RejectedExecutionHandler
's rejectedExecution
method will run on the main thread, the thread on which the ThreadPoolExecutor
was created. Is that so ?
Are the rejected tasks run on the same thread that created the thread pool ?
As I understand it, this approach should only load a maximum of 12 items in memory. 10 that are being processed by the thread pool, one in the thread pool's queue and one that has been rejected and it is run on the same thread as the loop (pausing the loop).
Upvotes: 1
Views: 2169
Reputation: 480
You are right, RejectedExecutionHander is run in main thread.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestRejectedExecution
{
public static void main( String[] args )
{
Runnable r = () -> {
Thread cur = Thread.currentThread();
System.out.println( String.format( "in runnable, thread id: %s, name: %s, group name %s",
cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
try
{
Thread.sleep( 5000 );
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
};
Thread cur = Thread.currentThread();
System.out.println( String.format( "in main, thread id: %s, name: %s, group name %s",
cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
try {
ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MINUTES, new ArrayBlockingQueue<>( 2),
( r1, executor1 ) -> {
Thread cur1 = Thread.currentThread();
System.out.println( String.format( "in REH, thread id: %s, name: %s, group name %s",
cur1.getId(), cur1
.getName(), cur1
.getThreadGroup().getName() ) );
} );
for (int i=0; i<5; i++ ) {
executor.submit( r );
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
Here is the output:
in main, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
Process finished with exit code 0
Upvotes: 1
Reputation: 517
RejectExecutionHandler is a facility provided in case of the first executor stopped working.
RejectExecutionHandler will be invoked if first executor reject thread from execution.
// Create Executor for Normal Execution
public static ThreadPoolExecutor executor=(ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// Create Executor for Alternate Execution in case of first Executor shut down
public static ThreadPoolExecutor alternateExecutor=(ThreadPoolExecutor) Executors.newFixedThreadPool(10);
/ Create Rejected ExecutionHandler Class override rejectedExecution method
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable worker, ThreadPoolExecutor executor) {
// TODO Auto-generated method stub
System.out.println(worker.toString()+" is Rejected");
System.out.println("Retrying to Execute");
try{
//Re-executing with alternateExecutor
RejectedExecutionHandlerExample.alternateExecutor.execute(worker);
System.out.println(worker.toString()+" Execution Started");
}
catch(Exception e)
{
System.out.println("Failure to Re-exicute "+e.getMessage());
}
}
}
// Register RejectedExecutionHandler in Main Class
RejectedExecutionHandler handler=new MyRejectedExecutionHandler();
executor.setRejectedExecutionHandler(handler);
Upvotes: 0