ukbaz

Reputation: 547

How to convert RDD[(String, Any)] to Array(Row)?

I've got an unstructured RDD with keys and values. The values are of type RDD[Any] and the keys are currently Strings, RDD[String], and mainly contain Maps. I would like to make them of type Row so I can eventually make a DataFrame. Here is my RDD:

removed

Most of the RDD follows a pattern except for the last 4 keys. How should this be dealt with? Perhaps by splitting them into their own RDD, especially for reverseDeltas?

Thanks

Edit

This is what I've tried so far, based on the first answer below.

case class MyData(`type`: List[String], libVersion: Double, id: BigInt)

object MyDataBuilder {
  def apply(s: Any): MyData = {
    // read the input data and convert it to the case class
    s match {
      case Array(x: List[String], y: Double, z: BigInt) => MyData(x, y, z)
      case Array(a: BigInt, Array(x: List[String], y: Double, z: BigInt)) => MyData(x, y, z)
      case _ => null
    }
  }
}

val parsedRdd: RDD[MyData] = rdd.map(x => MyDataBuilder(x))

However, it doesn't seem to match any of those cases, and I keep getting nulls back when printing out parsedRdd. How can I match on a Map in Scala?
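For reference, the cases above all match on Array, so a value that is actually a Map falls through to the default case and yields null. Matching a Map looks like this (a minimal sketch; the sample keys are hypothetical):

```scala
// The Array patterns above never fire for a Map value; to handle a Map,
// match on Map[_, _] (type parameters are erased at runtime, so match
// on the raw shape and inspect the contents afterwards).
def describe(s: Any): String = s match {
  case m: Map[_, _] => s"map with ${m.size} entries"
  case xs: List[_]  => s"list with ${xs.length} elements"
  case _            => "unmatched"
}

describe(Map("id" -> BigInt(1), "libVersion" -> 2.0)) // "map with 2 entries"
```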

Upvotes: 1

Views: 2741

Answers (2)

AliSafari186

Reputation: 113

There is a method for converting an RDD to a DataFrame. Use it like below:

import spark.implicits._ // required for the .toDF implicit conversion

val rdd = sc.textFile("/pathtologfile/logfile.txt")
val df = rdd.toDF()

Now you have a DataFrame; do whatever you want with it using SQL-style queries like below:

import org.apache.spark.sql.functions.col

val textFile = sc.textFile("hdfs://...")
// Creates a DataFrame having a single column named "line"
val df = textFile.toDF("line")
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()

Upvotes: 0

Avishek Bhattacharya

Reputation: 6994

To convert the RDD to a DataFrame you need to have a fixed schema. Once you define the schema for the RDD, the rest is simple.

Something like:

val rdd2: RDD[Array[String]] = rdd.map(x => getParsedRow(x))
val rddFinal: RDD[Row] = rdd2.map(x => Row.fromSeq(x))
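An RDD[Row] alone isn't enough to get a DataFrame: createDataFrame also needs an explicit StructType schema describing the columns. A minimal sketch, assuming three string columns (the column names here are made up):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema; replace the names and types with your real columns.
val schema = StructType(Seq(
  StructField("type", StringType, nullable = true),
  StructField("libVersion", StringType, nullable = true),
  StructField("id", StringType, nullable = true)
))

// rddFinal is the RDD[Row] built above
val df = spark.createDataFrame(rddFinal, schema)
df.printSchema()
```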

Alternate

case class MyData(....) // all the fields of the schema I want

object MyDataBuilder {
  def apply(s: Any): MyData = {
    // read the input data and convert it to the case class
  }
}

val rddFinal: RDD[MyData] = rdd.map(x => MyDataBuilder(x))
import spark.implicits._
val myDF = rddFinal.toDF
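Since the values in the question are mostly Maps, the builder's apply can match on Map directly rather than on Array. A sketch using the case class fields from the question (the key names are guesses; returning Option instead of null lets you flatMap away unparseable records):

```scala
case class MyData(`type`: List[String], libVersion: Double, id: BigInt)

object MyDataBuilder {
  def apply(s: Any): Option[MyData] = s match {
    // Map's type parameters are erased at runtime, so match on the raw
    // shape Map[_, _] and convert the individual values explicitly.
    case m: Map[_, _] =>
      val mm = m.asInstanceOf[Map[String, Any]]
      for {
        t <- mm.get("type").collect { case xs: List[_] => xs.map(_.toString) }
        v <- mm.get("libVersion").collect { case d: Double => d }
        i <- mm.get("id").collect { case b: BigInt => b }
      } yield MyData(t, v, i)
    case _ => None
  }
}
```

With this shape, `rdd.map(x => MyDataBuilder(x._2))` followed by a flatMap keeps only the successfully parsed rows, instead of the nulls the question was seeing.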

Upvotes: 1
