user2896120

Reputation: 3282

Creating column types for schema

I have a text file that I read from and parse to create a dataframe. However, the columns amount and code should be IntegerTypes. Here's what I have:

def getSchema: StructType = {
  StructType(Seq(
    StructField("carrier", StringType, false),
    StructField("amount", StringType, false),
    StructField("currency", StringType, false),
    StructField("country", StringType, false),
    StructField("code", StringType, false)
  ))
}

def getRow(x: String): Row = {
  // fixed-width slices: carrier, amount, currency, country, code
  val columnArray = new Array[String](5)
  columnArray(0) = x.substring(40, 43)
  columnArray(1) = x.substring(43, 46)
  columnArray(2) = x.substring(46, 51)
  columnArray(3) = x.substring(51, 56)
  columnArray(4) = x.substring(56, 64)
  Row.fromSeq(columnArray)
}

Because I have Array[String] defined, the columns can only be StringType, not a mix of String and Integer. For the schema to declare IntegerType fields, each Row would have to carry actual Ints, roughly like the hypothetical getRowMixed sketch below, which is exactly where non-integer values become a problem.
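
// Rough sketch only, not my actual code: a mixed-type row needs Array[Any]
// plus explicit conversions, and .toInt throws on anything non-numeric.
def getRowMixed(x: String): Row = {
  val columnArray = new Array[Any](5)
  columnArray(0) = x.substring(40, 43)        // carrier
  columnArray(1) = x.substring(43, 46).toInt  // amount
  columnArray(2) = x.substring(46, 51)        // currency
  columnArray(3) = x.substring(51, 56)        // country
  columnArray(4) = x.substring(56, 64).toInt  // code
  Row.fromSeq(columnArray)
}

To explain my problem in detail, here's what happens: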

First I create an empty dataframe:

  var df = spark.sqlContext.createDataFrame(spark.sparkContext.emptyRDD[Row], getSchema)

Then I have a for loop that goes through each file in all the directories. Note: I need to validate every file and cannot read all at once.

for (each file to parse):
  df2 = spark.sqlContext.createDataFrame(spark.sparkContext.textFile(inputPath)
    .map(x => getRow(x)), getSchema)
  df = df.union(df2)

I now have a complete dataframe of all the files, but the amount and code columns are still StringType. How can I make them IntegerType?

Please note: I cannot cast the columns during the for-loop process because it takes a lot of time, and I'd like to keep my current structure as close to what it is now as possible. At the end of the loop I could cast the columns to IntegerType, but what if a column contains a value that is not an integer? I don't want the columns to end up NULL.
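
For reference, the end-of-loop cast I'm hesitant about would look roughly like this; as far as I know, any value that isn't a valid integer silently becomes NULL:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Cast once at the end: non-integer strings silently become NULL,
// which is what I want to avoid.
val casted = df
  .withColumn("amount", col("amount").cast(IntegerType))
  .withColumn("code", col("code").cast(IntegerType))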

Is there a way to make the 2 specified columns IntegerTypes without adding a lot of change to the code?

Upvotes: 1

Views: 69

Answers (1)

Haas-Frangi Christian

Reputation: 21

What about using datasets?

First create a case class modelling your data:

case class MyObject(
    carrier: String,
    amount: Double,
    currency: String,
    country: String,
    code: Int)

Then create another case class wrapping the first one with additional info (potential parsing error, source file):

case class MyObjectWrapper(
  myObject: Option[MyObject],
  someError: Option[String],
  source: String)

Then create a parser that transforms a line from your file into a MyObject:

import scala.util.{Try, Success, Failure}

object Parser {
  def parse(line: String, file: String): MyObjectWrapper = {
    Try {
      MyObject(
        carrier = line.substring(40, 43),
        amount = line.substring(43, 46).toDouble,
        currency = line.substring(46, 51),
        country = line.substring(51, 56),
        code = line.substring(56, 64).toInt)
    } match {
      case Success(objectParsed) => MyObjectWrapper(Some(objectParsed), None, file)
      case Failure(error) => MyObjectWrapper(None, Some(error.getLocalizedMessage), file)
    }
  }
}
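
For instance, with a made-up fixed-width line whose code field is not numeric (purely hypothetical values), the parser returns a wrapper carrying the error instead of a parsed object:

// Hypothetical 64-character line: 40 chars of padding, then carrier "AAA",
// amount "123", currency "EUR  ", country "FR   ", and a non-numeric
// code "12AB5678" that makes toInt fail.
val line = " " * 40 + "AAA" + "123" + "EUR  " + "FR   " + "12AB5678"
val wrapped = Parser.parse(line, "sample.txt")
// wrapped.myObject == None
// wrapped.someError contains the NumberFormatException message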

Finally, parse your files:

// spark.implicits._ provides the encoder for the MyObjectWrapper case class
import spark.implicits._

val ds = files
  .filter( {METHOD TO SELECT CORRECT FILES} )
  .map( {GET INPUT PATH FROM FILES} )
  .map(path => spark.read.textFile(path).map(line => Parser.parse(line, path)))
  .reduce(_.union(_))

This should give you a Dataset[MyObjectWrapper] with the types and the Dataset API you want.

Afterwards you can take those you could parse:

ds.filter(_.someError.isEmpty)

Or take those you failed to parse (for investigation):

ds.filter(_.someError.isDefined)
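
If you only need the parsed values themselves, a small follow-up sketch (assuming spark.implicits._ is in scope so the MyObject encoder is available):

// Keep only the rows that parsed successfully, as a typed Dataset[MyObject]
val parsedOnly = ds
  .filter(_.myObject.isDefined)
  .map(_.myObject.get)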

Upvotes: 2
