Raptor0009

Reputation: 268

How to use Delta Lake with spark-shell?

I'm trying to write a Spark DataFrame as a Delta table. It works fine in my IDE (IntelliJ IDEA), but with the same dependencies and versions it fails in my Spark REPL (spark-shell).

Spark version: 2.4.0, Scala version: 2.11.8

Dependencies in IntelliJ (these cover the whole project; please ignore the irrelevant ones):

    compile 'org.scala-lang:scala-library:2.11.8'
    compile 'org.scala-lang:scala-reflect:2.11.8'
    compile 'org.scala-lang:scala-compiler:2.11.8'
    compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.1.2'
    compile 'org.scala-lang.modules:scala-swing_2.11:2.0.3'
    compile 'org.apache.spark:spark-mllib_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'org.apache.spark:spark-graphx_2.11:2.4.0'
    compile 'org.apache.spark:spark-launcher_2.11:2.4.0'
    compile 'org.apache.spark:spark-catalyst_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile group: 'io.delta', name: 'delta-core_2.11', version: '0.5.0'
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-hive_2.11:2.4.0'
    compile 'com.databricks:spark-avro_2.11:4.0.0'
    compile 'org.apache.avro:avro-mapred:1.8.2'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-compiler:1.8.2'
    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.15'
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
    testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompile group: 'org.scalatest', name: 'scalatest_2.12', version: '3.2.0-SNAP10'
    compile group: 'javax.mail', name: 'javax.mail-api', version: '1.6.2'
    compile group: 'com.sun.mail' ,name: 'javax.mail', version: '1.6.0'
    compile 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11'
    compile 'com.hortonworks:shc:1.1.1-2.1-s_2.11'
    compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.5', ext: 'pom'
    compile group: 'org.apache.hbase', name: 'hbase-protocol', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-hadoop2-compat', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-annotations', version: '1.2.5'

    // Jackson modules
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.8.7'
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.8.6'
    compile group: 'org.json4s', name: 'json4s-jackson_2.11', version: '3.2.10'
    compile group: 'com.twitter', name: 'parquet-jackson', version: '1.6.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-xc', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-paranamer', version: '2.8.6'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-annotations', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-yarn-common', version: '2.7.3'

The piece of code I'm trying to execute:

import io.delta._

val dF = spark.read.load("path") // source Parquet file
dF.write.format("delta").mode("overwrite").partitionBy("topic", "partition", "key").save("path") // destination Delta table

spark-shell command used:

spark-shell --packages com.fasterxml.jackson.core:jackson-databind:2.8.6,com.fasterxml.jackson.core:jackson-core:2.10.0,org.codehaus.jackson:jackson-core-asl:1.9.13,org.codehaus.jackson:jackson-mapper-asl:1.9.13,com.fasterxml.jackson.core:jackson-annotations:2.8.7,com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6,com.fasterxml.jackson.module:jackson-module-scala_2.11:2.8.6,com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.6,com.twitter:parquet-jackson:1.6.0,org.codehaus.jackson:jackson-jaxrs:1.9.13,org.codehaus.jackson:jackson-xc:1.9.13,com.fasterxml.jackson.module:jackson-module-paranamer:2.8.6,io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5

Error in REPL:

Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
    at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:127)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:202)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:201)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.delta.actions.Metadata.schema$lzycompute(actions.scala:201)
    at org.apache.spark.sql.delta.actions.Metadata.schema(actions.scala:200)
    at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:61)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:45)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:85)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:65)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:396)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:133)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg$.main(deltaLakeHadoopEg.scala:29)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg.main(deltaLakeHadoopEg.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Upvotes: 3

Views: 12260

Answers (4)

Jacek Laskowski

Reputation: 74669

As per the official documentation:

Delta Lake requires Apache Spark version 2.4.2 or above

Please upgrade your Spark version to at least 2.4.2 in IntelliJ IDEA (otherwise issues like this show up). The latest version as of this writing (April 7th) is 3.1.1, but it is not supported yet:

We have upgraded Spark to 3.1.1 in master branch. We are still working on some items before doing a release.

Again, as per the official documentation:

Run spark-shell with the Delta Lake package:

bin/spark-shell --packages io.delta:delta-core_2.12:0.8.0

From my own experience: also use --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension to enable Delta Lake's SQL commands, e.g. DESCRIBE DETAIL and GENERATE.

The entire command to run spark-shell with Delta Lake 0.8.0 should be as follows:

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:0.8.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
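
Once the shell is up, a quick way to confirm the extension is active (a minimal sketch; /tmp/delta-table is just an example path):

// Write a small Delta table, then run one of the SQL commands
// that DeltaSparkSessionExtension enables.
spark.range(5).write.format("delta").save("/tmp/delta-table")

// DESCRIBE DETAIL only parses when the extension is configured.
spark.sql("DESCRIBE DETAIL delta.`/tmp/delta-table`").show(truncate = false)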

Upvotes: 6

Galuoises

Reputation: 3283

Please check here: https://docs.delta.io/0.8.0/quick-start.html

To run spark-shell, use the command:

spark-shell \
  --packages io.delta:delta-core_2.12:0.8.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Upvotes: 0

Tauheed Ahmad

Reputation: 11

bin/spark-shell --packages io.delta:delta-core_2.11:0.6.1

import io.delta.tables._
import org.apache.spark.sql.functions._

// Load an existing Delta table by its filesystem path
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
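
Note that DeltaTable.forPath throws if no Delta table exists at that path yet, so (as a minimal sketch, reusing the same example path) you may need to write one first:

// Create a small Delta table so DeltaTable.forPath has something to load
spark.range(0, 10).write.format("delta").mode("overwrite").save("/tmp/delta-table")

// Now forPath resolves; e.g. inspect the table's commit history
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
deltaTable.history().show()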

Upvotes: 1

Charlie Flowers

Reputation: 1380

Spark itself has a dependency on Jackson, and the version you're instructing spark-shell to use is incompatible. Per https://github.com/apache/spark/blob/v2.4.0/pom.xml, Spark 2.4.0 uses Jackson 2.6.7. Is there a particular reason you need Jackson 2.10 in this case?
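
In other words (a minimal sketch, assuming you don't actually need a newer Jackson inside the shell): drop the explicit Jackson packages from the --packages list and let Spark supply its own, which avoids the NoSuchMethodError. The Spark-version requirement from the other answers still applies.

# Only pull in what spark-shell doesn't already ship with;
# Spark's bundled Jackson then stays consistent.
spark-shell --packages io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5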

Upvotes: 0
