jspooner

Reputation: 11315

Spark - Read compressed files without file extension

I have an S3 bucket filled with gzip-compressed files that have no file extension. For example: s3://mybucket/1234502827-34231

sc.textFile uses the file extension to select the decompression codec. I have found many blog posts on handling custom file extensions, but nothing about missing file extensions.

I think the solution may be to use sc.binaryFiles and unzip the files manually.
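For reference, the manual-unzip step itself only needs java.util.zip. A minimal sketch in plain Java, outside of Spark (class and method names here are mine, and the round-trip in main just stands in for bytes that would come from a PortableDataStream):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ManualGunzip {

    // Decompress a gzip byte array into a UTF-8 string, the same
    // operation you would apply to PortableDataStream.toArray().
    static String gunzip(byte[] compressed) throws Exception {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) > 0) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip a sample payload to show the decompression step works.
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(raw)) {
            gz.write("hello,world".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(gunzip(raw.toByteArray()));
    }
}
```

Note this reads the whole file into memory, so it is only suitable for files that fit comfortably on one executor.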

Another possibility is to figure out how sc.textFile finds the file format. I'm not clear on how these classOf[] calls work.

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

Upvotes: 4

Views: 4845

Answers (4)

Robert

Reputation: 55

You can read the files as binary and do the decompression in a map function.

JavaRDD<Tuple2<String, PortableDataStream>> rawData = spark.sparkContext().binaryFiles(readLocation, 1).toJavaRDD();

JavaRDD<String> decompressedData = rawData.map((Function<Tuple2<String, PortableDataStream>, String>) pair -> {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // pair._2.toArray() pulls the whole compressed file into memory
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(pair._2.toArray()))) {
        IOUtils.copy(in, out);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
});

If the content is JSON, you can read it into a Dataset using

Dataset<Row> co = spark.read().json(decompressedData);

Upvotes: 0

Robert

Reputation: 55

You can create your own custom codec for decoding your files. Start by extending GzipCodec and overriding the getDefaultExtension method to return an empty string as the extension.

EDIT: That solution will not work in all cases due to how CompressionCodecFactory is implemented. For example: by default, a codec for .lz4 is loaded. This means that if the name of a file you want to load ends with 4, that codec gets picked instead of the custom one (which has no extension). Since that codec does not match the extension, it later gets ditched and no codec is used at all.

Java:

package com.customcodec;

import org.apache.hadoop.io.compress.GzipCodec;

public class GzipCodecNoExtension extends GzipCodec {

    @Override
    public String getDefaultExtension() {
        return "";
    }
}

In your Spark app you just register the codec:

    SparkConf conf = new SparkConf()
            .set("spark.hadoop.io.compression.codecs", "com.customcodec.GzipCodecNoExtension");

Upvotes: 0

jspooner

Reputation: 11315

I found several examples out there that almost fit my needs. Here is the final code I used to parse a gzip-compressed file.

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try {
  val gz = new GzipCompressorInputStream(ps.open)
  Stream.continually {
    // Read n bytes
    val buffer = Array.fill[Byte](n)(-1)
    val i = gz.read(buffer, 0, n)
    (i, buffer.take(i))
  }
  // Take as long as we've read something
  .takeWhile(_._1 > 0)
  .map(_._2)
  .flatten
  .toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, charset)
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614"
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map( x => decode()(x._2) )
val rdd2 = rdd.flatMap { x => x.split("\n") }
rdd2.take(10).foreach(println)

Upvotes: 1

Yaron

Reputation: 10450

Can you try combining the below solution for ZIP files with the GZipFileInputFormat library?

Here - How to open/stream .zip files through Spark? - you can see how to do it for ZIP:

rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

gzipFileInputFormat:

https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/GZipFileInputFormat.java

Some details about newAPIHadoopFile() can be found here: http://spark.apache.org/docs/latest/api/python/pyspark.html

Upvotes: 2
