wonder garance

Reputation: 339

Multiple threads to read and import files in a directory

I have a Java program that does two things:

  1. Read the files in a directory recursively using multiple threads (10 threads)
  2. Send these files to a server via HTTP POST

The first step seems to work well, but the second step always sends the same file 10 times.

How can I fix this?

Here is my log:

Import file: 2005_1.xml
Import file: 2005_7.xml
Import file: 2005_6.xml
Import file: 2005_10.xml
Import file: 2005_5.xml
Import file: 2005_11.xml
Import file: 2005_8.xml
Import file: 2005_2.xml
Import file: 2005_3.xml
Import file: 2005_4.xml

Result: {"fileName":"2005_4.xml"
Result: {"fileName":"2005_4.xml"
...

Response: HttpResponseProxy{HTTP/1.1 400  [
Import file: 2005_9.xml
Result: {"fileName":"2005_4.xml"
...
Response: HttpResponseProxy{HTTP/1.1 200  [
Result: {"fileName":"2005_4.xml"

Response: HttpResponseProxy{HTTP/1.1 200  [
Result: {"fileName":"2005_9.xml"

And my code. Reading files in a directory with multiple threads:

public void listSendFilesMultiThread(final File folder) {
        ExecutorService service = Executors.newFixedThreadPool(10, getThreadFactory());

        for (final File fileEntry : folder.listFiles()) {
            Runnable r;
            r = new Runnable() {
                @Override
                public void run() {
                    if (fileEntry.isDirectory()) {
                        listSendFilesMultiThread(fileEntry);
                    } else {
                        GetThread thread = new GetThread(fileEntry, errorFilesDestDir);

                        // start the thread
                        thread.start();

                        // join the threads
                        try {
                            thread.join();
                        } catch (InterruptedException e) {
                            LOGGER.error("InterruptedException: " + e);
                        }
                    }
                }
            };
            service.execute(r);
        }
    }

Send files by HTTP Post:

static class GetThread extends Thread {

        private final CloseableHttpClient closeableHttpClient;
        private final File file;
        private final String errorFilesDestDir;
        private final PoolingHttpClientConnectionManager cm;
        private MultipartEntityBuilder builder;

        public GetThread(File file, String errorFilesDestDir) {
            cm = new PoolingHttpClientConnectionManager();
            closeableHttpClient = HttpClients.custom().setConnectionManager(cm).build();

            this.file = file;
            this.errorFilesDestDir = errorFilesDestDir;
        }

        @Override
        public void run() {
            try {
                if (file.exists() && file.length() > 0) {
                    FileBody fileBody = new FileBody(file);
                    // we should create a new builder per file
                    builder = MultipartEntityBuilder.create();
                    builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
                    builder.addPart("xmlFile", fileBody);
                    HttpEntity entity = builder.build();
                    request.setEntity(entity);

                    LOGGER.info("Import file: " + file.getName());
                    CloseableHttpResponse response = closeableHttpClient.execute(request);
                    LOGGER.info("Response: {}", response.toString());

                    try {
                        entity = response.getEntity();
                        printInfo(response, entity);
                    } finally {
                        response.close();
                        closeableHttpClient.close();
                        cm.close();
                    }

                    EntityUtils.consume(entity);
                } else if (file.length() == 0) {
                    LOGGER.error("The import XML file is empty: " + file.getAbsolutePath());
                    Files.copy(file.toPath(), new File(errorFilesDestDir + file.getName()).toPath(), StandardCopyOption.REPLACE_EXISTING);
                } else {
                    LOGGER.error("The import XML file doesn't exist");
                }
            } catch (ClientProtocolException e) {
                // Handle protocol errors
                LOGGER.error("ClientProtocolException: " + e.toString());
            } catch (IOException e) {
                // Handle I/O errors
                LOGGER.error("IOException: " + e.toString());
            }
        }
    }

Connection eviction policy:

public static class IdleConnectionMonitorThread extends Thread {

        private final HttpClientConnectionManager connMgr;
        private volatile boolean shutdown;

        public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
            super();
            this.connMgr = connMgr;
        }

        @Override
        public void run() {
            try {
                while (!shutdown) {
                    synchronized (this) {
                        wait(5000);
                        // Close expired connections
                        connMgr.closeExpiredConnections();
                        // Optionally, close connections
                        // that have been idle longer than 30 sec
                        connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                    }
                }
            } catch (InterruptedException e) {
                LOGGER.error("InterruptedException: " + e.toString());
            }
        }

        public void shutdown() {
            shutdown = true;
            synchronized (this) {
                notifyAll();
            }
        }
    }

Upvotes: 1

Views: 3115

Answers (3)

Hulk

Reputation: 6573

The variable request that you use in the run() method of your GetThread is not declared anywhere in the code you posted. Since GetThread is a static nested class, I guess that request is a static field of the containing class and is therefore shared among all threads.

As access to this shared variable is not synchronized, the outcome of the overall operation is unpredictable: one thread can call request.setEntity(...) and have its entity overwritten by another thread before it calls execute(request), which would explain why the same file name shows up in several results.

Each thread should have its own instance of request. Keeping a global reference to a request object is almost never a good idea. It can lead to other problems as well, because application containers (Tomcat, for example) sometimes pool such objects, along with the underlying resources (connection objects etc.), and may reuse them later.
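A minimal sketch of the "one request per thread" idea, using only the standard library. The Request class here is a hypothetical stand-in for the mutable request object in the question (its real type is not shown there), and the sleep only simulates network latency; nothing is actually sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerTaskRequestDemo {

    // Hypothetical stand-in for a mutable request object (assumption, not the real HTTP class).
    static final class Request {
        private String entity;
        void setEntity(String entity) { this.entity = entity; }
        String getEntity() { return entity; }
    }

    // Each task builds its own Request, so no other thread can overwrite its entity.
    static Callable<String> uploadTask(String fileName) {
        return () -> {
            Request request = new Request(); // local to this task, never shared
            request.setEntity(fileName);
            Thread.sleep(10);                // simulated network delay
            return "sent " + request.getEntity();
        };
    }

    // Submits one task per file and collects the results in submission order.
    static List<String> sendAll(List<String> fileNames) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : fileNames) {
                futures.add(pool.submit(uploadTask(name)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAll(List.of("2005_1.xml", "2005_2.xml", "2005_3.xml")));
    }
}
```

In the question's code, the equivalent change would presumably be to create the request object inside run() (or in the GetThread constructor) instead of reading it from a shared field.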


That said, in this case you are very likely I/O bound anyway (i.e. limited by disk speed and upload bandwidth, not by processing power), so multithreading is unlikely to gain you much. I would use a single-threaded solution here.

Upvotes: 0

Imran

Reputation: 1902

I recommend using the producer-consumer pattern to simplify the code. One producer thread (one should be enough, as it does no real processing; you can use several) runs your directory-walking logic, finds the files to upload, and places them in a queue. Start as many consumers as you wish; each consumer takes entries from the queue, reads the file from disk, and uploads it to the server. See https://dzone.com/articles/concurrency-pattern-producer
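The pattern described above could be sketched roughly as follows, using a BlockingQueue from the standard library. The class name FileUploadPipeline and the uploader callback are assumptions made for illustration; the callback stands in for the HTTP upload, and a sentinel value ("poison pill") tells each consumer when to stop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class FileUploadPipeline {

    // Sentinel ("poison pill"): compared by identity, tells a consumer to stop.
    private static final Path DONE = Path.of("");

    // Walks root on the calling thread (producer) and hands each regular file
    // to one of the consumer threads, which pass it to the uploader callback.
    static void run(Path root, int consumers, Consumer<Path> uploader) {
        BlockingQueue<Path> queue = new LinkedBlockingQueue<>();
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    // take() blocks until a file (or the sentinel) is available
                    for (Path p = queue.take(); p != DONE; p = queue.take()) {
                        uploader.accept(p);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        try (Stream<Path> paths = Files.walk(root)) {
            // Producer: each file is enqueued exactly once
            paths.filter(Files::isRegularFile).forEach(queue::add);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // One sentinel per consumer so every worker terminates
            for (int i = 0; i < consumers; i++) {
                queue.add(DONE);
            }
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        Path root = Path.of(args.length > 0 ? args[0] : ".");
        run(root, 4, p -> System.out.println("Would upload: " + p));
    }
}
```

Because every file passes through the queue exactly once, no two consumers receive the same file. Any request object should still be created inside the uploader callback, not shared between consumers.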

Upvotes: 1

mic4ael

Reputation: 8300

I guess the problem is that you are running your 10 threads without dividing the work between them: if one thread is processing a directory, another thread should not process the same directory. Otherwise they end up doing exactly the same work. You should make sure that a thread doesn't start processing a directory or file that has already been visited.
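If duplicate work does turn out to be a concern, one way to guard against it is a thread-safe set of already claimed paths. This is only a sketch; the class name VisitedTracker and the method claim are hypothetical and not part of the question's code.

```java
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class VisitedTracker {

    // Concurrent set: add() is atomic, so exactly one thread wins per path.
    private final Set<Path> visited = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that claims this path.
    // Paths are normalized so "dir/./file" and "dir/file" count as the same entry.
    boolean claim(Path path) {
        return visited.add(path.toAbsolutePath().normalize());
    }

    public static void main(String[] args) {
        VisitedTracker tracker = new VisitedTracker();
        Path file = Path.of("data", "2005_1.xml");
        System.out.println(tracker.claim(file)); // first claim succeeds
        System.out.println(tracker.claim(file)); // second claim is rejected
    }
}
```

A worker would call claim(path) before processing a file or directory and skip it when the call returns false.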

Upvotes: 0
