Belphegor

Reputation: 4766

Read whole text files from a compression in Spark

I have the following problem: suppose I have a directory on HDFS containing compressed archives, each of which contains multiple files. I want to create an RDD consisting of objects of type T, i.e.:

context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor taking the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Now when inputDataPath is a directory containing files this works perfectly fine, i.e. when it's something like:

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

But when there's a tgz containing multiple files, the file content (fileNameContent._2()) is a useless binary string (as expected). I found a similar question on SO, but it's not the same case: that solution applies when each archive contains only one file, whereas in my case each archive contains many files that I want to read individually as whole files. I also found a question about wholeTextFiles, but it doesn't work in my case.

Any ideas how to do this?

EDIT:

I tried the reader from here (testing it as in the function testTarballWithFolders()), but whenever I call

TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Line 105 in SparkMain is the call shown above in this edit, and line 61 of TarballReader is

GZIPInputStream gzip = new GZIPInputStream(in);

where the input stream variable in is null. It is assigned on the preceding line:

InputStream in = this.getClass().getResourceAsStream(tarball);

Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?

Upvotes: 15

Views: 16575

Answers (2)

Neil Best

Reputation: 823

A slight improvement on the accepted answer is to change

Option(tar.getNextTarEntry)

to

Try(tar.getNextTarEntry).toOption.filter( _ != null)

to handle malformed or truncated .tar.gz files robustly.
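
For the Python variant of the accepted answer, a comparable "keep what was readable" behaviour might look like the sketch below. This is only an illustration under assumptions: the function name extract_files_tolerant is hypothetical, and the set of exceptions caught is a guess at what a damaged gzip or tar stream can raise, not an exhaustive list.

```python
import io
import tarfile
import zlib


def extract_files_tolerant(data):
    """Return the contents of the regular files that could be read from a
    .tar.gz payload, stopping quietly at the first sign of damage.

    Hypothetical helper: the name and the exception list are assumptions.
    """
    contents = []
    try:
        with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
            for member in tar:
                if member.isfile():
                    contents.append(tar.extractfile(member).read())
    except (tarfile.TarError, EOFError, OSError, zlib.error):
        # Damaged or truncated archive: keep whatever was read so far.
        pass
    return contents
```

As with the Try-based version, entries read before the damaged point are kept and the rest of the archive is dropped without failing the whole job.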

BTW, is there anything special about the size of the buffer array? Would it be faster on average if it were closer to the average file size, maybe 500k in my case? Or is the slowdown I am seeing more likely the overhead of Stream relative to a more Java-ish while loop?

Upvotes: 0

zero323

Reputation: 330133

One possible solution is to read data with binaryFiles and extract content manually.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x => 
  extractFiles(x).toOption).mapValues(_.map(decode()))

build.sbt dependency:

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
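
Since the question needs both the file name and the content of each archived file, the Python snippet above could be adapted to keep the entry names. The following is a minimal sketch, assuming UTF-8 text; extract_named_files is a hypothetical helper name, and sc is assumed to be an existing SparkContext as in the snippet above.

```python
import io
import tarfile


def extract_named_files(data, encoding="utf-8"):
    """Return (entry_name, text) pairs for the regular files in a .tar.gz payload.

    Hypothetical helper; the UTF-8 default is an assumption about the data.
    """
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        return [
            (member.name, tar.extractfile(member).read().decode(encoding))
            for member in tar
            if member.isfile()
        ]


# With a SparkContext named sc (assumed), one record per archived file could
# be produced like this:
#   sc.binaryFiles("somePath").flatMapValues(extract_named_files)
# Each record then has the shape (archive_path, (entry_name, text)).
```

Using flatMapValues instead of mapValues gives one record per archived file rather than one list per archive, which is closer to what wholeTextFiles returns for uncompressed directories.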

Upvotes: 36
