Reputation: 2534
This is for a custom UDTF in a Hive query. CreateLogTable is the UDTF class, which I am using temporarily for testing. I am creating one thread per file to be downloaded from Amazon S3, and waiting until a thread becomes available before assigning it another file.
Main Test logic:
CreateLogTable CLT = new CreateLogTable();
int numThreads = 2;
int index = 0;
DownloadFileThread[] dlThreads = new DownloadFileThread[numThreads];
for (S3ObjectSummary oSummary : bucketKeys.getObjectSummaries()) {
while (dlThreads[index] != null && dlThreads[index].isAlive()) {
index += 1;
index = index % numThreads;
}
dlThreads[index] = new DownloadFileThread(CLT , getBucket(oSummary.getBucketName() + "/"
+ oSummary.getKey()), getFile(oSummary.getKey()), index);
dlThreads[index].start();
index += 1;
index = index % numThreads;
}
Thread class (run() method):
try {
System.out.println("Creating thread " + this.threadnum);
this.fileObj = this.S3CLIENT.getObject(new GetObjectRequest(this.filePath, this.fileName));
this.fileIn = new Scanner(new GZIPInputStream(this.fileObj.getObjectContent()));
while (this.fileIn.hasNext()) {
this.parent.forwardToTable(fileIn.nextLine());
}
System.out.println("Finished " + this.threadnum);
} catch (Throwable e) {
System.out.println("Downloading of " + this.fileName + " failed.");
}
The while loop before the thread creation should keep looping until it finds a null or dead thread, at which point it exits and a new thread is created and started. Since I log to the console, I can observe this process, but the output is unexpected:
Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
...
Creating thread 1
Creating thread 0
Creating thread 1
Finished 0
Finished 1
Finished 1
Finished 0
Finished 1
Finished 1
...
Finished 0
Finished 1
Finished 0
Finished 1
The above is only the first few lines of output. The issue is that more than two threads are created before any threads complete their tasks.
Why is this happening and how can I fix this?
Upvotes: 2
Views: 144
Reputation: 2534
The code above wasn't working because of something odd going on with the call to isAlive(). For some reason, no matter what state a thread is in, isAlive() always returns false for me, so more and more threads are created, each replacing an old one in the dlThreads array.
I solved the issue by adding a custom isWorking() method, which simply returns a boolean indicating whether the thread's run() method is still executing. Here is what the Thread class looks like now:
//this.isWorking initialized to true during instantiation
@Override
public void run() {
try {
System.out.println("Creating thread " + this.threadnum + " for " + filePath + "/" + fileName);
this.fileObj = this.S3CLIENT.getObject(new GetObjectRequest(this.filePath, this.fileName));
this.fileIn = new Scanner(new GZIPInputStream(this.fileObj.getObjectContent()));
while (this.fileIn.hasNext()) {
this.parent.forwardToTable(fileIn.nextLine());
}
System.out.println("Finished " + this.threadnum);
this.isWorking = false;
} catch (Throwable e) {
System.out.println("Downloading of " + this.fileName + " failed.");
e.printStackTrace();
this.isWorking = false;
}
}
public boolean isWorking(){
return this.isWorking;
}
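One caveat with a hand-rolled flag like this: it is written by the worker thread and read by the thread doing the polling, so it should probably be declared volatile (or be an AtomicBoolean), otherwise the Java memory model does not guarantee that the polling thread ever sees the update. Below is a minimal sketch of that idea, not the actual DownloadFileThread. FlaggedWorker and its Runnable argument are hypothetical stand-ins for the S3 download, and the flag is cleared in a finally block so that the success and failure paths share one assignment.

```java
// Hypothetical stand-in for DownloadFileThread; the Runnable replaces the S3 download.
class FlaggedWorker extends Thread {
    // volatile so a write from the worker thread is visible to the polling thread
    private volatile boolean working = true;
    private final Runnable task;

    FlaggedWorker(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            // Runs on both the success path and the failure path.
            working = false;
        }
    }

    boolean isWorking() {
        return working;
    }

    public static void main(String[] args) throws InterruptedException {
        FlaggedWorker worker = new FlaggedWorker(() -> System.out.println("working"));
        worker.start();
        worker.join();
        System.out.println("still working? " + worker.isWorking());
    }
}
```

The polling loop from the question would then test dlThreads[index].isWorking() instead of isAlive().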
However, after implementing this and being satisfied that my multithreaded script works, I switched over to using an Executor, as suggested by other users, which slightly improved performance and made the code much cleaner.
Upvotes: 1
Reputation: 25380
I reduced your code to this test case:
public class ThreadTest {
private static class SleepThread extends Thread {
private final int index;
SleepThread(int ii) { index = ii; }
@Override
public void run() {
System.out.println("Creating thread " + this.index);
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished " + this.index);
}
}
public static void main(String[] args) {
int numThreads = 2;
int index = 0;
SleepThread[] dlThreads = new SleepThread[numThreads];
for (int ii = 0; ii < 10; ++ii) {
while (dlThreads[index] != null && dlThreads[index].isAlive()) {
index += 1;
index = index % numThreads;
}
dlThreads[index] = new SleepThread(index);
dlThreads[index].start();
index += 1;
index = index % numThreads;
}
}
}
Using Sun JDK 1.7.0_75, running this produces the result that you'd expect--two threads start, they exit after five seconds, two more threads start, and so on.
The next thing I'd suspect is that your JVM's implementation of Thread.isAlive() isn't returning true for threads immediately after they are started, although that seems contrary to the documentation for the Thread class.
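To check that suspicion directly on the JVM in question, something like the following could be run. It is only a sketch: IsAliveCheck and observe() are made-up names, and a CountDownLatch keeps the thread blocked so that it cannot finish before isAlive() is sampled.

```java
import java.util.concurrent.CountDownLatch;

class IsAliveCheck {
    // Returns {alive before start(), alive while blocked, alive after join()}.
    static boolean[] observe() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await(); // keep the thread running until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean before = t.isAlive();
        t.start();
        boolean during = t.isAlive(); // thread is started and cannot have finished yet
        release.countDown();
        t.join();
        boolean after = t.isAlive();
        return new boolean[] {before, during, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = observe();
        System.out.println("before start: " + r[0]);
        System.out.println("while running: " + r[1]);
        System.out.println("after join: " + r[2]);
    }
}
```

Going by the Thread documentation, a conforming JVM should report false, true, false. If the middle value comes back false in your environment, that would point at the JVM rather than at your loop.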
Upvotes: 3
Reputation: 5028
Take a look at this example:
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
executor.shutdown();
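// Busy-waits until every task has finished; executor.awaitTermination(...) would block without spinning.
while (!executor.isTerminated()) {
}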
System.out.println("Finished all threads");
}
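It's a thread pool created with the Executors factory methods (I ran it on Java 8, but the java.util.concurrent executor framework has been available since Java 5). WorkerThread here is any class that implements Runnable. It's a very simple and straightforward way to cap the number of threads running at once.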
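For completeness, here is a self-contained sketch along the same lines, sized at two threads like the question. PoolSketch and runTasks are hypothetical names, and Thread.sleep stands in for downloading one file. It waits with awaitTermination rather than spinning on isTerminated(), and it records the highest number of tasks seen running at once so you can confirm the cap holds.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSketch {
    // Runs taskCount short tasks on a pool of poolSize threads and returns the
    // highest number of tasks observed running at the same time.
    static int runTasks(int poolSize, int taskCount) throws InterruptedException {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // stand-in for downloading one file
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown(); // accept no new tasks, let queued ones finish
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            executor.shutdownNow(); // give up on anything still running
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int peak = runTasks(2, 10);
        System.out.println("Finished all tasks, peak concurrency: " + peak);
    }
}
```

The pool queues the extra tasks internally, so there is no need to poll an array of threads for a free slot.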
Upvotes: 2