Reputation: 3470
I am reading a CSV file from HDFS using Spark. It goes into an FSDataInputStream object. I can't use the textFile() method because it splits the file on line feeds, and my CSV file has line feeds inside its text fields. opencsv from SourceForge handles line feeds inside cells, and it's a nice project, but it accepts a Reader as input. I need to convert the stream to a String so that I can pass it to opencsv as a StringReader. So: HDFS file -> FSDataInputStream -> String -> StringReader -> an opencsv list of string arrays. Below is the code...
import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder
val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt
// Read the whole file into driver memory in 2 KB chunks.
val buffer = new Array[Byte](2048)
val bytes = new ByteArrayOutputStream()
csvFile.seek(0)
var bytesRead = csvFile.read(buffer)
while (bytesRead != -1) {
  // Copy only the bytes actually read, so no padding has to be trimmed afterwards.
  bytes.write(buffer, 0, bytesRead)
  bytesRead = csvFile.read(buffer)
}
csvFile.close()
// Decode once at the end so a multi-byte UTF-8 character is never split across two chunks.
val reader: Reader = new StringReader(new String(bytes.toByteArray, "UTF-8"))
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
It works, but I doubt it is scalable. It makes me wonder how big a limitation it is to have a driver program that pipes all the data through one JVM. My questions to anyone very familiar with Spark are:
What happens when you do data manipulations across your whole data set like this, before it even gets dropped into the input RDD? It is just treated as any other program and would be swapping out like crazy I guess?
How would you then make any spark program scalable? Do you always NEED to extract the data directly into an input RDD?
Upvotes: 1
Views: 372
Reputation: 570
Your code loads all the data into the driver's memory, and then the Spark driver splits it and sends each part to an executor. Of course, that is not scalable.
There are two ways to solve your problem.
The first is to write a custom InputFormat that supports the CSV file format:
import java.io.{InputStreamReader, IOException}
import com.google.common.base.Charsets
import com.opencsv.{CSVParser, CSVReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Seekable, Path, FileSystem}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable}
import org.apache.hadoop.mapred._
class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] with JobConfigurable {
  private var compressionCodecs: CompressionCodecFactory = _

  def configure(conf: JobConf): Unit = {
    compressionCodecs = new CompressionCodecFactory(conf)
  }

  // Uncompressed files, and files compressed with a splittable codec, may be split.
  protected override def isSplitable(fs: FileSystem, file: Path): Boolean = {
    val codec: CompressionCodec = compressionCodecs.getCodec(file)
    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
  }

  @throws(classOf[IOException])
  def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = {
    reporter.setStatus(genericSplit.toString)
    // The first byte of this setting is used by the reader as the CSV field separator.
    val delimiter: String = job.get("textinputformat.record.delimiter")
    val recordDelimiterBytes: Array[Byte] =
      if (delimiter != null) delimiter.getBytes(Charsets.UTF_8) else null
    new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
  }
}
class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte])
  extends RecordReader[LongWritable, ArrayWritable] {

  private val compressionCodecs = new CompressionCodecFactory(job)
  private val maxLineLength = job.getInt(
    org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE)
  private var filePosition: Seekable = _
  private val file = split.getPath
  private val codec = compressionCodecs.getCodec(file)
  private val isCompressedInput = codec != null
  private val fs = file.getFileSystem(job)
  private val fileIn = fs.open(file)
  private var start = split.getStart
  private var pos: Long = 0L
  private var end = start + split.getLength
  private var reader: CSVReader = _
  private var decompressor: Decompressor = _
  private lazy val CSVSeparator =
    if (recordDelimiter == null) CSVParser.DEFAULT_SEPARATOR
    else recordDelimiter(0).toChar

  // Open the underlying stream, wrapping it in a decompressor when needed.
  if (isCompressedInput) {
    decompressor = CodecPool.getDecompressor(codec)
    if (codec.isInstanceOf[SplittableCompressionCodec]) {
      val cIn = codec.asInstanceOf[SplittableCompressionCodec]
        .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
      reader = new CSVReader(new InputStreamReader(cIn), CSVSeparator)
      start = cIn.getAdjustedStart
      end = cIn.getAdjustedEnd
      filePosition = cIn
    } else {
      reader = new CSVReader(new InputStreamReader(codec.createInputStream(fileIn, decompressor)), CSVSeparator)
      filePosition = fileIn
    }
  } else {
    // Caveat: `start` may fall in the middle of a record; this reader does not
    // skip ahead to the next record boundary.
    fileIn.seek(start)
    reader = new CSVReader(new InputStreamReader(fileIn), CSVSeparator)
    filePosition = fileIn
  }
  // Track the position as an absolute file offset so it can be compared with `end`.
  pos = start

  @throws(classOf[IOException])
  private def getFilePosition: Long =
    if (isCompressedInput && null != filePosition) filePosition.getPos
    else pos

  private def nextLine: Option[Array[String]] =
    if (getFilePosition < end) {
      // readNext() parses one whole record and splits it into fields.
      reader.readNext() match {
        case null => None
        case elems => Some(elems)
      }
    } else
      None

  override def next(key: LongWritable, value: ArrayWritable): Boolean =
    nextLine.exists { elems =>
      key.set(pos)
      // Approximate record length: each field's characters plus one separator or terminator.
      val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b)
      pos += lineLength
      value.set(elems.map(new Text(_)))
      lineLength < maxLineLength
    }

  @throws(classOf[IOException])
  def getProgress: Float =
    if (start == end) 0.0f
    else Math.min(1.0f, (getFilePosition - start) / (end - start).toFloat)

  override def getPos: Long = pos

  override def createKey(): LongWritable = new LongWritable

  override def close(): Unit = {
    try {
      if (reader != null) {
        reader.close()
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor)
      }
    }
  }

  override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text])
}
Simple test example:
val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable], classOf[ArrayWritable],
sc.defaultMinPartitions).map(_._2.get().map(_.toString))
arrayRdd.collect().foreach(e => println(e.mkString(",")))
The other way, which I prefer, is to use spark-csv, written by Databricks, which has good support for the CSV file format. You can find usage examples on its GitHub page.
Updated for spark-csv, using univocity as the parserLib, which can handle multi-line cells:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("parserLib", "univocity")
.option("inferSchema", "true") // Automatically infer data types
.load("source path")
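If you are on a newer Spark (2.2 or later, as far as I know), the CSV source is built in and has a "multiLine" option for quoted fields that contain line breaks. Below is a minimal sketch of that variant, not a drop-in replacement for the spark-csv call above. The object name, the helper readMultiLineCsv and the sample data are made up for illustration. Also note that, to my understanding, a file read with multiLine is not split, so each file is parsed by a single task.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiLineCsvExample {
  // Reads CSV files whose quoted fields may contain line breaks.
  // Assumes Spark 2.2+, where the built-in CSV source accepts "multiLine".
  def readMultiLineCsv(spark: SparkSession, path: String): DataFrame =
    spark.read
      .option("header", "true")    // first line of each file is the header
      .option("multiLine", "true") // a quoted field may span several lines
      .csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("multiline-csv")
      .getOrCreate()

    // Hypothetical sample: the note of record 1 spans two physical lines.
    val sample = "id,note\n1,\"line one\nline two\"\n2,plain\n"
    val file = Files.createTempFile("multiline", ".csv")
    Files.write(file, sample.getBytes(StandardCharsets.UTF_8))

    val df = readMultiLineCsv(spark, file.toUri.toString)
    df.show(truncate = false)
    spark.stop()
  }
}
```

For the HDFS case you would pass the HDFS path instead of the temporary file.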
Upvotes: 3
Reputation: 14661
What happens when you do data manipulations across your whole data set like this, before it even gets dropped into the input RDD? It is just treated as any other program and would be swapping out like crazy I guess?
You load the whole dataset into local memory. So if you have the memory, it works.
How would you then make any spark program scalable?
You have to select a data format that Spark can load, or change your application so that it can load the data format into Spark directly, or a bit of both.
In this case you could look at creating a custom InputFormat that splits on something other than newlines. I think you would also want to look at how you write your data, so that it is partitioned in HDFS at record boundaries, not newlines.
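To make "splits on something other than newlines" a bit more concrete, here is a minimal sketch of the boundary rule such a reader would need: a newline ends a record only when it is outside a double-quoted field. This is plain Scala with no Hadoop types, and the names QuoteAwareSplitter and splitRecords are made up for illustration. It assumes quotes inside a field are escaped by doubling them. It does not address the harder part, which is finding the first record boundary when a split starts in the middle of a file.

```scala
object QuoteAwareSplitter {
  /** Splits CSV text into records, treating a newline as a record boundary
    * only when it falls outside a double-quoted field. */
  def splitRecords(text: String): Vector[String] = {
    val records = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    for (ch <- text) {
      // A doubled quote ("") toggles the flag twice, so the state is unchanged.
      if (ch == '"') inQuotes = !inQuotes
      if (ch == '\n' && !inQuotes) {
        // Record boundary: emit what we have, tolerating CRLF line endings.
        records += current.toString.stripSuffix("\r")
        current.clear()
      } else {
        current.append(ch)
      }
    }
    // Emit a final record that is not terminated by a newline.
    if (current.nonEmpty) records += current.toString
    records.result()
  }
}
```

Each returned string is one complete record, embedded line feeds included, which could then be handed to a CSV parser such as opencsv.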
However, I suspect the simplest answer is to encode the data differently: JSON Lines, or escaping the newlines in the CSV file when you write it, or Avro, or... anything that fits better with Spark and HDFS.
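As a rough illustration of the "encode the newlines during the write" option, the sketch below escapes line breaks in a field before it is written and restores them after reading. The object and method names are hypothetical, and the backslash-based escape scheme is just one possible convention, so the writer and the reader have to agree on it.

```scala
object NewlineEscaping {
  // Escape backslashes first so the escapes introduced afterwards stay unambiguous.
  def escape(field: String): String =
    field.replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r")

  // Reverse of escape: turns \n and \r back into line breaks and \\ into a backslash.
  def unescape(field: String): String = {
    val out = new StringBuilder
    var i = 0
    while (i < field.length) {
      val ch = field.charAt(i)
      if (ch == '\\' && i + 1 < field.length) {
        field.charAt(i + 1) match {
          case 'n'   => out.append('\n')
          case 'r'   => out.append('\r')
          case other => out.append(other) // covers the escaped backslash
        }
        i += 2
      } else {
        out.append(ch)
        i += 1
      }
    }
    out.toString
  }
}
```

With every record on a single physical line, a line-based reader such as textFile() can split the file safely, and the fields can be unescaped in a map step on the executors.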
Upvotes: 1