Reputation: 11315
I have an S3 bucket that is filled with gzip files that have no file extension. For example s3://mybucket/1234502827-34231
sc.textFile
uses the file extension to select the decoder. I have found many blog posts on handling custom file extensions, but nothing about missing file extensions.
I think the solution may be sc.binaryFiles
and unzipping the file manually.
Another possibility is to figure out how sc.textFile determines the file format. I'm not clear on how these classOf[] calls work.
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
Upvotes: 4
Views: 4845
Reputation: 55
You can read the file as binary and do the decompression in a map function.
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.input.PortableDataStream;

// Read the raw bytes, then gunzip them manually in a map step
JavaRDD<Tuple2<String, PortableDataStream>> rawData = spark.sparkContext().binaryFiles(readLocation, 1).toJavaRDD();
JavaRDD<String> decompressedData = rawData.map(pair -> {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(pair._2.toArray()));
    IOUtils.copy(in, out);
    return new String(out.toByteArray(), StandardCharsets.UTF_8); // pick the charset explicitly
});
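The decompression step itself only needs the JDK, so here is a standalone sketch of the same gunzip-bytes-to-String logic, runnable without Spark (the class and helper names are my own):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GunzipDemo {
    // Same idea as the map function above: wrap the raw bytes in a
    // GZIPInputStream and copy everything out.
    static String gunzipToString(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Round-trip a sample string to exercise the helper
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("line1\nline2".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(gunzipToString(buf.toByteArray()));
    }
}
```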
In the case of JSON content, you can read it into a Dataset using
Dataset<Row> co = spark.read().json(decompressedData);
Upvotes: 0
Reputation: 55
You can create your own custom codec for decoding your file. You can start by extending GzipCodec and overriding the getDefaultExtension method to return an empty string as the extension.
EDIT: That solution will not work in all cases due to how CompressionCodecFactory is implemented. For example, the codec for .lz4 is loaded by default. This means that if the name of a file you want to load ends with 4, that codec will get picked instead of the custom one (without extension). Since that codec does not fully match the extension, it will later be discarded and no codec will be used.
Java:
package com.customcodec;

import org.apache.hadoop.io.compress.GzipCodec;

public class GzipCodecNoExtension extends GzipCodec {
    @Override
    public String getDefaultExtension() {
        return "";
    }
}
In your Spark app you just register the codec:
SparkConf conf = new SparkConf()
.set("spark.hadoop.io.compression.codecs", "com.customcodec.GzipCodecNoExtension");
Upvotes: 0
Reputation: 11315
I found several examples out there that almost fit my needs. Here is the final code I used to parse a gzip-compressed file.
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._
def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try {
  val gz = new GzipCompressorInputStream(ps.open)
  Stream.continually {
    // Read up to n bytes at a time
    val buffer = Array.fill[Byte](n)(-1)
    val i = gz.read(buffer, 0, n)
    (i, buffer.take(i))
  }
  // Take as long as we've read something
  .takeWhile(_._1 > 0)
  .map(_._2)
  .flatten
  .toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, charset)
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614"
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map( x => decode()(x._2) )
val rdd2 = rdd.flatMap { x => x.split("\n") }
rdd2.take(10).foreach(println)
Upvotes: 1
Reputation: 10450
Can you try combining the solution below for ZIP files with the gzipFileInputFormat library?
here - How to open/stream .zip files through Spark? - you can see how to do it using ZIP:
rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
gzipFileInputFormat:
Some details about newAPIHadoopFile() can be found here: http://spark.apache.org/docs/latest/api/python/pyspark.html
Upvotes: 2