Reputation: 3179
I am using spark-sql 2.4.1. I am creating a broadcast variable as below:
Broadcast<Map<String, Dataset>> bcVariable = javaSparkContext.broadcast(/* read dataset */);
I am passing the bcVariable to a function:
Service.calculateFunction(sparkSession, bcVariable.getValue());
public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Map<String, Dataset> dataSet) {
        System.out.println("---> size : " + dataSet.size()); // prints size 1
        for (Entry<String, Dataset> aEntry : dataSet.entrySet()) {
            System.out.println(aEntry.getKey()); // prints the key
            aEntry.getValue().show(); // throws NullPointerException
        }
    }
}
What is wrong here? How do I pass a dataset/dataframe to the function?
Try 2:
Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(/* read dataset */);
I am passing the bcVariable to a function:
Service.calculateFunction(sparkSession, bcVariable.getValue());
public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset dataSet) {
        System.out.println("---> size : " + dataSet.size()); // throws NullPointerException
    }
}
What is wrong here? How do I pass a dataset/dataframe to the function?
Try 3:
Dataset metaData = /* read dataset from an Oracle table, i.e. meta-data */;
I am passing the metaData to a function:
Service.calculateFunction(sparkSession, metaData );
public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset metaData) {
        System.out.println("---> size : " + metaData.size()); // throws NullPointerException
    }
}
What is wrong here? How do I pass a dataset/dataframe to the function?
Upvotes: 0
Views: 4107
Reputation: 74669
The value to be broadcast can be any serializable object, but not a DataFrame.
Service.calculateFunction(sparkSession, metaData) is executed on executors, and hence metaData is null (it was not serialized and sent over the wire from the driver to the executors).
broadcast[T](value: T): Broadcast[T]
Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
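For reference, here is a minimal, hypothetical sketch (the names and data are assumptions, not from the question) of broadcasting a plain serializable value and reading it with value() inside a distributed function:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("broadcast-sketch")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // A plain, serializable local value -- this is what broadcast is meant for.
        Map<String, String> lookup = new HashMap<>();
        lookup.put("A", "Apple");
        Broadcast<Map<String, String>> bcLookup = jsc.broadcast(lookup);

        // The broadcast value is read on the executors, inside the distributed function.
        spark.range(5)
             .toJavaRDD()
             .map(id -> bcLookup.value().getOrDefault("A", "none"))
             .collect()
             .forEach(System.out::println);

        spark.stop();
    }
}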
Think of a DataFrame as a data abstraction that represents a distributed computation described in a SQL-like language (the Dataset API or SQL). It simply does not make sense to have it anywhere but on the driver, where computations can be submitted for execution (as tasks on executors).
You simply have to "convert" the data this computation represents (in DataFrame terms) using DataFrame.collect (or, from the Java API, collectAsList).
Once you have collected the data you can broadcast it and reference it using the value method.
The code could look as follows:
Dataset<Row> dataset = /* read data */;
Broadcast<List<Row>> bcVariable =
    javaSparkContext.broadcast(dataset.collectAsList());
Service.calculateFunction(sparkSession, bcVariable.value());
The only change compared to your code is the collectAsList call.
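For completeness, a hypothetical sketch (the signature below is adapted from the question, not part of the original answer) of how Service.calculateFunction could accept the collected rows instead of a Dataset:

import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Nested inside the enclosing application class, as in the question.
public static class Service {
    public static void calculateFunction(SparkSession sparkSession, List<Row> rows) {
        // rows is plain local data collected on the driver, so it can be used directly.
        System.out.println("---> size : " + rows.size());
        for (Row row : rows) {
            System.out.println(row);
        }
    }
}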
Upvotes: 4