Rheel

Reputation: 420

Inconsistent output from multithreaded FTP InputStreams

I'm trying to create a Java program that downloads certain asset files from an FTP server to a local file. Because my (free) FTP server doesn't support file sizes over a few megabytes, I decided to split up the files when they are uploaded and recombine them when the program downloads them. This works, but it is rather slow, because the program has to open an InputStream for each part, which takes some time.

The FTP server I use has a way to download the files without actually logging into the server, so I'm using this code to get the InputStream:

private static final InputStream getInputStream(String file) throws IOException {
    return new URL("http://site.website.com/path/" + file).openStream();
}

To get the InputStream of a part of the asset file I'm using this code:

public static InputStream getAssetInputStream(String asset, int num) throws IOException, FTPException {
    try {
        return getInputStream("assets/" + asset + "_" + num + ".raf");
    } catch (Exception e) {
        // error handling
    }
}

Because the getAssetInputStream(String, int) method takes some time to run (especially if the file size is more than a megabyte), I decided to make the code that actually downloads the file multi-threaded. Here is where my problem lies.

final Map<Integer, Boolean> done = new HashMap<Integer, Boolean>();
final Map<Integer, byte[]> parts = new HashMap<Integer, byte[]>();

for (int i = 0; i < numParts; i++) {
    final int part = i;
    done.put(part, false);

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                InputStream is = FTP.getAssetInputStream(asset, part);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                byte[] buf = new byte[DOWNLOAD_BUFFER_SIZE];
                int len = 0;

                while ((len = is.read(buf)) > 0) {
                    baos.write(buf, 0, len);
                    curDownload.addAndGet(len);
                    totAssets.addAndGet(len);
                }

                parts.put(part, baos.toByteArray());
                done.put(part, true);
            } catch (IOException e) {
                // error handling
            } catch (FTPException e) {
                // error handling
            }
        }
    }, "Download-" + asset + "-" + i).start();
}

while (done.values().contains(false)) {
    try {
        Thread.sleep(100);
    } catch(InterruptedException e) {
        e.printStackTrace();
    }
}

File assetFile = new File(dir, "assets/" + asset + ".raf");
assetFile.createNewFile();
FileOutputStream fos = new FileOutputStream(assetFile);

for (int i = 0; i < numParts; i++) {
    fos.write(parts.get(i));
}

fos.close();

This code works, but not always. On my desktop computer it works almost every time, though not 100% of the time. On my laptop, which has a far worse internet connection, it almost never works. The result is an incomplete file: sometimes 50% of the file is downloaded, sometimes 90%. It differs every time.

Now, if I replace the .start() by .run(), the code works just fine, 100% of the time, even on my laptop. It is, however, incredibly slow, so I'd rather not use .run().

Is there a way I could change my code so it does work multi-threaded? Any help will be appreciated.

Upvotes: 0

Views: 177

Answers (1)

initramfs

Reputation: 8415

Firstly, consider replacing your FTP server; plenty of free ones support arbitrary file sizes and offer additional features. But I digress...

Your code has several independent problems, any of which could cause the behavior you are seeing. They are addressed below:

  1. The done and parts maps are plain HashMaps that are read and written from multiple threads without any synchronization, which is a race condition. HashMap is not thread-safe, so entries can be lost or corrupted, and updates made in one thread are not guaranteed to be visible in another. As a result, done.values().contains(false) may return a result that does not reflect the real state of the downloads.

  2. You are calling done.values().contains() repeatedly in a polling loop. Although the Javadoc doesn't say so explicitly, a hash map most likely has to traverse every value, in O(n), to check whether it contains a given value. Because other threads are modifying the map at the same time, the result is undefined. According to the values() Javadoc:

    If the map is modified while an iteration over the collection is in progress (except through the iterator's own remove operation), the results of the iteration are undefined.

  3. You are calling new URL("http://site.website.com/path/" + file).openStream(); but say you are using FTP. The scheme in the URL determines the protocol openStream() uses, and http:// is not ftp://. I'm not sure whether this is a typo, or whether you really mean HTTP (for example, an HTTP server serving the same files).

  4. If any thread throws an exception, its done entry is never set to true, so the busy-wait loop never finishes and the file is never assembled. Granted, you may have redacted some logic that guards against this, but otherwise it is a potential problem.

  5. You aren't closing any of the streams you open, so the underlying sockets may also stay open. Besides leaking resources, this can cause new connections to fail if the server limits the number of simultaneous connections, because the old, completed transfers still hold theirs.
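To make points 1, 2 and 4 concrete, here is a minimal sketch of the manual-thread approach with a ConcurrentHashMap in place of the HashMap and a CountDownLatch in place of the polling loop. The class name and the fetchPart helper are hypothetical stand-ins for the real download code, not part of the original program:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class LatchDownloadSketch {
    // Hypothetical stand-in for reading one part from the server.
    static byte[] fetchPart(int part) {
        return new byte[] { (byte) part };
    }

    static Map<Integer, byte[]> downloadAll(int numParts) throws InterruptedException {
        // Thread-safe map: concurrent put() calls cannot lose or corrupt entries.
        final Map<Integer, byte[]> parts = new ConcurrentHashMap<>();
        // One count per part; replaces the "done" map and the sleep loop.
        final CountDownLatch latch = new CountDownLatch(numParts);

        for (int i = 0; i < numParts; i++) {
            final int part = i;
            new Thread(() -> {
                try {
                    parts.put(part, fetchPart(part));
                } finally {
                    // Count down even if the download failed, so the waiter is never stuck.
                    latch.countDown();
                }
            }, "Download-" + part).start();
        }

        latch.await(); // blocks without polling until every worker has finished
        return parts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Integer, byte[]> parts = downloadAll(8);
        System.out.println("parts downloaded: " + parts.size());
    }
}
```

With this shape, a part whose download failed is simply missing from the map after await() returns, so the caller can check parts.containsKey(i) before writing instead of waiting forever.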

Based on the issues above, I propose moving the download logic into a Callable task and running them through an ExecutorService as follows:

LinkedList<Callable<byte[]>> tasksToExecute = new LinkedList<>();

// Populate tasks to run
for(int i = 0; i < numParts; i++){
    final int part = i;

    // Lambda that downloads one part and returns its bytes (or null on failure)
    tasksToExecute.add(() -> {
        InputStream is = null;

        try{
            is = FTP.getAssetInputStream(asset, part);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            byte[] buf = new byte[DOWNLOAD_BUFFER_SIZE];
            int len = 0;

            while((len = is.read(buf)) > 0){
                baos.write(buf, 0, len);
                curDownload.addAndGet(len);
                totAssets.addAndGet(len);
            }

            return baos.toByteArray();
        }catch(IOException e){
            // handle exception
        }catch(FTPException e){
            // handle exception
        }finally{
            if(is != null){
                try{
                    is.close();
                }catch(IOException ignored){}
            }
        }

        return null;
    });
}

// Retrieve an ExecutorService instance; note that the work-stealing pool requires Java 8 or later
// It can be replaced with newFixedThreadPool(nThreads) on Java < 8, or for tight control over the number of simultaneous connections
ExecutorService executor = Executors.newWorkStealingPool(4);

// Tells the executor to execute all the tasks and give us the results
List<Future<byte[]>> resultFutures = executor.invokeAll(tasksToExecute);

// Populates the file
File assetFile = new File(dir, "assets/" + asset + ".raf");
assetFile.createNewFile();

try(FileOutputStream fos = new FileOutputStream(assetFile)){
    // Iterate through the futures, writing them to file in order
    for(Future<byte[]> result : resultFutures){
        byte[] partData = result.get();

        if(partData == null){
            // exception occurred while downloading this part, handle appropriately
        }else{
            fos.write(partData);
        }
    }
}catch(IOException | InterruptedException | ExecutionException ex){
    // handle exception
}
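The manual finally block in the download task could also be written with try-with-resources (Java 7 and later), which closes the stream on every exit path. A small sketch, where readFully is a hypothetical helper name and a ByteArrayInputStream stands in for the network stream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class ReadFullySketch {
    // Reads the stream to the end and closes it, even if read() throws.
    static byte[] readFully(InputStream source, int bufferSize) throws IOException {
        try (InputStream is = source;
             ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            byte[] buf = new byte[bufferSize];
            int len;
            // read() returns -1 at end of stream
            while ((len = is.read(buf)) != -1) {
                baos.write(buf, 0, len);
            }
            return baos.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10000];
        byte[] copy = readFully(new ByteArrayInputStream(data), 4096);
        System.out.println("bytes read: " + copy.length);
    }
}
```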

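Using an executor service also means that threads are reused, which saves on thread creation costs, and that the futures come back in the same order as the tasks, so the parts are written in the correct order. Note that invokeAll() returns only once every task has completed, so writing to the file starts after all parts have been downloaded rather than as each piece becomes available.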
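If writing should begin while later parts are still downloading, one possible variation is to submit each task individually with submit() and call get() on the futures in part order. The sketch below assumes a hypothetical fetchPart helper in place of the real download and writes to an in-memory buffer instead of a file:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderedWriteSketch {
    // Hypothetical stand-in for downloading one part.
    static byte[] fetchPart(int part) {
        return ("part" + part + ";").getBytes(StandardCharsets.UTF_8);
    }

    static byte[] downloadInOrder(int numParts, int nThreads)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            // submit() returns immediately, so all downloads are queued up front.
            List<Future<byte[]>> futures = new ArrayList<>();
            for (int i = 0; i < numParts; i++) {
                final int part = i;
                Callable<byte[]> task = () -> fetchPart(part);
                futures.add(executor.submit(task));
            }

            // get() blocks only until *this* part is ready, so part 0 can be
            // written while later parts are still in flight.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (Future<byte[]> future : futures) {
                out.write(future.get());
            }
            return out.toByteArray();
        } finally {
            executor.shutdown(); // let the pool's threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] result = downloadInOrder(4, 2);
        System.out.println(new String(result, StandardCharsets.UTF_8)); // prints part0;part1;part2;part3;
    }
}
```

A failure inside a task surfaces as an ExecutionException from get(), which avoids having to signal errors with a null result.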

As mentioned, too many simultaneous connections may cause the server to reject some of them (or, more dangerously, to send an EOF that makes you think the part was fully downloaded). In that case, the number of worker threads can be capped with newFixedThreadPool(nThreads), which ensures that at most nThreads downloads run concurrently at any given time.
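The effect of the pool size can be checked with a small experiment. The following sketch replaces each download with a short sleep (an assumption made purely for illustration) and records the highest number of tasks that were ever running at the same time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {
    static int maxObservedConcurrency(int numTasks, int nThreads) throws InterruptedException {
        AtomicInteger active = new AtomicInteger(); // tasks running right now
        AtomicInteger peak = new AtomicInteger();   // highest value of "active" seen

        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // hypothetical stand-in for one download
                } finally {
                    active.decrementAndGet();
                }
                return null;
            });
        }

        // At most nThreads tasks run at once; the rest wait in the pool's queue.
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            executor.invokeAll(tasks); // returns when all tasks have completed
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + maxObservedConcurrency(12, 3));
    }
}
```

The reported peak never exceeds nThreads, so lowering that value is a simple way to stay under a server's connection limit.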

Upvotes: 1
