Neethu Lalitha

Reputation: 3071

HDFS read using multithreading

I am reading files from an HDFS directory with multiple threads, using a producer-consumer model backed by a BlockingQueue.

Here is my code:

Producer class:

public void readURLS() {
    final int capacity = Integer.MAX_VALUE;

    BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));

        int i = 0;

        for (FileStatus file : status) {
            LOG.info("Thread {} started: ", i++);
            LOG.info("Reading file {} ", file.getPath().getName());
            new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
        }
    } catch (IOException e) {
        LOG.error("IOException occurred while listing files from HDFS directory", e);
    }
}

FetchData:

@Override
public void run() {
    LOG.info("Inside reader to start reading the files ");

    try (BufferedReader bufferedReader =
            new BufferedReader(new InputStreamReader(
                    FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {

        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            LOG.info("Line is :{}", line);
            queue.put(line);
        }
    } catch (IOException e) {
        LOG.error("file : {} ", file.toString());
        // run() cannot throw a checked IOException, so rethrow it unchecked
        throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
        LOG.error("An error has occurred: ", e);
        Thread.currentThread().interrupt();
    }
}

While executing the code, it throws an InterruptedIOException:

java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected 

Any idea why? My intention is to loop over the files and read each one with a separate thread.

Upvotes: 0

Views: 1261

Answers (1)

Oleg Muravskiy

Reputation: 775

I'm also getting the same behaviour when using HDFS from multiple (many!) threads, and I don't know the answer to the question "why?", but keeping the number of threads that access HDFS concurrently low seems to help.

In your case I would recommend using an ExecutorService with a limited number of threads, and fine-tuning that number until you no longer get exceptions.

So, create the ExecutorService (with 10 threads as a starting point):

final ExecutorService executorService = Executors.newFixedThreadPool(10);

and instead of your

new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();

do

executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
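
Putting the pieces together, the loop in readURLS() would then look roughly like this (a sketch only, reusing the executorService declared above and the FetchData constructor from your snippet; the shutdown handling at the end is an assumption about the rest of your program):

BlockingQueue<String> queue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
try {
    FileSystem hdfs = FileSystem.get(hadoopConf);
    for (FileStatus file : hdfs.listStatus(new Path("MYHDFS_PATH"))) {
        LOG.info("Submitting file {}", file.getPath().getName());
        executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
    }
} catch (IOException e) {
    LOG.error("IOException occurred while listing files from HDFS directory", e);
} finally {
    executorService.shutdown(); // no new tasks; already submitted ones keep running
    // optionally wait for the workers to drain:
    // executorService.awaitTermination(1, TimeUnit.HOURS);
}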

Another improvement: since org.apache.hadoop.fs.FileSystem implements Closeable, you should close it. In your code every thread creates a new FileSystem instance but never closes it, so I would extract it into a variable inside your try-with-resources:

try (FileSystem fileSystem = FileSystem.get(hadoopConf);
     BufferedReader bufferedReader =
             new BufferedReader(new InputStreamReader(
                     fileSystem.open(file), StandardCharsets.UTF_8))) {

UPDATE:

Although the above code looks like the right approach for the Closeable objects, by default FileSystem.get will return cached instances from the

/** FileSystem cache */
static final Cache CACHE = new Cache();

and thus things will break horribly when close() is called on them.

You could either disable the FileSystem cache by setting the fs.hdfs.impl.disable.cache config parameter to true, or make sure the FileSystem instance(s) are only closed when all workers have finished. It also seems that you could use a single FileSystem instance for all your workers, although I can't find any confirmation in the javadocs that this will work properly without extra synchronisation.
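
For illustration, a sketch of both options (names like sharedFs are mine; the config key is the one mentioned above):

// Option 1: disable the FileSystem cache, so every worker gets its own
// instance that it is allowed to close (keep the try-with-resources as is).
Configuration conf = new Configuration(hadoopConf);
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
// FileSystem.get(conf) now returns a fresh instance on every call.

// Option 2: create one FileSystem up front, pass it to every FetchData
// (instead of calling FileSystem.get() inside run()), and close it only
// after executorService.shutdown() / awaitTermination() have completed:
FileSystem sharedFs = FileSystem.get(hadoopConf);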

Upvotes: 2
