Reputation: 111
I have a text file that uses the string REC as the record delimiter and a line break as the column delimiter, and every value has its column name attached to it with a comma as the separator. Below is the sample data format:
REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5
REC is used as the record delimiter. Now I want to create a Spark DataFrame with the column names Id, Term and Rank. Please assist me with this.
Upvotes: 3
Views: 4050
Reputation: 1409
Here is working code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object RecordSeparator extends App {
  val conf = new SparkConf().setAppName("test").setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Tell the Hadoop TextInputFormat to split records on "REC" instead of newlines
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
    classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], hconf).map(x => x._2.toString.trim).filter(x => x != "")
    .map(x => getRecord(x)).map(x => x.split(","))
    .map(x => record(x(0), x(1), x(2))) // Id, Term, Rank

  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // Turn a multi-line record ("Id,19048\nTerm,milk\nRank,1") into "19048,milk,1"
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

case class record(Id: String, Term: String, Rank: String)
Output:
root
|-- Id: string (nullable = true)
|-- Term: string (nullable = true)
|-- Rank: string (nullable = true)
+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
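If you are on Spark 2.x, the same textinputformat.record.delimiter trick should work through a SparkSession instead of SQLContext. A minimal sketch, assuming Spark 2.x and the same data.txt layout (the object name RecordSeparator2 is just a placeholder):

// Sketch only: assumes a Spark 2.x SparkSession; reuses the record-delimiter
// configuration from the code above.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession

object RecordSeparator2 extends App {
  val spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate()
  import spark.implicits._

  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val df = spark.sparkContext.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hconf)
    .map(_._2.toString.trim)
    .filter(_.nonEmpty)
    .map { rec =>
      // "Id,19048\nTerm,milk\nRank,1" -> Array("Id","19048","Term","milk","Rank","1")
      val ar = rec.split("\n").mkString(",").split(",")
      (ar(1), ar(3), ar(5))
    }
    .toDF("Id", "Term", "Rank")

  df.show(false)
}

Because the tuple is converted with toDF("Id", "Term", "Rank"), no separate case class is needed in this variant.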
Upvotes: 6
Reputation: 27373
Supposing you have your file on the "normal" filesystem (not HDFS), you have to write a file parser and then use sc.parallelize to create an RDD and then a DataFrame:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Demo extends App {
  val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  case class Record(
    var id: Option[Int] = None,
    var term: Option[String] = None,
    var rank: Option[Int] = None)

  val filename = "data.dat"
  val records = readFile(filename)
  val df = sc.parallelize(records).toDF
  df.printSchema()
  df.show()

  // Parse the file line by line: "REC" opens a new record, the "Id" and
  // "Term" lines fill it, and the "Rank" line completes it and stores it.
  def readFile(filename: String): Seq[Record] = {
    import scala.io.Source

    val records = mutable.ArrayBuffer.empty[Record]
    var currentRecord: Record = null

    for (line <- Source.fromFile(filename).getLines) {
      val tokens = line.split(',')
      currentRecord = tokens match {
        case Array("REC") => Record()
        case Array("Id", id) =>
          currentRecord.id = Some(id.toInt); currentRecord
        case Array("Term", term) =>
          currentRecord.term = Some(term); currentRecord
        case Array("Rank", rank) =>
          currentRecord.rank = Some(rank.toInt); records += currentRecord
          null
      }
    }
    records
  }
}
This gives:
root
|-- id: integer (nullable = true)
|-- term: string (nullable = true)
|-- rank: integer (nullable = true)
+-----+----+----+
| id|term|rank|
+-----+----+----+
|19048|milk| 1|
|19049|corn| 5|
+-----+----+----+
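As a variation on the parser above, the per-record mutable state can be avoided by first grouping the lines into records and then building each Record in one pass. A rough sketch under the same assumptions (the file fits in driver memory, every record carries Id, Term and Rank, and readFileGrouped is just an illustrative name):

// Hypothetical alternative to readFile: assumes the Record case class from
// above is in scope and that the input is well formed.
def readFileGrouped(filename: String): Seq[Record] = {
  import scala.io.Source
  Source.fromFile(filename).getLines.toList
    .foldLeft(List.empty[List[String]]) {
      case (acc, "REC")        => Nil :: acc        // start a new record
      case (rec :: rest, line) => (line :: rec) :: rest
      case (Nil, _)            => Nil               // ignore lines before the first REC
    }
    .map { lines =>
      // each record is a list like List("Rank,1", "Term,milk", "Id,19048")
      val kv = lines.map(_.split(',')).map(a => a(0) -> a(1)).toMap
      Record(kv.get("Id").map(_.toInt), kv.get("Term"), kv.get("Rank").map(_.toInt))
    }
    .reverse
}

sc.parallelize(readFileGrouped(filename)).toDF would then produce the same DataFrame as the original readFile.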
Upvotes: 1