Sam-T

Reputation: 1965

Spark SQL mapping issue

Spark 2 / Java 8 / Cassandra 2. I am trying to read some data from Cassandra and then run a group-by query in Spark. My DataFrame has only two columns: transdate (Date) and origin (String).

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1");

This gives the error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)

The GROUP BY issue was solved by removing the parentheses in the GROUP BY clause, as shown in the complete code below.

Complete code (I am trying to get the maximum number of transactions per date for an origin/location):

JavaRDD<TransByDate> originDateRDD = javaFunctions(sc)
        .cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
        .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate"))
        .limit((long) 100);
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: " + columns[0] + " " + columns[1]); // prints: transdate origin
originDF.createOrReplaceTempView("origins");

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY origin, transdate ORDER BY cnt DESC LIMIT 1");
List<Row> list = maxOrigindate.collectAsList(); // Exception here
int j = list.size();

Output:

originDF columns: transdate origin

public static class TransByDate implements Serializable {
    private String origin;
    private Date transdate;

    public TransByDate() { }

    public TransByDate(String origin, Date transdate) {
        this.origin = origin;
        this.transdate = transdate;
    }

    public String getOrigin() { return origin; }
    public void setOrigin(String origin) { this.origin = origin; }

    public Date getTransdate() { return transdate; }
    public void setTransdate(Date transdate) { this.transdate = transdate; }
}

Schema

root
 |-- transdate: struct (nullable = true)
 |    |-- date: integer (nullable = false)
 |    |-- day: integer (nullable = false)
 |    |-- hours: integer (nullable = false)
 |    |-- minutes: integer (nullable = false)
 |    |-- month: integer (nullable = false)
 |    |-- seconds: integer (nullable = false)
 |    |-- time: long (nullable = false)
 |    |-- timezoneOffset: integer (nullable = false)
 |    |-- year: integer (nullable = false)
 |-- origin: string (nullable = true)
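
(This schema is presumably what originDF.printSchema() prints. Note that transdate is inferred as a struct of java.util.Date's bean properties, day, hours, month, and so on, rather than as a DateType column; that nested struct is what the Catalyst converter trips over in the MatchError below.)

originDF.printSchema(); // prints the tree above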

Exception:

ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 12)
scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    ...
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 12, localhost): scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    ...
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    ...
    at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
    at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184)
    at spark.SparkTest.sqlMaxCount(SparkTest.java:244) // List list = maxOrigindate.collectAsList();

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)

Upvotes: 3

Views: 1109

Answers (2)

abaghel

Reputation: 15297

You are getting the below error:

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at 

This error occurs because Spark SQL supports the java.sql.Date type, not java.util.Date. Please check the Spark SQL data types documentation. You can also refer to SPARK-2562.
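
A minimal sketch of one way to apply this, assuming the TransByDate bean and originDateRDD from the question: map into a second bean (hypothetically named TransBySqlDate) whose date field is java.sql.Date, and build the DataFrame from that, so Spark infers a proper DateType column instead of the nested struct shown in the schema.

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical bean: same shape as TransByDate in the question,
// but holding a java.sql.Date, which Spark SQL maps to DateType.
public static class TransBySqlDate implements Serializable {
    private String origin;
    private java.sql.Date transdate;

    public TransBySqlDate() { }

    public TransBySqlDate(String origin, java.sql.Date transdate) {
        this.origin = origin;
        this.transdate = transdate;
    }

    public String getOrigin() { return origin; }
    public void setOrigin(String origin) { this.origin = origin; }

    public java.sql.Date getTransdate() { return transdate; }
    public void setTransdate(java.sql.Date transdate) { this.transdate = transdate; }
}

// Convert each java.util.Date to java.sql.Date before createDataFrame.
JavaRDD<TransBySqlDate> sqlDateRDD = originDateRDD.map(t ->
        new TransBySqlDate(t.getOrigin(), new java.sql.Date(t.getTransdate().getTime())));
Dataset<Row> originDF = sparks.createDataFrame(sqlDateRDD, TransBySqlDate.class);

With this, the GROUP BY query and collectAsList() run against supported types.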

Upvotes: 1

Rajat Mishra

Reputation: 3770

Change the query to:

Dataset<Row> maxOrigindate = sparks.sql(
        "SELECT origin, transdate, COUNT(*) AS cnt FROM origins " +
        "GROUP BY origin, transdate ORDER BY cnt DESC LIMIT 1");

This will work. Without the parentheses, origin and transdate are treated as two separate grouping columns, so the AnalysisException goes away.
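
For reference, the same aggregation can also be written with the DataFrame API, which sidesteps the parentheses pitfall entirely (a sketch against the originDF from the question):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

Dataset<Row> maxOrigindate = originDF
        .groupBy(col("origin"), col("transdate"))
        .agg(count("*").alias("cnt")) // COUNT(*) AS cnt
        .orderBy(col("cnt").desc())
        .limit(1);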

Upvotes: 1
