Psychotechnopath

Reputation: 2744

Converting a Pandas DataFrame back to a Spark DataFrame after first converting the other way around

We have a lakehouse architecture with Hive metastore tables. I want to do processing on this data, so I select my data as a Spark DataFrame.

The specific processing step I wanted to achieve was parsing date columns in my Spark DataFrames that come in a rather strange format: /Date(1582959943313)/, where the number inside /Date(xx)/ is milliseconds since epoch. I thought I was being clever by converting my Spark DF with toPandas() and then processing the dates:

import pandas as pd

df_accounts = spark.sql("SELECT * FROM database.accounts")
df_accounts_pandas = df_accounts.toPandas()
# Strip the "/Date(" prefix and ")/" suffix, then parse the millisecond values
df_accounts_pandas['ControlledDate'] = df_accounts_pandas['ControlledDate'].str.slice(start=6, stop=-2)
df_accounts_pandas['Created'] = df_accounts_pandas['Created'].str.slice(start=6, stop=-2)
df_accounts_pandas['ControlledDate'] = pd.to_datetime(df_accounts_pandas['ControlledDate'], unit='ms', origin='unix')
df_accounts_pandas['Created'] = pd.to_datetime(df_accounts_pandas['Created'], unit='ms', origin='unix')

This works fine. Now the next step would be to convert the df back to a Spark DataFrame and be done with it.

df_accounts = spark.createDataFrame(df_accounts_pandas)

This throws a ValueError: "Some of types cannot be determined after inferring"

This SO question tells me to either manually define a schema or drop Null columns. But why do I have to make this choice? I don't understand why I was able to do the conversion one way, but now cannot convert back.

Are there any other workarounds? My tables have hundreds of columns, so I don't want to define a schema manually. I also don't know whether columns that are NULL now will be NULL in the future.

P.S. - I am rather new to the Spark ecosystem. Is it even a good idea to do processing in pandas like this? (I would like to, since it has more options than regular PySpark.) Or are there better ways to use pandas functionality on Spark DataFrames?

Upvotes: 0

Views: 2245

Answers (2)

Arjun

Reputation: 325

Since you have generated new columns, you can specify the new schema and convert the pandas df back to a Spark df.

from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType

accounts_new_schema = StructType([
    StructField("col1", LongType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", IntegerType(), True),
    StructField("col5", StringType(), True),
    StructField("col6", StringType(), True),
    StructField("col7", IntegerType(), True),
    StructField("col8", IntegerType(), True),
])

spdf = spark.createDataFrame(df_accounts_pandas, schema=accounts_new_schema)
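
For wide tables with hundreds of columns, here is a minimal sketch of the same idea, assuming the original Spark DataFrame df_accounts from the question is still available and that only ControlledDate and Created changed type: reuse the existing schema instead of listing every column by hand.

from pyspark.sql.types import StructType, StructField, TimestampType

# Reuse the source table's schema; only the two parsed date columns
# switch to TimestampType (assumption: no other columns changed type)
changed_cols = {"ControlledDate", "Created"}
fields = [
    StructField(f.name, TimestampType(), True) if f.name in changed_cols else f
    for f in df_accounts.schema.fields
]
accounts_new_schema = StructType(fields)

spdf = spark.createDataFrame(df_accounts_pandas, schema=accounts_new_schema)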

Upvotes: 1

Azhar Khan

Reputation: 4098

We had the same requirement to transform data back and forth between Spark and pandas, and we achieved it by serialising to Parquet files.

We chose this path because toPandas() kept crashing and spark.createDataFrame() had schema mapping issues like the ones you are facing.

For a dataset of shape (1M rows, 300 columns), the Spark write took about an hour, but the rest of the operations were quicker.

Spark DF to Pandas DF:

# sdf is spark dataframe. Serialise to parquet
sdf.repartition(1).write.mode("overwrite").parquet("dbfs:/file/path")

# import in Pandas
pdf = pd.read_parquet("/dbfs/file/path/part-0000xxx.parquet")

Pandas DF to Spark DF:

# pdf is pandas dataframe. Serialize to parquet
pdf.to_parquet("/dbfs/file/path/df.parquet")

# import in Spark
sdf = spark.read.parquet("dbfs:/file/path")
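
Applied to the question's workflow, a rough end-to-end sketch of this approach might look as follows (the dbfs:/tmp/... paths are placeholders, and it assumes pandas with pyarrow installed can read the whole Parquet directory, so the exact part-file name is not needed):

import pandas as pd

# Spark -> Parquet -> pandas
df_accounts = spark.sql("SELECT * FROM database.accounts")
df_accounts.repartition(1).write.mode("overwrite").parquet("dbfs:/tmp/accounts")  # placeholder path
pdf = pd.read_parquet("/dbfs/tmp/accounts")  # assumes pyarrow reads the whole directory

# Parse the /Date(xx)/ strings as in the question
for col in ["ControlledDate", "Created"]:
    pdf[col] = pd.to_datetime(pdf[col].str.slice(start=6, stop=-2), unit="ms", origin="unix")

# pandas -> Parquet -> Spark
pdf.to_parquet("/dbfs/tmp/accounts_parsed/df.parquet")  # placeholder path
sdf = spark.read.parquet("dbfs:/tmp/accounts_parsed")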

Upvotes: 2
