Reputation: 1197
I tried to write a dataframe to Vertica using the following documentation: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm?tocpath=Integrating%20with%20Apache%20Spark%7CSaving%20an%20Apache%20Spark%20DataFrame%20to%20a%20Vertica%20Table%7C_____1 provided by Vertica, and it worked: the dataframe gets written into the table after loading the required libraries.
Now when I run the exact same code from IntelliJ, i.e. outside the spark-shell, I get errors.
The code is:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val rows: RDD[Row] = sc.parallelize(Array(
Row(1,"hello", true),
Row(2,"goodbye", false)
))
val schema = StructType(Array(
StructField("id",IntegerType, false),
StructField("sings",StringType,true),
StructField("still_here",BooleanType,true)
))
val spark = SparkSession.builder().config(conf).getOrCreate()
val df = spark.createDataFrame(rows, schema) // Spark 2.0
// View the sample data and schema
df.show
df.schema
// Setup the user options, defaults are shown where applicable for optional values.
// Replace the values in italics with the settings for your Vertica instance.
val opts: Map[String, String] = Map(
"table" -> "signs",
"db" -> "dbadmin",
"user" -> "dbadmin",
"password" -> "password",
"host" -> "localhost",
"hdfs_url" -> "hdfs://localhost:9000/user",
"web_hdfs_url" -> "webhdfs://localhost:9870/user",
// "failed_rows_percent_tolerance"-> "0.00" // OPTIONAL (default val shown)
"dbschema" -> "public" // OPTIONAL (default val shown)
// "port" -> "5433" // OPTIONAL (default val shown)
// "strlen" -> "1024" // OPTIONAL (default val shown)
// "fileformat" -> "orc" // OPTIONAL (default val shown)
)
// SaveMode can be either Overwrite, Append, ErrorIfExists, Ignore
val mode = SaveMode.Append
df
.write
.format("com.vertica.spark.datasource.DefaultSource")
.options(opts)
.mode(mode)
.save()
This is the same code as provided in the documentation, and I have set up both HDFS and Vertica, yet this error comes.
The question is: if it works as expected from the spark-shell, why does it not work outside of it?
20/04/27 01:55:50 INFO S2V: Load by name. Column list: ("name","miles_per_gallon","cylinders","displacement","horsepower","weight_in_lbs","acceleration","year","origin")
20/04/27 01:55:50 INFO S2V: Writing intermediate data files to path: hdfs://localhost:9000/user/S2V_job2509086937642333836
20/04/27 01:55:50 ERROR S2VUtils: Unable to delete the HDFS path: hdfs://localhost:9000/user/S2V_job2509086937642333836
20/04/27 01:55:50 ERROR S2V: Failed to save DataFrame to Vertica table: second0.car with SaveMode: Append
20/04/27 01:55:50 ERROR JobScheduler: Error running job streaming job 1587932740000 ms.2
java.lang.Exception: S2V: FATAL ERROR for job S2V_job2509086937642333836. Job status information is available in the Vertica table second0.S2V_JOB_STATUS_USER_DBADMIN. Unable to create/insert into target table: second0.car with SaveMode: Append. ERROR MESSAGE: ERROR: java.lang.Exception: S2V: FATAL ERROR for job S2V_job2509086937642333836. Unable to save intermediate orc files to HDFS path: hdfs://localhost:9000/user/S2V_job2509086937642333836. Error message: The ORC data source must be used with Hive support enabled;
at com.vertica.spark.s2v.S2V.do2Stage(S2V.scala:446)
at com.vertica.spark.s2v.S2V.save(S2V.scala:496)
at com.vertica.spark.datasource.DefaultSource.createRelation(VerticaSource.scala:100)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:469)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at replica_nimble_spark.SparkVerticaHelper$$anonfun$applyPipeline$1$$anonfun$apply$3.apply(SparkVerticaHelper.scala:85)
at replica_nimble_spark.SparkVerticaHelper$$anonfun$applyPipeline$1$$anonfun$apply$3.apply(SparkVerticaHelper.scala:76)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 1
Views: 755
Reputation: 29175
The question is: if it works as expected from the spark-shell, why does it not work outside of it?
The answer is in your error message:
Error message: The ORC data source must be used with Hive support enabled;
at com.vertica.spark.s2v.S2V.do2Stage(S2V.scala:446)
This means you have to enable Hive support, as in the following example, to fix the error:
val spark = SparkSession
.builder()
.appName("Mr potterpod wants to test spark hive support")
.master("local[*]")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport() // this is what I was talking about
.getOrCreate()
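Alternatively (a sketch, assuming `spark-hive` is on the application's classpath, and with a placeholder app name), the same switch can be set as a plain configuration key when building the session, which is what spark-shell effectively does for you:

```scala
// Equivalent sketch: set the catalog implementation explicitly instead of
// calling enableHiveSupport(). The app name is a placeholder.
val spark = SparkSession
  .builder()
  .appName("vertica-write-example")
  .master("local[*]")
  .config("spark.sql.catalogImplementation", "hive")
  .getOrCreate()
```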
Why does it work from spark-shell?
Answer: spark-shell enables Hive support by default in Spark 2.0 and later.
Proof: to check the default behavior, open spark-shell without any options and run:
scala> spark.sparkContext.getConf.get("spark.sql.catalogImplementation")
res3: String = hive
If you want to verify this by disabling Hive support in spark-shell, set spark.sql.catalogImplementation (valid values are in-memory and hive):
spark-shell --conf spark.sql.catalogImplementation=in-memory
Then you will hit the same error in spark-shell as well.
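Conversely, for a standalone application launched with spark-submit (rather than from IntelliJ's run configuration), the same property can be passed on the command line; the class and jar names below are placeholders, and this assumes spark-hive is on the classpath:

```shell
# Hypothetical submit command; class and jar names are placeholders.
# Passing the property at launch has the same effect as calling
# enableHiveSupport() in code.
spark-submit \
  --class replica_nimble_spark.SparkVerticaHelper \
  --conf spark.sql.catalogImplementation=hive \
  target/your-app.jar
```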
Further reading: How to enable or disable Hive support in spark-shell through Spark property (Spark 1.6)?
Upvotes: 3