Derek
Derek

Reputation: 11915

Thread-safe collection gather operation using Java futures

I am trying to use java futures for the first time. I have a class set up to decompress some files. Want to decompress recursively, because I have zip files that contain zip files.

I have a drive class that instantiations an Uncompressor class that implements callable. The Uncompressor starts unzipping, and if it comes across another .zip, it creates an instance of itself, adds it to the pool, and continues on.

Psuedo-code

From DriverClass:

.
.
.
ExecutorService pool = new Executors.newFixedThreadPool(4);
Uncompressor uc = new Uncompressor(pool, compressedFile);
Collection<File> files = uc.uncompress();
for(Future <Collection<File>> f : uc.futures)
    files.addAll(f.get());
// at the end of this loop, files doesnt seem to hold all of my files

And here is my uncompressor class

public class Uncompressor implements Callable<Collection<File>>
{
    public Set<Future<Collection<File>>> futures = new HashSet<Future<Collection<File>>>();
    File compressedFile;
public Uncompressor(ExecutorService pool, File compressedFile)
{
    this.pool = pool;
    this.compressedFile = compressedFile;
}

public Collection<File> call() throws Exception[
   return uncompress();
}

public Collection<File> uncompress()
{
List<File> uncompressedFiles = new ArrayList<File>();
.
.Loop
.//Try to uncompress the file. If the archive entry is a zip file, do the following:

    Callable<Collection<File>> callable = new Uncompressor(this.pool, archiveFileEntry);
    Future f = pool.submit(callable);
    futures.add(f);

 //else, add files to a collection here for returning
 uncompressedFiles.add(archiveFileEntry);

.EndLoop

return uncompressedFiles;
.
.
}

So the problem is in my DriverClass, my Collection of files that should hold all of the uncompressed files from the recursive dive here doesnt seem to be having all of the files in it. I think I am doing something wrong with getting the return values from the Future. Is it because of the way I have a class member variable futures defined?

Thank you

Upvotes: 2

Views: 618

Answers (1)

sharakan
sharakan

Reputation: 6901

Your code won't work if you've got a depth of nesting greater than 1, ie the top level zip contains a zip that contains zips. Your top level Uncompressor won't have a future for the bottom level zip file in that case.

It would be cool to use a ForkJoinPool for this, as that's better suited for recursive type task breakdown. Java 7 not being an option though, what I would do is change the Uncompressor so that the result is a tuple of futures and files, and then change the caller to keep track of all outstanding futures:

--Caller--

Collection<File> alluncompressedFiles = new HashSet<File>();
Collection<Future<UncompressorResult>> futures = new LinkedList<Future<UncompressorResult>>();
Future<UncompressorResult> future = pool.submit(new Uncompressor(pool, compressedFile));
futures.add(future);

while (!futures.isEmpty()) {
    Future<UncompressorResult> future = futures.poll();
    UncompressorResult result = future.get();

    futures.addAll(result.getFutures());
    uncompressedFiles.addAll(result.getFiles());
}

and the Uncompressor changes to something like:

public UncompressorResult call() throws Exception[
    List<File> uncompressedFiles = new ArrayList<File>();

    for (File entry : zipFiles) {
        if (entry is not ZIP) {
            uncompressedFiles.add(entry);
        } else {
            Callable<UncompressorResult> callable = new Uncompressor(this.pool, entry);
            Future<UncompressorResult> f = pool.submit(callable);
            futures.add(f);
        }
    }

    return new UncompressorResult(uncompressedFiles, futures;
}

Upvotes: 3

Related Questions