Reputation: 3625
I have a DataFrame that is created as follows:
df = sc
.textFile("s3n://bucket/key/data.txt")
.map(_.split(","))
.toDF()
This is the content of data.txt:
123,2016-11-09,1
124,2016-11-09,2
123,2016-11-10,1
123,2016-11-11,1
123,2016-11-12,1
124,2016-11-13,1
124,2016-11-14,1
Is it possible to filter df in order to get the sum of the 3rd column values for 123 over the last N days, counting back from today? I am interested in a flexible solution so that N can be defined as a parameter.
For example, if today were 2016-11-16 and N were equal to 5, then the sum of the 3rd column values for 124 would be equal to 2.
This is my current solution:
df = sc
.textFile("s3n://bucket/key/data.txt")
.map(_.split(","))
.toDF(["key","date","qty"])
val starting_date = LocalDate.now().minusDays(x_last_days)
df.filter(col("key") === "124")
.filter(to_date(df("date")).gt(starting_date))
.agg(sum(col("qty")))
but it does not seem to work properly:
1. The line where I define the column names, ["key","date","qty"], does not compile with Scala 2.10.6 and Spark 1.6.2.
2. It also returns a DataFrame, while I need an Int. Should I just do toString.toInt?
Upvotes: 2
Views: 1579
Reputation: 40360
Neither of the following compiles:
scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
// <console>:1: error: illegal start of simple expression
// val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
^
scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
// <console>:27: error: value toDF is not a member of org.apache.spark.rdd.RDD[Array[String]]
// val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
^
The first one fails because ["key","date","qty"] is not valid Scala syntax; column names are passed to toDF as plain string arguments. The second one fails because, as the error says, toDF is not a member of RDD[Array[String]]; in other words, the conversion is not supported for an RDD of arrays.
The latter will compile with Spark 2.x, but the solution below still applies there; otherwise you'll end up with a DataFrame with a single column of type ArrayType.
Now let's solve the issue:
scala> :pa
// Entering paste mode (ctrl-D to finish)
import sqlContext.implicits._ // you don't need to import this in the shell.
val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1"))
.map{ _.split(",") match { case Array(a,b,c) => (a,b,c) }}.toDF("key","date","qty")
// Exiting paste mode, now interpreting.
// df: org.apache.spark.sql.DataFrame = [key: string, date: string, qty: string]
You can apply any filter you want and compute the aggregation you need, e.g.:
scala> val df2 = df.filter(col("key") === "124").agg(sum(col("qty")))
// df2: org.apache.spark.sql.DataFrame = [sum(qty): double]
scala> df2.show
// +--------+
// |sum(qty)|
// +--------+
// | 4.0|
// +--------+
PS: The above code has been tested in Spark 1.6.2 and 2.0.0
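To also cover the "last N days" window and the Int result asked for in the question, here is a minimal sketch that has not been run against the versions mentioned above. The helper name sumLastNDays and its today parameter are hypothetical, introduced only for illustration. It assumes Java 8 (for java.time.LocalDate, as in the question) and the df built above, with string columns key, date and qty. The cutoff is converted to java.sql.Date because, as far as I know, Spark 1.6 does not accept java.time values as literals.

```scala
import java.sql.Date
import java.time.LocalDate

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, sum, to_date}

// Hypothetical helper (not part of the tested code above): sums `qty` for one
// key over the last `n` days, counting back from `today`.
def sumLastNDays(df: DataFrame, key: String, n: Int,
                 today: LocalDate = LocalDate.now()): Int = {
  // java.sql.Date is a type that lit() can turn into a date literal.
  val cutoff = Date.valueOf(today.minusDays(n))

  val row = df
    .filter(col("key") === key)
    .filter(to_date(col("date")) > lit(cutoff)) // keep rows strictly after the cutoff
    .agg(sum(col("qty").cast("int")))           // integer input gives a long sum
    .first()

  // sum() yields null when no row matches, so treat that case as 0.
  if (row.isNullAt(0)) 0 else row.getLong(0).toInt
}
```

Calling sumLastNDays(df, "124", 5, LocalDate.parse("2016-11-16")) on the sample data should keep only the rows dated 2016-11-13 and 2016-11-14 for key 124. Reading the value out of the first Row avoids the toString.toInt round trip, and passing today explicitly keeps the result reproducible.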
Upvotes: 4