Buzz97

Reputation: 53

How to create a DataFrame from an Array[String]?

I used rdd.collect() to create an Array, and now I want to use this Array[String] to create a DataFrame. My test file is in the following format (separated by a pipe |).

TimeStamp
IdC
Name
FileName
Start-of-fields
column01  
column02 
column03 
column04 
column05 
column06 
column07 
column08 
column010 
column11
End-of-fields
Start-of-data 
G0002B|0|13|IS|LS|Xys|Xyz|12|23|48|  
G0002A|0|13|IS|LS|Xys|Xyz|12|23|45|  
G0002x|0|13|IS|LS|Xys|Xyz|12|23|48|  
G0002C|0|13|IS|LS|Xys|Xyz|12|23|48|
End-of-data
document  

The column names are between Start-of-fields and End-of-fields. I want to store the pipe-separated (|) values in different columns of a DataFrame.

Like the example below:

column01  column02 column03 column04 column05 column06 column07 column08 column010 column11
G0002C      0        13       IS       LS       Xys      Xyz     12        23         48
G0002x      0        13       LS       MS       Xys      Xyz     14        300        400

My code:

    val rdd = sc.textFile("the above text file")

    val columns = rdd.collect.slice(5,16).mkString(",") // it will hold the column names

    val data = rdd.collect.slice(5,16)
    val rdd1 = sc.parallelize(rdd.collect())
    val df = rdd1.toDF(columns)

But this is not giving me the desired DataFrame shown above.

Upvotes: 1

Views: 348

Answers (2)

Nikunj Kakadiya

Reputation: 3008

If the number of columns and the column names are fixed, then you can do it as below:

import spark.implicits._                 // for `.toDF()` and the `$` column syntax
import org.apache.spark.sql.functions._

val columns = rdd.collect.slice(5,15).mkString(",") // it will hold the column names
val data = rdd.collect.slice(17,21)                 // the pipe-separated data rows
val d = data.mkString("\n").split('\n').toSeq.toDF()
val dd = d
  .withColumn("columnX", split($"value", "\\|"))
  .withColumn("column1", $"columnX".getItem(0))
  .withColumn("column2", $"columnX".getItem(1))
  .withColumn("column3", $"columnX".getItem(2))
  .withColumn("column4", $"columnX".getItem(3))
  .withColumn("column5", $"columnX".getItem(4))
  .withColumn("column6", $"columnX".getItem(5))
  .withColumn("column7", $"columnX".getItem(6))
  .withColumn("column8", $"columnX".getItem(7))
  .withColumn("column10", $"columnX".getItem(8))
  .withColumn("column11", $"columnX".getItem(9))
  .drop("columnX", "value")
display(dd) // display() is available in Databricks notebooks; use dd.show() elsewhere

You can see the output as below: [screenshot of the resulting DataFrame]
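As a side note, the long withColumn chain can be generated instead of written by hand. Here is a minimal sketch (reusing the `columns` and `data` values from the snippet above) that splits each line once and then folds over the column names:

import spark.implicits._
import org.apache.spark.sql.functions._

// Recover the individual names from the comma-joined `columns` string.
val names = columns.split(",").map(_.trim)

// Split each line on the pipe once, then add one column per name.
val base = data.toSeq.toDF("value")
  .withColumn("parts", split($"value", "\\|"))

val dd2 = names.zipWithIndex.foldLeft(base) {
  case (df, (name, i)) => df.withColumn(name, $"parts".getItem(i))
}.drop("parts", "value")

This produces the same columns without listing each getItem call.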

Upvotes: 0

Insung Park

Reputation: 439

Could you try this?

import spark.implicits._ // Add to use `toDS()` and `toDF()`

val rdd = sc.textFile("the above text file")

val columns = rdd.collect.slice(5,15).map(_.trim) // the column-name lines; `.mkString(",")` is not needed

val dataDS = rdd.collect.slice(17,21)      // the data lines between Start-of-data and End-of-data
  .map(_.trim())                           // to remove whitespaces
  .map(s => s.substring(0, s.length - 1))  // to remove the last pipe '|'
  .toSeq
  .toDS

val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .csv(dataDS)
  .toDF(columns: _*)

df.show(false)
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
|column01|column02|column03|column04|column05|column06|column07|column08|column010|column11|
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
|G0002B  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
|G0002A  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |45      |
|G0002x  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
|G0002C  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
+--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+

Calling the spark.read...csv() method without a schema can take a long time with huge data, because of schema inference (e.g. Additional reading).

In that case, you can specify the schema as below.

/*
  column01 STRING,
  column02 STRING,
  column03 STRING,

  ...
*/
val schema = columns
  .map(c => s"$c STRING")
  .mkString(",\n")


val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .schema(schema)  // no schema inference occurs
  .csv(dataDS)
// .toDF(columns: _*) => unnecessary when the schema is specified
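
If you prefer a typed schema object over a DDL string, the same schema can be built programmatically; a small sketch, reusing the `columns` array from above:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build the same all-STRING schema from the column names.
val structSchema = StructType(columns.map(c => StructField(c, StringType)))

val df = spark.read
  .option("header", false)
  .option("delimiter", "|")
  .schema(structSchema)
  .csv(dataDS)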

Upvotes: 1
