Jacek Laskowski

Reputation: 74619

How to CREATE TABLE USING delta with Spark 2.4.4?

This is Spark 2.4.4 and Delta Lake 0.5.0.

I'm trying to create a table using the delta data source and it seems I'm missing something. Although the CREATE TABLE USING delta command worked fine, the table directory was not created and insertInto does not work.

The following CREATE TABLE USING delta worked fine, but insertInto failed.

scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show

scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t5| default|       null| EXTERNAL|      false|
+----+--------+-----------+---------+-----------+

scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).;
  at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:341)
  ...

I then tried creating the table with the columns defined, but that didn't work either.

scala> sql("""
create table t6
(id LONG, name STRING)
USING delta
LOCATION '/tmp/delta'
""").show
org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 54 elided

Upvotes: 12

Views: 33678

Answers (3)

Jacek Laskowski

Reputation: 74619

tl;dr CREATE TABLE USING delta is not supported by Spark before 3.0.0 and Delta Lake before 0.7.0.

Delta Lake 0.7.0 with Spark 3.0.0 (both just released) does support the CREATE TABLE SQL command.

Be sure to "install" Delta SQL by setting the spark.sql.catalog.spark_catalog configuration property to org.apache.spark.sql.delta.catalog.DeltaCatalog (and spark.sql.extensions to io.delta.sql.DeltaSparkSessionExtension).

$ ./bin/spark-shell \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

scala> spark.version
res0: String = 3.0.0

scala> sql("CREATE TABLE delta_101 (id LONG) USING delta").show
++
||
++
++

scala> spark.table("delta_101").show
+---+
| id|
+---+
+---+

scala> sql("DESCRIBE EXTENDED delta_101").show(truncate = false)
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|id                          |bigint                                                   |       |
|                            |                                                         |       |
|# Partitioning              |                                                         |       |
|Not partitioned             |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Name                        |default.delta_101                                        |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/delta_101|       |
|Provider                    |delta                                                    |       |
|Table Properties            |[]                                                       |       |
+----------------------------+---------------------------------------------------------+-------+
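
For sessions created from code rather than from the command line, the same two settings can be passed to the session builder. The following is a minimal PySpark sketch, assuming the Delta Lake 0.7.0 package is already on the classpath (for example via --packages as in the command above). The names with_delta, create_session, and the app name are hypothetical and not part of Spark or Delta Lake.

```python
# The two settings from the command above, as plain key/value pairs.
DELTA_CONFS = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def with_delta(builder):
    """Apply the Delta settings to a SparkSession builder and return it."""
    for key, value in DELTA_CONFS.items():
        builder = builder.config(key, value)
    return builder


def create_session(app_name="delta-demo"):  # app_name is a hypothetical default
    # Imported lazily so the helpers above can be inspected without PySpark installed.
    from pyspark.sql import SparkSession

    return with_delta(SparkSession.builder.appName(app_name)).getOrCreate()
```

Calling create_session() and then spark.sql("CREATE TABLE delta_101 (id LONG) USING delta") should behave like the shell session above, provided no session was started earlier without these settings.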

Upvotes: 4

An example with PySpark 3.0.0 and Delta Lake 0.7.0:

# TABLE_NAME and location must be defined beforehand.
print(f"LOCATION '{location}'")
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
  CD_DEVICE INT,
  FC_LOCAL_TIME TIMESTAMP,
  CD_TYPE_DEVICE STRING,
  CONSUMTION DOUBLE,
  YEAR INT,
  MONTH INT,
  DAY INT )
USING DELTA
PARTITIONED BY (YEAR, MONTH, DAY, FC_LOCAL_TIME)
LOCATION '{location}'
""")

Here "location" is an HDFS directory where Spark, running in cluster mode, saves the Delta table.
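
Since the statement above depends on two variables that are not shown, it can help to build the DDL in a small function and print it before running it. This is only a sketch: delta_ddl is a hypothetical helper, the table name and HDFS path are made-up values, and only a subset of the columns is used to keep it short.

```python
def delta_ddl(table_name, columns, location, partition_by=()):
    """Build a CREATE OR REPLACE TABLE ... USING DELTA statement.

    columns is a list of (name, sql_type) pairs; partition_by lists column names.
    """
    names = [name for name, _ in columns]
    missing = [p for p in partition_by if p not in names]
    if missing:
        # Partition columns have to be declared in the column list.
        raise ValueError(f"partition columns not declared: {missing}")
    cols = ",\n  ".join(f"{name} {sql_type}" for name, sql_type in columns)
    ddl = f"CREATE OR REPLACE TABLE {table_name} (\n  {cols}\n)\nUSING DELTA\n"
    if partition_by:
        ddl += f"PARTITIONED BY ({', '.join(partition_by)})\n"
    ddl += f"LOCATION '{location}'"
    return ddl


# Hypothetical values; the answer above does not show them.
TABLE_NAME = "device_consumption"
location = "hdfs:///tmp/delta/device_consumption"

ddl = delta_ddl(
    TABLE_NAME,
    [("CD_DEVICE", "INT"), ("FC_LOCAL_TIME", "TIMESTAMP"), ("YEAR", "INT")],
    location,
    partition_by=["YEAR"],
)
print(ddl)
# With a Delta-enabled session available as `spark`: spark.sql(ddl)
```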

Upvotes: 8

Wes

Reputation: 658

The OSS version of Delta does not support the SQL CREATE TABLE syntax yet. It will be implemented in future versions that use Spark 3.0.

To create a Delta table, you must write out a DataFrame in Delta format. An example in Python:

df.write.format("delta").save("/some/data/path")
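
Applied to the setup in the question (Spark 2.4.4 with Delta Lake 0.5.0), this means writing and reading the table by path instead of by name. A minimal sketch, assuming a session with the Delta package loaded is passed in as spark; the function name and the default path are hypothetical.

```python
def write_and_read_delta(spark, path="/tmp/delta-table"):
    """Create a Delta table by writing a DataFrame, append to it, and read it back."""
    # The first write creates the table directory and its _delta_log.
    # The default save mode fails if the path already exists.
    spark.range(5).write.format("delta").save(path)

    # Later writes add rows with append mode instead of insertInto.
    spark.range(5, 10).write.format("delta").mode("append").save(path)

    # Read the table by path rather than by a catalog name.
    return spark.read.format("delta").load(path)
```

Calling write_and_read_delta(spark).show() would then display the rows written by both range calls.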

Here's a link to the create table documentation for Python, Scala, and Java.

Upvotes: 7
