BdEngineer

Reputation: 3179

How to broadcast a DataFrame?

I am using spark-sql 2.4.1 and creating a broadcast variable as below:

Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

Service.calculateFunction(sparkSession, bcVariable.getValue());


public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Map<String, Dataset> dataSet) {

        System.out.println("---> size : " + dataSet.size());  // prints size 1

        for (Entry<String, Dataset> aEntry : dataSet.entrySet()) {
            System.out.println(aEntry.getKey());  // prints the key
            aEntry.getValue().show();             // throws NullPointerException
        }
    }
}

What is wrong here? How do I pass a dataset/dataframe to the function?

Try 2:

Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

 Service.calculateFunction(sparkSession, bcVariable.getValue());

public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset dataSet) {

        System.out.println("---> size : " + dataSet.count());  // throws NullPointerException
    }
}

What is wrong here? How do I pass a dataset/dataframe to the function?

Try 3:

Dataset metaData = // read dataset from Oracle table, i.e. the meta-data.

I am passing the metaData to a function:

Service.calculateFunction(sparkSession, metaData );

public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset metaData) {

        System.out.println("---> size : " + metaData.count());  // throws NullPointerException
    }
}

What is wrong here? How do I pass a dataset/dataframe to the function?

Upvotes: 0

Views: 4107

Answers (1)

Jacek Laskowski

Reputation: 74669

The value to be broadcast can be any regular (driver-local) object, but not a DataFrame.

Service.calculateFunction(sparkSession, metaData) is executed on executors and hence metaData is null (as it was not serialized and sent over the wire from the driver to executors).

broadcast[T](value: T): Broadcast[T]

Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
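For reference, here is a minimal, self-contained Java sketch of broadcasting a plain driver-local object, which is the case the quoted API is designed for. The app name, master URL, and lookup values are illustrative, not from the original post:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("broadcast-sketch")  // illustrative name
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // A small, driver-local object: exactly what broadcast is meant for.
        Map<String, String> lookup = new HashMap<>();
        lookup.put("IN", "India");
        lookup.put("PL", "Poland");
        Broadcast<Map<String, String>> bcLookup = jsc.broadcast(lookup);

        // Distributed functions read the broadcast value via .value()
        jsc.parallelize(Arrays.asList("IN", "PL"))
           .map(code -> bcLookup.value().get(code))
           .collect()
           .forEach(System.out::println);

        spark.stop();
    }
}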

Think of the DataFrame data abstraction as representing a distributed computation that is described in a SQL-like language (the Dataset API or SQL). It simply does not make sense to have it anywhere but on the driver, where computations can be submitted for execution (as tasks on executors).

You simply have to "convert" the data this computation represents into a local form using DataFrame.collect (or, in Java, collectAsList).

Once you have collected the data, you can broadcast it and reference it using the .value method.


The code could look as follows:

Dataset<Row> dataset = // reading data
Broadcast<List<Row>> bcVariable =
    javaSparkContext.broadcast(dataset.collectAsList());
Service.calculateFunction(sparkSession, bcVariable.value());

The only change compared to your code is the collectAsList() step, which materializes the rows on the driver before they are broadcast.
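Putting it together, here is a hedged end-to-end sketch in Java. The spark.range call stands in for the Oracle read from the question, and the class and app names are assumptions for illustration:

import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectThenBroadcast {
    // The collected rows are plain driver-local data, so passing them is safe.
    public static void calculateFunction(SparkSession sparkSession, List<Row> metaData) {
        System.out.println("---> size : " + metaData.size());  // no NPE: metaData is local data
        for (Row row : metaData) {
            System.out.println(row);
        }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("collect-then-broadcast")  // illustrative name
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Stand-in for the Oracle meta-data read in the question.
        Dataset<Row> metaData = spark.range(3).toDF("id");

        // collectAsList() materializes the rows on the driver ...
        List<Row> rows = metaData.collectAsList();
        // ... and only then can they be broadcast and referenced via .value().
        Broadcast<List<Row>> bcVariable = jsc.broadcast(rows);

        calculateFunction(spark, bcVariable.value());

        spark.stop();
    }
}

Note that this pattern only makes sense for small datasets such as meta-data lookup tables; collecting a large DataFrame to the driver defeats the purpose of distributing it.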

Upvotes: 4
