Spark/Hadoop throws exception for large LZO files

I'm running a Spark job on EMR over some LZO-compressed log files stored in S3. Several log files sit in the same folder, e.g.:

...
s3://mylogfiles/2014-08-11-00111.lzo
s3://mylogfiles/2014-08-11-00112.lzo
...

In the spark-shell I'm running a job that counts the lines in the files. Counting the lines individually for each file works fine, e.g.:

// Works fine
...
sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count()
sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count()
...

If I use a wildcard to load all the files in one go, however, I get two kinds of exceptions.

// One-liner throws exceptions
sc.textFile("s3://mylogfiles/*.lzo").count()

The exceptions are:

java.lang.InternalError: lzo1x_decompress_safe returned: -6
    at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)

and

java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)

It seems to me that the message of the last exception hints at the solution, but I don't know how to proceed. Is there a limit to how big LZO files are allowed to be, or what is the issue?

My question is: Can I run Spark queries that load all LZO-compressed files in an S3 folder, without getting I/O related exceptions?

There are 66 files, each roughly 200 MB.

EDIT: The exception only occurs when running Spark with the Hadoop2 core libs (AMI 3.1.0). With the Hadoop1 core libs (AMI 2.4.5), things work fine. Both cases were tested with Spark 1.0.1.

Upvotes: 5

Views: 4371

Answers (3)

Eric Eijkelenboom

Reputation: 7021

jkgeyti's answer works fine, but:

LzoTextInputFormat introduces a performance hit, since it checks for an .index file for each LZO file. This can be especially painful with many LZO files on S3 (I've experienced delays of several minutes, caused by thousands of requests to S3).

If you know up front that your LZO files are not splittable, a more performant solution is to create a custom, non-splittable input format:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Treat every file as a single split, so no task ever starts reading
// in the middle of an LZO stream.
class NonSplittableTextInputFormat extends TextInputFormat {
    override def isSplitable(context: JobContext, file: Path): Boolean = false
}
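(This should also work pasted straight into the spark-shell, e.g. via :paste, though packaging the class in a jar is the safer route.)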

Then read the files like this:

sc.newAPIHadoopFile("s3://mylogfiles/*.lzo",
    classOf[NonSplittableTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  .map(_._2.toString) // keys are byte offsets, values are the lines
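With a non-splittable format each file becomes exactly one split, so a single task reads each ~200 MB file from start to finish and the decompressor never sees a stream that starts mid-file.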

Upvotes: 5

jkgeyti
jkgeyti

Reputation: 2404

I haven't run into this specific issue myself, but it looks like .textFile expects files to be splittable, much like Cedrik's problem with Hive insisting on using CombineFileInputFormat.

You could either index your LZO files (see the indexer sketch after the snippet below), or try using the LzoTextInputFormat - I'd be interested to hear if that works better on EMR:

sc.newAPIHadoopFile("s3://mylogfiles/*.lzo",
    classOf[com.hadoop.mapreduce.LzoTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  .map(_._2.toString) // if you just want an RDD[String] without writing a new InputFormat
  .count
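For the indexing option, hadoop-lzo ships a MapReduce indexer job. A minimal sketch of invoking it from Scala, assuming the hadoop-lzo jar is on the classpath (running the same class via hadoop jar works just as well):

import com.hadoop.compression.lzo.DistributedLzoIndexer
import org.apache.hadoop.util.ToolRunner

// Writes a .index file next to each .lzo file, which lets
// LzoTextInputFormat split the files across multiple tasks.
ToolRunner.run(new DistributedLzoIndexer(), Array("s3://mylogfiles/"))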

Upvotes: 4

Cedrik Neumann

Reputation: 151

Yesterday we deployed Hive on an EMR cluster and had the same problem with some LZO files in S3 that had been processed without any problem by another, non-EMR cluster. After some digging in the logs I noticed that the map tasks read the S3 files in 250 MB chunks, although the files are definitely not splittable.

It turned out that the parameter mapreduce.input.fileinputformat.split.maxsize was set to 250000000 (~250 MB). That resulted in LZO opening a stream from within a file, and ultimately a corrupt LZO block.

I set the parameter mapreduce.input.fileinputformat.split.maxsize=2000000000, larger than the maximum file size of our input data, and everything works now.

I'm not exactly sure how this carries over to Spark, but changing the InputFormat might help, which seems to be the problem in the first place, as mentioned in How Amazon EMR Hive Differs from Apache Hive.
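For what it's worth, a minimal sketch of trying the same fix from the spark-shell, assuming the splits are computed by a new-API FileInputFormat that honors this property:

// Raise the max split size above the largest input file, so no split
// ever starts in the middle of an LZO stream (value taken from above).
sc.hadoopConfiguration.set(
  "mapreduce.input.fileinputformat.split.maxsize", "2000000000")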

Upvotes: 4
