ylun

Reputation: 2534

Why is this program creating more threads than possible?

This is for a custom UDTF in a Hive query; CreateLogTable is the UDTF class, which I am using as a temporary stand-in for testing. I create one thread per file to be downloaded from Amazon S3, and when all threads are busy I wait until one becomes available before assigning it the next file.

Main Test logic:

CreateLogTable CLT = new CreateLogTable();

int numThreads = 2;
int index = 0;
DownloadFileThread[] dlThreads = new DownloadFileThread[numThreads];
for (S3ObjectSummary oSummary : bucketKeys.getObjectSummaries()) {
    while (dlThreads[index] != null && dlThreads[index].isAlive()) {
        index += 1;
        index = index % numThreads;
    }
    dlThreads[index] = new DownloadFileThread(CLT , getBucket(oSummary.getBucketName() + "/"
                    + oSummary.getKey()), getFile(oSummary.getKey()), index);
    dlThreads[index].start();
    index += 1;
    index = index % numThreads;
}

Thread class (run() method):

try {
    System.out.println("Creating thread " + this.threadnum);
    this.fileObj = this.S3CLIENT.getObject(new GetObjectRequest(this.filePath, this.fileName));
    this.fileIn = new Scanner(new GZIPInputStream(this.fileObj.getObjectContent()));
    while (this.fileIn.hasNext()) {         
        this.parent.forwardToTable(fileIn.nextLine());
    }
    System.out.println("Finished " + this.threadnum);
} catch (Throwable e) {
    System.out.println("Downloading of " + this.fileName + " failed.");
}

The while loop before the thread creation should keep cycling through the array until it finds a slot that is either null or holds a dead thread. It then exits the loop, and a new thread is created and started in that slot. Since I included logging to the console, I can observe this process, but the output is unexpected:

Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
...
Creating thread 1
Creating thread 0
Creating thread 1
Finished 0
Finished 1
Finished 1
Finished 0
Finished 1
Finished 1
...
Finished 0
Finished 1
Finished 0
Finished 1

The above is only the first few lines of output. The issue is that more than two threads are created before any threads complete their tasks.

Why is this happening and how can I fix this?

Upvotes: 2

Views: 144

Answers (3)

ylun

Reputation: 2534

The above code wasn't working because of something odd going on with the call to isAlive().

For some reason, no matter what state a thread is in, isAlive() always returns false for me. As a result, the while loop never waits, and more and more threads are created, each new one overwriting a still-running thread's reference in the dlThreads array.

I solved the issue by writing a custom isWorking() method that simply returns true until the thread's run() method has completed. Here is what the Thread class looks like now:

// volatile so the thread polling isWorking() is guaranteed to see the update
private volatile boolean isWorking = true;

@Override
public void run() {
    try {
        System.out.println("Creating thread " + this.threadnum + " for " + filePath + "/" + fileName);
        this.fileObj = this.S3CLIENT.getObject(new GetObjectRequest(this.filePath, this.fileName));
        this.fileIn = new Scanner(new GZIPInputStream(this.fileObj.getObjectContent()));
        // hasNextLine() matches nextLine(); hasNext() would skip trailing blank lines
        while (this.fileIn.hasNextLine()) {
            this.parent.forwardToTable(fileIn.nextLine());
        }
        System.out.println("Finished " + this.threadnum);
    } catch (Throwable e) {
        System.out.println("Downloading of " + this.fileName + " failed.");
        e.printStackTrace();
    } finally {
        // close the S3 stream and free this slot whether the download succeeded or failed
        if (this.fileIn != null) {
            this.fileIn.close();
        }
        this.isWorking = false;
    }
}

public boolean isWorking() {
    return this.isWorking;
}
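For reference, here is a minimal self-contained sketch of the same completion-flag idea, assuming the S3 download is replaced by a short hypothetical workload (Thread.sleep). The class name FlagPoolDemo and the two counters are illustrative only; the counters record how many workers run at once so the two-thread limit can be checked. The important detail is that the flag is volatile: without it, the Java Memory Model does not guarantee that the main thread's polling loop ever sees the worker's write. The flag is also cleared in a finally block so a failed download still frees its slot.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FlagPoolDemo {
    // Illustrative counters: how many workers are running now, and the peak seen
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();

    static class Worker extends Thread {
        // volatile so the main thread's polling loop is guaranteed to see the update
        private volatile boolean isWorking = true;

        @Override
        public void run() {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // hypothetical stand-in for the S3 download
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                isWorking = false; // cleared on success and on failure
            }
        }

        boolean isWorking() {
            return isWorking;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int numThreads = 2;
        int index = 0;
        Worker[] slots = new Worker[numThreads];
        for (int file = 0; file < 6; file++) {
            // Spin until the current slot is empty or its worker has finished
            while (slots[index] != null && slots[index].isWorking()) {
                index = (index + 1) % numThreads;
            }
            slots[index] = new Worker();
            slots[index].start();
            index = (index + 1) % numThreads;
        }
        // Wait for the workers still occupying slots
        for (Worker w : slots) {
            if (w != null) {
                w.join();
            }
        }
        System.out.println("max concurrent workers: " + maxRunning.get());
    }
}
```

Because a slot is only reused after its worker has cleared the flag, the peak should never exceed numThreads.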

However, after implementing this and being satisfied that my multithreaded script works, I switched over to using an Executor, as suggested by other users, which slightly improved performance and made the code much cleaner.
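A minimal sketch of how the question's download loop might look with an Executor, assuming a fixed pool of numThreads threads. The class name, the processFile method, and the key list are hypothetical stand-ins for the S3 download code and bucketKeys.getObjectSummaries(); the counters exist only to show that no more than numThreads tasks run at once. The pool queues the remaining tasks itself, so the manual slot array and busy-wait are no longer needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorDownloadSketch {
    // Illustrative counters for checking the concurrency limit
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for downloading one S3 object and forwarding its lines
    static void processFile(String key) throws InterruptedException {
        int now = running.incrementAndGet();
        maxRunning.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
            completed.incrementAndGet();
        } finally {
            running.decrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int numThreads = 2;
        // Hypothetical object keys
        List<String> keys = Arrays.asList("a.gz", "b.gz", "c.gz", "d.gz", "e.gz", "f.gz");
        // At most numThreads tasks run at once; the rest wait in the pool's queue
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (String key : keys) {
            pool.execute(() -> {
                try {
                    processFile(key);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already-queued ones still run
        pool.awaitTermination(1, TimeUnit.HOURS); // block instead of busy-waiting
        System.out.println("completed " + completed.get() + ", max concurrent " + maxRunning.get());
    }
}
```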

Upvotes: 1

Kenster

Reputation: 25380

I reduced your code to this test case:

public class ThreadTest {
    private static class SleepThread extends Thread {
        private final int index;
        SleepThread(int ii) { index = ii; }

        @Override
        public void run() {
            System.out.println("Creating thread " + this.index);
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Finished " + this.index);
        }
    }

    public static void main(String[] args) {
        int numThreads = 2;
        int index = 0;
        SleepThread[] dlThreads = new SleepThread[numThreads];
        for (int ii = 0; ii < 10; ++ii) {
            while (dlThreads[index] != null && dlThreads[index].isAlive()) {
                index += 1;
                index = index % numThreads;
            }
            dlThreads[index] = new SleepThread(index);
            dlThreads[index].start();
            index += 1;
            index = index % numThreads;
        }
    }
}

Using Sun JDK 1.7.0_75, running this produces the result you'd expect: two threads start, they exit after five seconds, two more threads start, and so on.

The next thing I'd suspect is that your JVM's implementation of Thread.isAlive() isn't returning true for threads immediately after they are started, although that seems contrary to the documentation for the Thread class.
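One way to test that suspicion is a small standalone check like the following sketch (the class name IsAliveCheck is hypothetical). Per the Thread documentation, a thread is alive once it has been started and until it dies, so isAlive() should return true right after start() as long as run() has not yet finished. A CountDownLatch keeps run() blocked so the check does not race against a thread that finishes instantly.

```java
import java.util.concurrent.CountDownLatch;

public class IsAliveCheck {
    static boolean beforeStart, afterStart, afterJoin;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await(); // keep run() from finishing until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        beforeStart = t.isAlive(); // not started yet: expected false
        t.start();
        afterStart = t.isAlive();  // started and run() still blocked: expected true
        release.countDown();
        t.join();
        afterJoin = t.isAlive();   // run() has returned: expected false
        System.out.println(beforeStart + " " + afterStart + " " + afterJoin);
    }
}
```

If the middle value is true on the asker's JVM, isAlive() itself is behaving as documented.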

Upvotes: 3

roeygol

Reputation: 5028

Take a look at this example:

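import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
        // WorkerThread is your own Runnable; a pool thread runs it when one is free
        Runnable worker = new WorkerThread("" + i);
        executor.execute(worker);
    }
    // Stop accepting new tasks, then block (rather than busy-wait) until all tasks finish
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    System.out.println("Finished all threads");
}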

It's a thread pool created with the Executors factory methods from java.util.concurrent. It's a simple, straightforward way to cap how many threads run at once: here, 10 tasks share a pool of 5 threads.

Upvotes: 2
