vibhas
vibhas

Reputation: 1509

Multiple ExecutorService finish after main

I need some input from you regarding a scenario for which I am doing some POC(Proof of Concept). I am novice to multithreading in java and trying to some test. My requirement is I want to load millions of records using simple java and then after doing some conversion with the data will insert in a database table. For that I want to perform a simple test related to finishing of all task.

Currently I want to try that my main method finishes only after my executor services finishes. Below is the code which I have tried. Can anyone please help me to know if thats the correct way to finish the main method after finishing the executor threads.

Your suggestion will be highly appreciated.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executor = Executors.newFixedThreadPool(5);
           // Runnable worker = new WorkerThread("Thread executor :" + i);
            executor.execute(new WorkerThread("Thread executor :" + i));
          }
        executor.shutdown();
        while (!executor.isTerminated()) {
            //System.out.println("Waiting");
        }

        System.out.println("Will start for Executor 1");

        ExecutorService executor1 = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
           // Runnable worker = new WorkerThread("Thread executor1 :" + i);
            executor1.execute(new WorkerThread("Thread executor1 :" + i));
          }
        executor1.shutdown();
        while (!executor1.isTerminated()) {
            //System.out.println("Waiting");
        }

        System.out.println("Finished all threads");
        //
        String s=null;
        s.toUpperCase();
    }
}

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerThread implements Runnable {

    private String command;

    public WorkerThread(String s){
        this.command=s;
    }


    public void run() {
        ExecutorService executor2 = Executors.newFixedThreadPool(5);

        Future loadData=executor2.submit(new LoadData());

        System.out.println(" Start. Command = "+command);

        try {
            List listOfData=(List) loadData.get();

            for(int i=0;i<listOfData.size();i++){
                //Thread.sleep(500);
                //System.out.println("Printing the value from list:"+listOfData.get(i));

                ConversionLogic conversion= new ConversionLogic();
                conversion.doConversion(command);
            }



        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println(" End."+command);

    }



    public String toString(){
        return this.command;
    }
}

class LoadData implements Callable{

    public List call() throws Exception {

        List<String> dataList= new ArrayList<String>();

        for(int i=0;i<100;i++){
            String data="Data_"+i;
            //System.out.println("Data Added in List:"+data);
            dataList.add(data);
        }
        Thread.sleep(10000);

        return dataList;
    }

}



public class ConversionLogic {

    public void doConversion(String threadName){

        try {

            System.out.println("Converting Data for Thread starts:"+threadName);
            Thread.sleep(5000);
            System.out.println("Converting Data for Thread ends:"+threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

SO this is what I have understood from the answers provided below:

package stackoverflow.test;

import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(executor2);


        //start loading data
        int procCount = 0;
        for (int i = 0; i < 10; i++) {
            Future loadData = processor.submit(new LoadData("Org"));
            procCount++;
        }

        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED for Org");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("Fund Data Loading Starts:");
        //___________________________________________________________________//

        // Some argument to get Fund Data
        int procCount1 = 0;
        for (int i = 0; i < 5; i++) {
            Future loadData = processor2.submit(new LoadData("Fund"));
            procCount1++;
        }

        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed for Fund:");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter1 = new ExecutorCompletionService<Void>(executor2);

        while (procCount1-- > 0) {
            Future<List> listOfDataFuture = processor2.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed for Fund:");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter1.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED for Org");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor2.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }


              System.out.println("<<<<<<<<<< End>>>>>>>>");
              System.exit(0);



    }
}


package stackoverflow.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class LoadData implements Callable {
    String dataType;

    public List call() throws Exception {

        List<String> dataList = new ArrayList<String>();

        for (int i = 0; i < 20; i++) {
            String data = "Data_" + i;
           System.out.println("Processing Data of Type :" + dataType + "Data is:"+data);
            dataList.add(data);
        }
        Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type){
        this.dataType=type;
    }

}

package stackoverflow.test;

import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

    private String threadName;

    public ConversionLogic(String threadName) {

        this.threadName = threadName;
    }

    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}

Updating the code for one set of requirement.Any suggestion to improve the performance is welcome.


 package stackoverflow.tesst;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;

import connection.java.JdbcConnection;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(
                executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(
                executor2);

        System.out.println("Connecting to DB...");
        try {
            System.out.println("Connection is :"
                    + JdbcConnection.getConnection());
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        // start loading data

        int priceRange1 = 200;
        int priceRange2 = priceRange1 + 200;

        int procCount = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + priceRange1;
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + priceRange2
                        + " and price > " + priceRange1;
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            }
            priceRange1 = priceRange2;
            priceRange2 = priceRange2 + 200;

            procCount++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Org");

        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(
                executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount
                        + "_" + i, listOfData);
                converter.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Org:");

        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Org");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("<<<<<<<<<< End>>>>>>>>");
        System.exit(0);

    }
}

package stackoverflow.tesst;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

----------------------------------------------
import connection.java.JdbcConnection;

class LoadData implements Callable {
    String dataType;
    Connection conn;
    String query;

    public List call() throws Exception {
        List<Product> dataList = new ArrayList<Product>();
        try {
             conn=JdbcConnection.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(this.query);
            while(rs.next()){
                System.out.println(rs.getString("code"));
                System.out.println(rs.getString("name"));
                System.out.println(rs.getInt("price"));
                Product p= new Product(rs.getString("code"),rs.getString("name"),rs.getInt("price"));
                dataList.add(p);
            }
            rs.close();//conn.close();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

      Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type,String query){
        this.dataType=type;
        this.query=query;
    }

}


    }
}

---------------------------
package stackoverflow.tesst;

import java.util.List;
import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

    private String threadName;
    List<Product> productList;

    public ConversionLogic(String threadName,List<Product> productList) {

        this.threadName = threadName;
        this.productList=productList;
    }

    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            int listSize=productList.size();
            for(int i=0;i<listSize;i++){
                //Do conversion for product let say
                System.out.println("Print product in Conversion:"+productList.get(i).getPrice());
            }
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}
------------------------------------
package connection.java;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcConnection {

    static Connection conn;
    static String user;
    static String pass;
    static String dbURL;

    public static Connection getConnection() throws ClassNotFoundException,
            SQLException {
        Class.forName("org.postgresql.Driver");

        dbURL = "jdbc:postgresql://localhost:5433:postgres";
        user = "postgres";
        pass = "root";
        Connection conn = DriverManager.getConnection(dbURL, user, pass);
        Statement stmt = conn.createStatement();
        System.out.println("Created DB Connection....");
        return conn;

    }

}

package stackoverflow.tesst;

import java.io.Serializable;
import java.math.BigDecimal;



public class Product implements Serializable {

    /**
     * Product Class using Annotation
     */
    private static final long serialVersionUID = 1L;
    private Integer id;
    private String code;
    private String name;
    private int price;

    Product(String code,String name,int price){
        this.code=code;
        this.name=name;
        this.price=price;
    }

   public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }


    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }


    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }


    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", code=" + code + ", name="
                + name + ", price=" + price + "]";
    } 

}

Here is the final version of my POC, which I am actually looking for. Only suggestion required is do I suppose to synchronize the insert query part?

package stackoverflow.tesst;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;

import connection.java.JdbcConnection;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(
                executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(
                executor2);

        /*System.out.println("Connecting to DB...");
        try {
            System.out.println("Connection is :"
                    + JdbcConnection.getConnection());
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }*/

        // start loading data

        int priceRange1 = 200;
        int priceRange2 = priceRange1 + 200;

        int procCount = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + priceRange1 + " order by price";
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + priceRange2
                        + " and price > " + priceRange1 + " order by price";
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            }
            priceRange1 = priceRange2;
            priceRange2 = priceRange2 + 200;

            procCount++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Org");

        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(
                executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic("<<Org>>"
                        + "_" + i, listOfData);
                converter.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Org:");

        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Org");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("Fund Data Loading Starts:");
        // ___________________________________________________________________//


        int fundRange1 = 200;
        int fundRange2 = fundRange1 + 200;


        int procCount1 = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + fundRange1;
                Future loadData = processor2.submit(new LoadData("Fund",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + fundRange2
                        + " and price > " + fundRange1;
                Future loadData = processor2.submit(new LoadData("Fund",
                        finalQuery));
            }
            fundRange1 = fundRange2;
            fundRange2 = fundRange2 + 200;

            procCount1++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Fund");

        ExecutorCompletionService<Void> converter1 = new ExecutorCompletionService<Void>(
                executor2);

        while (procCount1-- > 0) {
            Future<List> listOfDataFuture = processor2.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic("<<Fund>>"
                        + "_" + i, listOfData);
                converter1.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Fund:");

        executor2.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Fund");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }



        System.out.println("<<<<<<<<<< End>>>>>>>>");
        System.exit(0);

    }
}


package stackoverflow.tesst;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import connection.java.JdbcConnection;

class LoadData implements Callable {
    String dataType;
    Connection conn;
    String query;

    public List call() throws Exception {
        List<Product> dataList = new ArrayList<Product>();
        try {
            System.out.println("Connection establishing for Loading Org Data:");
            conn = JdbcConnection.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(this.query);
            while (rs.next()) {
                System.out.println(rs.getString("code"));
                System.out.println(rs.getString("name"));
                System.out.println(rs.getInt("price"));
                Product p = new Product(rs.getString("code"),
                        rs.getString("name"), rs.getInt("price"));
                dataList.add(p);
            }
            rs.close();
            conn.close();
            System.out.println("Connection Closed While loading for :"+dataType);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type, String query) {
        this.dataType = type;
        this.query = query;
    }

}


package stackoverflow.tesst;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.Callable;

import connection.java.JdbcConnection;

class ConversionLogic implements Callable {

    private String threadName;
    List<Product> productList;
    Connection conn;

    public ConversionLogic(String threadName, List<Product> productList) {

        this.threadName = threadName;
        this.productList = productList;
    }

    public Void call() throws Exception {
        int listSize = productList.size();
        try {

            conn = JdbcConnection.getConnection();
            System.out.println("Connection establishing for Converting Org Data:");
            String insertTableSQL = "INSERT INTO item"
                    + "(code, name, price) VALUES" + "(?,?,?)";

            PreparedStatement preparedStatement = conn
                    .prepareStatement(insertTableSQL);


            for (int i = 0; i < listSize; i++) {

                preparedStatement.setString(1, productList.get(i)
                        .getCode());
                preparedStatement.setString(2, productList.get(i)
                        .getName());
                preparedStatement.setInt(3, productList.get(i)
                        .getPrice());
                //Guess we suppose to synchronize the insert part in case 
                // In case mutiple threads trying to insert some records in a table and we might end up loosing data
                //
                preparedStatement.executeUpdate();


            }


            preparedStatement.close();
            conn.close();
            System.out.println("Connection Closed While Converting for Org :");
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        return null;
    }
}

Upvotes: 3

Views: 2322

Answers (3)

skY
skY

Reputation: 111

executor.awaitTermination() method sounds convenient, but this is not an efficient one because what if all your executor services finishes before/after 1000 seconds (or any time you mention as an argument in awaitTermination() method).Either you are wasting the time or not letting your services finish their task. Instead i'd suggest you use CountDownLatch as explained in below example.

public class app {

public static void main(String[] args) throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(3); // here assuming you know number of threads
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {
        executor.submit(new processor(latch));
    }
    latch.await();
    System.out.println("main resume");

}

}


class processor extends Thread{
    private CountDownLatch latch;

    processor(CountDownLatch latch){
        this.latch=latch;
    }

    public void run(){
        System.out.println("in thread");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        latch.countDown();
    }
}

Thus, you don't waste single ms of time.

Upvotes: 0

drakyoko
drakyoko

Reputation: 505

There are way to many thread pools in your code and, in general, spawning new threads inside a spawned thread is not a good idea: it can easily get out of control. In your case, you don't need the WorkerThread: you already have the thread pool provided by the Java framework (ExecutorService).

Since you need the result from a thread (LoadData) to be processed (ConversionLogic) I would use also an ExecutorCompletionService to help with gathering results from LoadData.

Following the refactored code. I've ditched the WorkerThread and used only a threadpool (altough you can use two if you want more parallelism), also the ConversionLogic now implements Callable so that it can be easily processed by the thread pool.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);


        //start loading data
        int procCount = 0;
        for (int i = 0; i < 10; i++) {
            Future loadData = processor.submit(new LoadData());
            procCount++;
        }
        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor.shutdown();
        System.out.println(" End.");


    }
}

class LoadData implements Callable {

    public List call() throws Exception {

        List<String> dataList = new ArrayList<String>();

        for (int i = 0; i < 20; i++) {
            String data = "Data_" + i;
            System.out.println("Data Added in List:" + data);
            dataList.add(data);
        }
        Thread.sleep(2000);

        return dataList;
    }

}


class ConversionLogic implements Callable {

    private String threadName;

    public ConversionLogic(String threadName) {

        this.threadName = threadName;
    }

    @Override
    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}

Upvotes: 0

Gergely Bacso
Gergely Bacso

Reputation: 14661

Instead of that while loop I would advise you to use the built-in await function like this:

    executor.shutdown();
    try {
        System.out.println("Waiting for finish");
        executor.awaitTermination(1000, TimeUnit.SECONDS);
        System.out.println("Stopped nicely");
    } catch (InterruptedException e) {
        System.out.println("Could not stop in alloted time");
    }

    System.exit(0);

Place this code wherever you want to make sure that your executorService finished before you continue any further.

Upvotes: 1

Related Questions