duckertito
duckertito

Reputation: 3625

Sum up the values of the DataFrame based on conditions

I have a DataFrame that is created as follows:

  df = sc
       .textFile("s3n://bucket/key/data.txt")
       .map(_.split(","))
       .toDF()

This is the content of data.txt:

123,2016-11-09,1
124,2016-11-09,2
123,2016-11-10,1
123,2016-11-11,1
123,2016-11-12,1
124,2016-11-13,1
124,2016-11-14,1

Is it possible to filter df in order to get the sum of 3rd column values for 123 for the last N days starting from now? I am interested in a flexible solution so that N could be defined as a parameter.

For example, if today would be 2016-11-16 and N would be equal to 5, then the sum of 3rd column values for 124 would be equal to 2.

This is my current solution:

  df = sc
       .textFile("s3n://bucket/key/data.txt")
       .map(_.split(","))
       .toDF(["key","date","qty"])

val starting_date = LocalDate.now().minusDays(x_last_days)
df.filter(col("key") === "124")
                    .filter(to_date(df("date")).gt(starting_date))
                    .agg(sum(col("qty")))

but it does not seem to work properly. 1. The line where I define column names ["key","date","qty"] does not compile for Scala 2.10.6 and Spark 1.6.2. 2. Also it returns a dataframe, while I need Int. Should I just do toString.toInt?

Upvotes: 2

Views: 1579

Answers (1)

eliasah
eliasah

Reputation: 40360

Both of the following won't compile :

scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
// <console>:1: error: illegal start of simple expression
//       val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
                                                                                                                                                                                             ^

scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
// <console>:27: error: value toDF is not a member of org.apache.spark.rdd.RDD[Array[String]]
//       val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
                                                                                                                                                                                          ^

The first one won't because it's a incorrect syntax and as for the second, it is because, like the error says, it's not a member, in other terms, the action is not supported.

The later one will compile with Spark 2.x but the following solution would also apply or you'll have a DataFrame with one column of type ArrayType.

Now let's solve the issue :

scala> :pa
// Entering paste mode (ctrl-D to finish)
import sqlContext.implicits._ // you don't need to import this in the shell.
val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1"))
           .map{ _.split(",") match { case Array(a,b,c) => (a,b,c) }}.toDF("key","date","qty")

// Exiting paste mode, now interpreting.

// df: org.apache.spark.sql.DataFrame = [key: string, date: string, qty: string]

You can apply any filter you want and compute the aggregation needed, e.g :

scala> val df2 = df.filter(col("key") === "124").agg(sum(col("qty")))
// df2: org.apache.spark.sql.DataFrame = [sum(qty): double]

scala> df2.show
// +--------+                                                                      
// |sum(qty)|
// +--------+
// |     4.0|
// +--------+

PS: The above code has been tested in Spark 1.6.2 and 2.0.0

Upvotes: 4

Related Questions