Allen Wang

Reputation: 2502

Create Spark DataFrame from Pandas DataFrames inside RDD

I'm trying to convert an RDD in which each element is a Pandas DataFrame (one per worker) into a single Spark DataFrame distributed across all worker nodes.

Example:

import pandas as pd

def read_file_and_process_with_pandas(filename):
    data = pd.read_csv(filename)
    # Some additional operations using pandas functionality go here.
    # At this point data is a pandas DataFrame, and I am using some
    # datetime indexing which isn't available for Spark DataFrames.
    return data

filelist = ['file1.csv','file2.csv','file3.csv']
rdd = sc.parallelize(filelist)
rdd = rdd.map(read_file_and_process_with_pandas)

The previous operations work, so I have an RDD of Pandas DataFrames. How can I convert this then into a Spark DataFrame after I'm done with the Pandas processing?

I tried doing rdd = rdd.map(spark.createDataFrame), but when I call something like rdd.take(5), I get the following error:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o103.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to convert Pandas DataFrames in each worker node into a distributed DataFrame?

Upvotes: 1

Views: 1428

Answers (2)

Jether

Reputation: 730

See this answer: https://stackoverflow.com/a/51231046/7964197

I've had to deal with the same problem, which seems quite common: reading many files with pandas (e.g. Excel, pickle, or any other non-Spark format) and converting the resulting RDD into a Spark DataFrame.

The supplied code adds a new method to the SparkSession that uses pyarrow to convert the pd.DataFrame objects into Arrow record batches, which are then converted directly into a pyspark DataFrame:

spark_df = spark.createFromPandasDataframesRDD(prdd) # prdd is an RDD of pd.DataFrame objects

For large amounts of data, this is orders of magnitude faster than converting to an RDD of Row() objects.
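
For comparison, here is a minimal sketch of that slower Row-based path, assuming every Pandas DataFrame in the RDD has the same columns and that spark is the existing SparkSession from the question (the helper name pandas_rdd_to_spark_df is my own illustration, not part of the linked code):

from pyspark.sql import Row

def pandas_rdd_to_spark_df(prdd):
    # Flatten each Pandas DataFrame into Row objects on the workers.
    def to_rows(pdf):
        for record in pdf.to_dict(orient="records"):
            yield Row(**record)
    row_rdd = prdd.flatMap(to_rows)
    # Let Spark infer the schema by sampling the Rows.
    return spark.createDataFrame(row_rdd)

spark_df = pandas_rdd_to_spark_df(rdd)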

Upvotes: 3

howie

Reputation: 2685

Pandas DataFrames cannot be converted directly into an RDD. What you can do is create a Spark DataFrame from a Pandas DataFrame on the driver:

spark_df = context.createDataFrame(pandas_df)
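
Applied to the question's setup, this approach only runs on the driver, so the worker-side Pandas DataFrames would first have to be collected and concatenated. A minimal sketch, assuming the combined data fits in driver memory (variable names follow the question's code):

import pandas as pd

# Bring every worker-side Pandas DataFrame back to the driver and merge them;
# only feasible when the concatenated result fits in driver memory.
combined_pdf = pd.concat(rdd.collect(), ignore_index=True)
spark_df = spark.createDataFrame(combined_pdf)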

Reference: Introducing DataFrames in Apache Spark for Large Scale Data Science

Upvotes: 2
