I'm trying out .NET for Apache Spark (dotnet spark). I modified the sample program to write the DataFrame contents to a Parquet file, but the write throws an exception that carries no useful detail. What could be causing it? Or is there somewhere I can find more helpful logs for this exception?
20/12/09 15:04:32 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
20/12/09 15:04:32 INFO Executor: Executor killed task 1.0 in stage 6.0 (TID 604), reason: Stage cancelled
[2020-12-09T07:04:32.6029517Z] [HIS2547] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method parquet failed for class 22 when called with 1 arguments ([Index=1, Type=String, Value=myparquet1], )
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
20/12/09 15:04:32 WARN TaskSetManager: Lost task 1.0 in stage 6.0 (TID 604, localhost, executor driver): TaskKilled (Stage cancelled)
20/12/09 15:04:32 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
Exception saving to parquetSystem.Exception: JVM method execution failed: Nonstatic method parquet failed for class 22 when called with 1 arguments ([Index=1, Type=String, Value=myparquet1], )
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object arg0)
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object arg0)
   at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object arg0)
   at Microsoft.Spark.Sql.DataFrameWriter.Parquet(String path)
   at MySparkApp.Program.Main(String[] args) in C:\Users\Administrator\mySparkApp\Program.cs:line 46
This is my code:
using System;
using Microsoft.Spark.Sql;

namespace MySparkApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //BuildWebHost(args).Run();
            // Create a Spark session
            SparkSession spark = SparkSession
                .Builder()
                .AppName("word_count_sample1")
                .GetOrCreate();

            // Create initial DataFrame
            DataFrame dataFrame = spark.Read().Text(@"C:\Users\Administrator\mySparkApp\input.txt");

            // Count words
            DataFrame words = dataFrame
                .Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))
                .Select(Functions.Explode(Functions.Col("words"))
                .Alias("word"))
                .GroupBy("word")
                .Count()
                .OrderBy(Functions.Col("count").Desc());

            // Show results
            words.Show();

            try
            {
                //words.Write().Mode(SaveMode.Append).Parquet("parquet.wordcount");
                var dataFrameWriter = words.Write();
                dataFrameWriter.Mode(SaveMode.Overwrite); // Append does not work either
                dataFrameWriter.Parquet("myparquet1"); // this call throws
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception saving to parquet" + ex.ToString());
            }

            spark.Stop();
        }
    }
}
The code only creates the folder I passed as the path to Parquet(), and that folder stays empty. Reading a Parquet file previously written by my Scala driver works fine from dotnet spark; only writing from dotnet spark fails. Any help would be appreciated. Thank you!