Reputation: 925
When I try to write a Koalas DataFrame directly to a Delta table using koalas.DataFrame.to_delta() locally, I get the following PySpark exception:
java.lang.ClassNotFoundException: Failed to find data source: delta
EDIT: ignore the paragraph below; the problem does occur with a direct call to PySpark as well.
If I convert the Koalas DataFrame to a Spark DataFrame and then write to Delta, I seem to have no issues. Is there an underlying library that Koalas is not aware of but PySpark is? That seems odd, because I would think the same PySpark modules are being used under the hood... I should note that the Koalas to_delta() method does seem to work on Databricks, which suggests my local setup is missing a Delta-related library.
Failing Koalas code:
import databricks.koalas as ks

# Build a small Koalas DataFrame and write it straight to a Delta table
kdf = ks.DataFrame({'eid': [1, 2, 3],
                    'contigName': ['chr1', 'chr2', 'chr3'],
                    'phen1': [0.123, 0.456, 0.789],
                    'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
EDIT: Koalas-to-Spark-to-Delta code that is also not working:
kdf = ks.DataFrame({'eid': [1, 2, 3],
                    'contigName': ['chr1', 'chr2', 'chr3'],
                    'phen1': [0.123, 0.456, 0.789],
                    'phen2': [0.987, 0.654, 0.321]})
# Note: .save() is missing from the end of the write call (see the last edit below)
kdf.to_spark().write.format('delta').mode('overwrite')
Also, are there any differences to be aware of between how Koalas and Spark save to a Delta table? I have a rather large Delta table which has until now been written to using Koalas (on Databricks), but I might switch to spark.write to make local testing easier. Before I do, I want to be sure that the results of both methods are the same (I'll do some testing to confirm, but I'm curious whether anyone has other notes about switching write strategies for an existing Delta table).
EDIT: Okay, PySpark wasn't actually saving the Delta table either; I forgot to add .save() onto the PySpark .write call. So my real question now is: how do I include the Delta library/jar when running PySpark locally, particularly when running unit tests in PyCharm?
Upvotes: 6
Views: 17535
Reputation: 87154
You just need to follow the documentation. For an interactive pyspark session it could be:
pyspark --packages io.delta:delta-core_2.12:1.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
or from code (install the package first with pip install delta-spark):
import pyspark
from delta import *

# Create a local SparkSession with the Delta Lake extensions enabled
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the delta-core JAR matching the installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Please note that different versions of Delta Lake have different requirements for the Spark version - check the compatibility table for your Spark version.
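Since the question also asks about unit tests in PyCharm, the same builder can be wrapped in a test fixture. This is a minimal sketch assuming pytest and the delta-spark pip package; the fixture and test names are purely illustrative:
import pyspark
import pytest
from delta import configure_spark_with_delta_pip

@pytest.fixture(scope="session")
def spark():
    # Local SparkSession with the Delta extensions, same settings as above
    builder = pyspark.sql.SparkSession.builder.appName("tests") \
        .master("local[2]") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    yield spark
    spark.stop()

def test_delta_roundtrip(spark, tmp_path):
    # Write a tiny Delta table into a temporary directory and read it back
    path = str(tmp_path / "test.delta")
    spark.range(3).write.format("delta").mode("overwrite").save(path)
    assert spark.read.format("delta").load(path).count() == 3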
Upvotes: 11