Reputation: 3282
I have a text file that I read from and parse to create a dataframe. However, the columns amount and code should be IntegerTypes. Here's what I have:
def getSchema: StructType = {
  StructType(Seq(
    StructField("carrier", StringType, false),
    StructField("amount", StringType, false),
    StructField("currency", StringType, false),
    StructField("country", StringType, false),
    StructField("code", StringType, false)
  ))
}
def getRow(x: String): Row = {
  val columnArray = new Array[String](5)
  columnArray(0) = x.substring(40, 43)
  columnArray(1) = x.substring(43, 46)
  columnArray(2) = x.substring(46, 51)
  columnArray(3) = x.substring(51, 56)
  columnArray(4) = x.substring(56, 64)
  Row.fromSeq(columnArray)
}
Because I have an Array[String] defined, the columns can only be StringTypes and not a mix of String and Integer. To explain my problem in detail, here's what happens:
First I create an empty dataframe:
var df = spark.sqlContext.createDataFrame(spark.sparkContext.emptyRDD[Row], getSchema)
Then I have a for loop that goes through each file in all the directories. Note: I need to validate every file and cannot read all at once.
for (each file to parse):
  df2 = spark.sqlContext.createDataFrame(spark.sparkContext.textFile(inputPath)
    .map(x => getRow(x)), getSchema)
  df = df.union(df2)
I now have a complete dataframe of all the files. However, the columns amount and code are still StringTypes. How can I make them IntegerTypes?
Please note: I cannot cast the columns inside the for loop because it takes a lot of time, and I'd like to keep the current structure as similar as possible. At the end of the for loop I could cast the columns to IntegerType, but what if a column contains a value that is not an integer? I'd like the columns to not be NULL.
Is there a way to make the 2 specified columns IntegerTypes without adding a lot of change to the code?
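For reference, the cast-at-the-end approach I'm wary of would look roughly like this (just a sketch, with names like typedDf made up for illustration; the problem is that any value that is not a valid integer silently becomes NULL, which is exactly what I want to avoid):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Cast once after the loop; non-integer strings turn into NULL here.
val typedDf = df
  .withColumn("amount", col("amount").cast(IntegerType))
  .withColumn("code", col("code").cast(IntegerType))

// Rows that failed the cast would have to be detected explicitly.
val badRows = typedDf.filter(col("amount").isNull || col("code").isNull)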
Upvotes: 1
Views: 69
Reputation: 21
What about using Datasets?
First create a case class modelling your data:
case class MyObject(
  carrier: String,
  amount: Double,
  currency: String,
  country: String,
  code: Int)
Create another case class wrapping the first one with additional info (potential error, source file):
case class MyObjectWrapper(
  myObject: Option[MyObject],
  someError: Option[String],
  source: String
)
Then create a parser that transforms a line from your file into a MyObject:
import scala.util.{Try, Success, Failure}

object Parser {
  def parse(line: String, file: String): MyObjectWrapper = {
    Try {
      MyObject(
        carrier = line.substring(40, 43),
        amount = line.substring(43, 46).toDouble,
        currency = line.substring(46, 51),
        country = line.substring(51, 56),
        code = line.substring(56, 64).toInt)
    } match {
      case Success(objectParsed) => MyObjectWrapper(Some(objectParsed), None, file)
      case Failure(error) => MyObjectWrapper(None, Some(error.getLocalizedMessage), file)
    }
  }
}
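A quick sanity check of the parser on a hypothetical fixed-width line (the first 40 characters are just padding so the offsets above line up; expected results shown in comments):
// 64-character line: 40 chars of padding, then carrier, amount, currency, country, code.
val sample = " " * 40 + "ABC" + "123" + "USD  " + "US   " + "00000042"

Parser.parse(sample, "sample.txt")
// => MyObjectWrapper(Some(MyObject(ABC,123.0,USD  ,US   ,42)), None, sample.txt)

Parser.parse("too short", "sample.txt")
// => MyObjectWrapper(None, Some(<StringIndexOutOfBoundsException message>), sample.txt)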
Finally, parse your files:
import spark.implicits._ // encoders for the case classes

val ds = files
  .filter( {METHOD TO SELECT CORRECT FILES} )
  .map( {GET INPUT PATH FROM FILES} )
  .map(path => spark.read.textFile(path).map(Parser.parse(_, path)))
  .reduce(_.union(_))
This should give you a Dataset[MyObjectWrapper] with the types and APIs you wish.
Afterwards you can keep the rows that parsed successfully:
ds.filter(_.someError.isEmpty)
Or take those you failed to parse (for investigation):
ds.filter(_.someError.isDefined)
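And if you want a Dataset of just the successfully parsed MyObject values, one way (a sketch, assuming spark.implicits._ is in scope for the encoders) is to flatten the Option:
val parsedObjects = ds.flatMap(_.myObject.toSeq)   // Dataset[MyObject]

val errorsByFile = ds.filter(_.someError.isDefined)
  .map(w => (w.source, w.someError.get))           // (source file, error message) pairs for investigation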
Upvotes: 2