uh_big_mike_boi

Reputation: 3470

What happens when you do java data manipulations in Spark outside of an RDD

I am reading a CSV file from HDFS using Spark, into an FSDataInputStream object. I can't use the textFile() method because it splits up the CSV file on line feeds, and I am reading a CSV file that has line feeds inside the text fields. opencsv from SourceForge handles line feeds inside the cells (it's a nice project), but it accepts a Reader as input. I need to convert the stream to a String so that I can pass it to opencsv as a StringReader. So: HDFS file -> FSDataInputStream -> String -> StringReader -> an opencsv list of strings. Below is the code...

import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder

val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt

var b = Array.fill[Byte](2048)(0)
var j = 1

val stringBuilder = new StringBuilder()
var bufferString = ""

csvFile.seek(0)
csvFile.read(b)
bufferString = new String(b, "UTF-8")
stringBuilder.append(bufferString)

while (j != -1) {
  b = Array.fill[Byte](2048)(0)
  j = csvFile.read(b)
  bufferString = new String(b, "UTF-8")
  stringBuilder.append(bufferString)
}

// substring() returns a String; this trims the zero-filled bytes left over from the last read
val stringBuilderClean = stringBuilder.substring(0, fileLen)

val reader: Reader = new StringReader(stringBuilderClean.toString())
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
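As an aside, the manual 2048-byte buffer loop above can be replaced with a single slurp of the stream. This is a sketch, not the original author's code; it is the same driver-side approach with the same scalability caveat, just less error-prone (no zero-padded trailing bytes to trim off afterwards). `FSDataInputStream` is a plain `java.io.InputStream`, so the same call works on `csvFile`:

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// Sketch: read an entire InputStream into a String in one call.
// Here a ByteArrayInputStream stands in for the HDFS stream.
val in = new ByteArrayInputStream("a,b\n\"x\ny\",z\n".getBytes("UTF-8"))
val text = Source.fromInputStream(in, "UTF-8").mkString
```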

It works, but I doubt it is scalable. It makes me wonder how big a limitation it is to have a driver program that pipes in all the data through one JVM. My questions for anyone very familiar with Spark are:

  1. What happens when you do data manipulations across your whole data set like this, before it even gets dropped into the input RDD? Is it just treated like any other program and would be swapping out like crazy, I guess?

  2. How would you then make any Spark program scalable? Do you always NEED to extract the data directly into an input RDD?

Upvotes: 1

Views: 372

Answers (2)

taigetco

Reputation: 570

Your code loads all the data into the driver's memory, and then the Spark driver splits it and sends each part to the executors, so of course it is not scalable.
There are two ways to solve this.

Write a custom InputFormat that supports the CSV file format:

import java.io.{InputStreamReader, IOException}

import com.google.common.base.Charsets
import com.opencsv.{CSVParser, CSVReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Seekable, Path, FileSystem}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable}
import org.apache.hadoop.mapred._

class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] with JobConfigurable {
  private var compressionCodecs: CompressionCodecFactory = _

  def configure(conf: JobConf) {
    compressionCodecs = new CompressionCodecFactory(conf)
  }

  protected override def isSplitable(fs: FileSystem, file: Path): Boolean = {
    val codec: CompressionCodec = compressionCodecs.getCodec(file)
    if (null == codec) {
      return true
    }
    codec.isInstanceOf[SplittableCompressionCodec]
  }

  @throws(classOf[IOException])
  def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = {
    reporter.setStatus(genericSplit.toString)
    val delimiter: String = job.get("textinputformat.record.delimiter")
    var recordDelimiterBytes: Array[Byte] = null
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8)
    }
    new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
  }
}

class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte])
  extends RecordReader[LongWritable, ArrayWritable] {
  private val compressionCodecs = new CompressionCodecFactory(job)
  private val maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE)
  private var filePosition: Seekable = _
  private val file = split.getPath
  private val codec = compressionCodecs.getCodec(file)
  private val isCompressedInput = codec != null
  private val fs = file.getFileSystem(job)
  private val fileIn = fs.open(file)

  private var start = split.getStart
  private var pos: Long = 0L
  private var end = start + split.getLength
  private var reader: CSVReader = _
  private var decompressor: Decompressor = _

  private lazy val CSVSeparator =
    if (recordDelimiter == null)
      CSVParser.DEFAULT_SEPARATOR
    else
      recordDelimiter(0).asInstanceOf[Char]

  if (isCompressedInput) {
    decompressor = CodecPool.getDecompressor(codec)
    if (codec.isInstanceOf[SplittableCompressionCodec]) {
      val cIn = (codec.asInstanceOf[SplittableCompressionCodec])
        .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
      reader = new CSVReader(new InputStreamReader(cIn), CSVSeparator)
      start = cIn.getAdjustedStart
      end = cIn.getAdjustedEnd
      filePosition = cIn
    }else {
      reader = new CSVReader(new InputStreamReader(codec.createInputStream(fileIn, decompressor)), CSVSeparator)
      filePosition = fileIn
    }
  } else {
    fileIn.seek(start)
    reader = new CSVReader(new InputStreamReader(fileIn), CSVSeparator)
    filePosition = fileIn
  }

  @throws(classOf[IOException])
  private def getFilePosition: Long = {
    if (isCompressedInput && null != filePosition) {
      filePosition.getPos
    }else
      pos
  }

  private def nextLine: Option[Array[String]] = {
    if (getFilePosition < end){
        // readNext automatically splits the line into elements
      reader.readNext() match {
        case null => None
        case elems => Some(elems)
      }
    } else
      None
  }

  override def next(key: LongWritable, value: ArrayWritable): Boolean =
    nextLine
      .exists { elems =>
        key.set(pos)
        val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b)
        pos += lineLength
        value.set(elems.map(new Text(_)))
        lineLength < maxLineLength
      }

  @throws(classOf[IOException])
  def getProgress: Float =
    if (start == end)
      0.0f
    else
      Math.min(1.0f, (getFilePosition - start) / (end - start).toFloat)

  override def getPos: Long = pos

  override def createKey(): LongWritable = new LongWritable

  override def close(): Unit = {
    try {
      if (reader != null) {
        reader.close
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor)
      }
    }
  }

  override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text])
}

Simple test example:

val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable],
  classOf[ArrayWritable], sc.defaultMinPartitions).map(_._2.get().map(_.toString))
arrayRdd.collect().foreach(e => println(e.mkString(",")))

The other way, which I prefer, uses spark-csv written by Databricks, which has good support for the CSV file format; you can find usage examples on its GitHub page.

Update: spark-csv, using univocity as the parserLib, can handle multi-line cells:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // Use first line of all files as header
  .option("parserLib", "univocity")
  .option("inferSchema", "true") // Automatically infer data types
  .load("source path")

Upvotes: 3

Michael Lloyd Lee mlk

Reputation: 14661

What happens when you do data manipulations across your whole data set like this, before it even gets dropped into the input RDD? It is just treated as any other program and would be swapping out like crazy I guess?

You load the whole dataset into local memory. So if you have the memory, it works.

How would you then make any spark program scalable?

You have to select a data format that Spark can load, or change your application so that it can load the data into Spark directly, or a bit of both.

In this case you could look at creating a custom InputFormat that splits on something other than newlines. I think you would also want to look at how you write your data, so that it is partitioned in HDFS at record boundaries rather than newlines.

However, I suspect the simplest answer is to encode the data differently: JSON Lines, escaping the newlines in the CSV file during the write, Avro, or... anything that fits better with Spark and HDFS.
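To illustrate the escape-at-write-time idea: the helpers below are a hypothetical sketch (not part of any library) that escape embedded newlines so each record stays on one physical line, letting textFile() split the file safely, and then reverse the mapping after the split:

```scala
// Hypothetical sketch: escape backslashes and line breaks inside a field
// before writing, so a record never spans multiple physical lines.
def escapeField(s: String): String =
  s.replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r")

// Reverse the escaping after Spark has split the file by line.
// A single left-to-right scan avoids the ambiguity that naive chained
// replace() calls have with sequences like backslash-backslash-n.
def unescapeField(s: String): String = {
  val sb = new StringBuilder
  var i = 0
  while (i < s.length) {
    if (s(i) == '\\' && i + 1 < s.length) {
      s(i + 1) match {
        case 'n'  => sb.append('\n')
        case 'r'  => sb.append('\r')
        case '\\' => sb.append('\\')
        case c    => sb.append('\\').append(c) // unknown escape: keep as-is
      }
      i += 2
    } else {
      sb.append(s(i))
      i += 1
    }
  }
  sb.toString
}
```

The round trip `unescapeField(escapeField(x)) == x` holds for any field, including ones that already contain literal backslash-n sequences, which is the property that makes the encoding safe.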

Upvotes: 1
