prakash

Reputation: 111

Create Spark DataFrame from custom data format

I have a text file with the string REC as the record delimiter and a line break as the column delimiter, and every value has its column name attached to it with a comma as the delimiter. Below is the sample data format:

REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5

REC is used as the record delimiter. Now I want to create a Spark DataFrame with the column names Id, Term and Rank. Please assist me with this.

Upvotes: 3

Views: 4050

Answers (2)

Narendra Parmar

Reputation: 1409

Here is working code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


object RecordSeparator extends App {
  val conf = new SparkConf().setAppName("test").setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)

  // use REC as the record delimiter so each block between REC markers
  // arrives as one input value
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
    classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], hconf).map(x => x._2.toString.trim).filter(x => x != "")
    .map(x => getRecord(x)).map(x => x.split(","))
    .map(x => record(x(0), x(1), x(2)))

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // "Id,19048\nTerm,milk\nRank,1" -> "19048,milk,1"
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

case class record(Id: String, Term: String, Rank: String)

Output:

 root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
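
For newer Spark versions (2.x and above), the same record-delimiter trick works with SparkSession instead of SQLContext. A minimal sketch, assuming the same data.txt layout (the object name here is just a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession

object RecordSeparator2 extends App {
  val spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate()
  import spark.implicits._

  // split the input on REC so each block ("Id,19048\nTerm,milk\nRank,1") is one value
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val df = spark.sparkContext
    .newAPIHadoopFile("data.txt", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hconf)
    .map(_._2.toString.trim)
    .filter(_.nonEmpty)
    .map { block =>
      // keep only the value after each "name," prefix
      val values = block.split("\n").map(_.split(",")(1))
      (values(0), values(1), values(2))
    }
    .toDF("Id", "Term", "Rank")

  df.show(false)
}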

Upvotes: 6

Raphael Roth

Reputation: 27373

Supposing you have your file on the "normal" filesystem (not HDFS), you have to write a file parser and then use sc.parallelize to create an RDD and then a DataFrame:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Demo extends App {
  val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  case class Record(
      var id: Option[Int] = None,
      var term: Option[String] = None,
      var rank: Option[Int] = None)

  val filename = "data.dat"

  val records = readFile(filename)
  val df = sc.parallelize(records).toDF
  df.printSchema()
  df.show()

  // parse the file line by line: "REC" starts a new record, "Rank" completes it
  def readFile(filename: String): Seq[Record] = {
    import scala.io.Source

    val records = mutable.ArrayBuffer.empty[Record]
    var currentRecord: Record = null

    for (line <- Source.fromFile(filename).getLines) {
      val tokens = line.split(',')

      currentRecord = tokens match {
        case Array("REC") => Record() // start a fresh, empty record
        case Array("Id", id) => {
          currentRecord.id = Some(id.toInt); currentRecord
        }
        case Array("Term", term) => {
          currentRecord.term = Some(term); currentRecord
        }
        case Array("Rank", rank) => {
          // last field of a record: store it and reset
          currentRecord.rank = Some(rank.toInt); records += currentRecord;
          null
        }
      }
    }
    records
  }
}

This gives:

root
 |-- id: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- rank: integer (nullable = true)

+-----+----+----+
|   id|term|rank|
+-----+----+----+
|19048|milk|   1|
|19049|corn|   5|
+-----+----+----+
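
As a side note, the same parsing can be sketched without mutable state by splitting the whole file on the REC marker. This is not part of the answer above, just a minimal variant that reuses the Record case class from the code:

  // variant sketch: one chunk per REC block instead of line-by-line parser state
  def readFileByBlocks(filename: String): Seq[Record] = {
    import scala.io.Source
    Source.fromFile(filename).mkString
      .split("REC")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { block =>
        // "Id,19048\nTerm,milk\nRank,1" -> Map("Id" -> "19048", ...)
        val fields = block.split("\n").map(_.split(",")).collect {
          case Array(k, v) => k -> v
        }.toMap
        Record(fields.get("Id").map(_.toInt),
               fields.get("Term"),
               fields.get("Rank").map(_.toInt))
      }
      .toSeq
  }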

Upvotes: 1
