Tobi

Reputation: 31479

Read CSV as dataframe and convert to JSON string

I'm trying to aggregate a CSV file via Spark SQL and then show the result as JSON:

val people = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")

That's where I'm stuck. Calling result.schema.prettyJson works flawlessly, but I can't find a way to return the result rows themselves as JSON.
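
Ideally I'd end up with a plain JSON string, something like this (the values here are just made up for illustration):

[{"country":"US","cnt":12},{"country":"DE","cnt":7}]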

I was assuming that result.toJSON.collect() would do what I want, but it fails with an

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:171)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

error. Can somebody guide me?

Upvotes: 0

Views: 5942

Answers (3)

bhess

Reputation: 21

Try reading the file in DROPMALFORMED mode:

val people = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED") // skip lines whose token count doesn't match the schema
  .option("delimiter", ",")
  .load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
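
Lines with more or fewer tokens than the schema expects are what make buildScan throw the NegativeArraySizeException; DROPMALFORMED tells spark-csv to silently skip those lines instead. The other modes are PERMISSIVE (the default) and FAILFAST.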

Upvotes: 1

Tobi

Reputation: 31479

Turns out this error was caused by a "malformed" CSV file: it contained some rows with more columns than the header declared (i.e. fields with no header name)... Strange error message, though.
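
If you need to track the offending lines down, here's a minimal sketch (assuming the SparkContext is available as sc, a plain comma delimiter, and no quoted fields that themselves contain commas):

// Count the raw fields per line and flag lines that don't match the header width
val lines = sc.textFile("/tmp/people.csv")
val headerWidth = lines.first().split(",", -1).length
val badLines = lines.filter(_.split(",", -1).length != headerWidth)
badLines.take(5).foreach(println)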

Upvotes: 1

Ewan Leith

Reputation: 1665

The error you're getting is odd; it sounds like result is probably empty?

You might want to try this command on the DataFrame to get each line printed out instead:

result.toJSON.foreach(println)

See the DataFrame API docs for a little more information.
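
If you want a single JSON string rather than printed lines (and the result is small enough to collect to the driver), something along these lines should work once the malformed rows are dealt with:

// Collect the per-row JSON documents and join them into one JSON array string
val json: String = result.toJSON.collect().mkString("[", ",", "]")
println(json)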

Upvotes: 2
