Reputation: 3209
I am using Spark 2.4.1 and Java 8.
I have a scenario like the below:
val classifiers = Seq("classifierOne", "classifierTwo", "classifierThree")
for (classifier <- classifiers) {
  // read from the Cassandra DB table
  val actualData = spark.read(.....).where(<classifier condition>)
  // the data varies depending on the classifier passed in
  // this data has many fields along with fieldOne, fieldTwo and fieldThree
Depending on the classifier, I need to filter the data. Currently I am doing it as below:
  if (classifier == "classifierOne") {
    val classifierOneDs = actualData.filter(col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()))
    writeToParquet(classifierOneDs)
  } else if (classifier == "classifierTwo") {
    val classifierTwoDs = actualData.filter(col("classifierTwo").notEqual(lit("")).or(col("classifierTwo").isNotNull()))
    writeToParquet(classifierTwoDs)
  } else if (classifier == "classifierThree") {
    val classifierThreeDs = actualData.filter(col("classifierThree").notEqual(lit("")).or(col("classifierThree").isNotNull()))
    writeToParquet(classifierThreeDs)
  }
}
Is there any way to avoid the if-else block here?
Is there any other way to do/achieve the same in a Spark distributed way?
Upvotes: 1
Views: 5303
Reputation: 74779
Your question seems more about how to structure your application than Spark itself. There are two parts really.
Is there any way to avoid the if-else block here?
"Avoid"? In what sense? Spark can't magically "discover" your way of doing distributed processing. You should help Spark a bit.
For this case I'd propose a lookup table with all possible filter conditions and their names to look up by, e.g.
val classifiers = Map(
  "classifierOne" -> col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()),
  "classifierTwo" -> ...,
  "classifierThree" -> ...)
In order to use it you simply iterate over all the classifiers (or look up as many as needed), e.g.
val queries = classifiers.map { case (name, cond) =>
  spark
    .read(.....)
    .where(cond)
    .filter(col(name).notEqual(lit("")).or(col(name).isNotNull()))
}
queries is a collection of DataFrames to be written with writeToParquet, and it's up to you how to make the queries execute in parallel (Spark will take care of doing it in a distributed way). Use Scala Futures or another parallel API.
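For example, here is a minimal sketch with Scala Futures, assuming the queries and writeToParquet from above and using the global execution context just for illustration:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Submit every write as a Future so the corresponding Spark jobs are queued concurrently.
val writes = queries.map(df => Future(writeToParquet(df)))

// Block until all writes have finished; adjust the timeout to your needs.
Await.result(Future.sequence(writes), Duration.Inf)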
I think the following would do just fine:
queries.par.foreach(writeToParquet)
With queries.par.foreach you essentially execute all the writeToParquet calls in parallel. Since writeToParquet executes a DataFrame action that writes in parquet format, it follows all the rules of Spark for any other action. It will run a Spark job (perhaps even more than one), and the job is executed in a distributed fashion using Spark machinery.
Think of queries.par as a way to submit the queries without waiting for earlier queries to finish before starting a new one. You are strongly recommended to configure the FAIR scheduling mode:
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads.
Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources.
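Enabling it is just a matter of setting spark.scheduler.mode before the SparkSession is created. A minimal sketch (the application name is made up; the property can also be passed with --conf on spark-submit):

import org.apache.spark.sql.SparkSession

// FAIR scheduling must be configured before the SparkSession/SparkContext is created.
val spark = SparkSession.builder()
  .appName("classifier-export") // hypothetical application name
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()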
Upvotes: 1
Reputation: 1859
So, you need to select which column to check based on the classifier name, which will be passed in a list?
val classifiers = Seq("classifierOne", "classifierTwo", "classifierThree")
for (classifier <- classifiers) {
  val actualData = spark.read(.....).where(<classifier condition>)
  val classifierDs = actualData.filter(col(classifier).notEqual(lit("")).or(col(classifier).isNotNull()))
  writeToParquet(classifierDs)
}
As you're iterating through the list, you would go through all the classifiers anyway.
If the column name can be different from the actual classifier name, you can make it a List[Classifier], where Classifier is something like
case class Classifier(colName: String, classifierName: String)
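For example, a minimal sketch of how the loop could then look (the colName values and readFromCassandra are hypothetical placeholders for the read from the question; writeToParquet is the asker's method):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

case class Classifier(colName: String, classifierName: String)

// Hypothetical mapping from classifier name to the column it should filter on.
val classifiers = List(
  Classifier("fieldOne", "classifierOne"),
  Classifier("fieldTwo", "classifierTwo"),
  Classifier("fieldThree", "classifierThree"))

// Stand-in for the spark.read(...).where(...) call from the question.
def readFromCassandra(classifierName: String): DataFrame = ???

classifiers.foreach { c =>
  val actualData = readFromCassandra(c.classifierName)
  val classifierDs = actualData.filter(
    col(c.colName).notEqual(lit("")).or(col(c.colName).isNotNull()))
  writeToParquet(classifierDs)
}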
Upvotes: 1