astro_asz

Reputation: 2318

Spark Window Function rangeBetween producing incorrect results

I am trying to do a window function on a Spark DataFrame using rangeBetween on a column of type Long, but the results of the window are not correct. Am I doing something wrong?

Here is my DataFrame:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
      Seq(
        Row("2014-11-01 08:10:10.12345", 141482941012345L),
        Row("2014-11-01 09:10:10.12345", 141483301012345L),
        Row("2014-11-01 10:10:10.12345", 141483661012345L),
        Row("2014-11-02 10:10:10.12345", 141492301012345L),
        Row("2014-11-03 10:10:10.12345", 141500941012345L),
        Row("2014-11-04 10:10:10.12345", 141509581012345L),
        Row("2014-11-05 10:10:10.12345", 141518221012345L),
        Row("2014-11-06 10:10:10.12345", 141526861012345L),
        Row("2014-11-07 10:10:10.12345", 141535501012345L),
        Row("2014-11-08 10:10:10.12345", 141544141012345L)
      )
    )
val schema = new StructType()
  .add(StructField("dateTime", StringType, true))
  .add(StructField("unixTime", LongType, true))

val df = spark.createDataFrame(rowsRdd, schema)
df.show(10, false)
df.printSchema()

Which is:

+-------------------------+---------------+
|dateTime                 |unixTime       |
+-------------------------+---------------+
|2014-11-01 08:10:10.12345|141482941012345|
|2014-11-01 09:10:10.12345|141483301012345|
|2014-11-01 10:10:10.12345|141483661012345|
|2014-11-02 10:10:10.12345|141492301012345|
|2014-11-03 10:10:10.12345|141500941012345|
|2014-11-04 10:10:10.12345|141509581012345|
|2014-11-05 10:10:10.12345|141518221012345|
|2014-11-06 10:10:10.12345|141526861012345|
|2014-11-07 10:10:10.12345|141535501012345|
|2014-11-08 10:10:10.12345|141544141012345|
+-------------------------+---------------+

Schema:

root
 |-- dateTime: string (nullable = true)
 |-- unixTime: long (nullable = true)

The first column is the timestamp of an event (a string; we will not actually use it) and the second column is the unix time corresponding to that timestamp, in units of 10^-5 seconds.
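
For reference, one hour expressed in these units is 3600 * 100000 = 360,000,000, which is where the hour constant used below comes from.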

Now I want to calculate the number of events in the window preceding the current row. For example, with a 3-hour window I do:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, count}

val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-3*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

Which returns correctly:

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1  |
|2014-11-01 09:10:10.12345|141483301012345|2  |
|2014-11-01 10:10:10.12345|141483661012345|3  |
|2014-11-02 10:10:10.12345|141492301012345|1  |
|2014-11-03 10:10:10.12345|141500941012345|1  |
|2014-11-04 10:10:10.12345|141509581012345|1  |
|2014-11-05 10:10:10.12345|141518221012345|1  |
|2014-11-06 10:10:10.12345|141526861012345|1  |
|2014-11-07 10:10:10.12345|141535501012345|1  |
|2014-11-08 10:10:10.12345|141544141012345|1  |
+-------------------------+---------------+---+

Here is the result for a window of 6 hours. Why is the result all 0 now?

val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-6*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|0  |
|2014-11-01 09:10:10.12345|141483301012345|0  |
|2014-11-01 10:10:10.12345|141483661012345|0  |
|2014-11-02 10:10:10.12345|141492301012345|0  |
|2014-11-03 10:10:10.12345|141500941012345|0  |
|2014-11-04 10:10:10.12345|141509581012345|0  |
|2014-11-05 10:10:10.12345|141518221012345|0  |
|2014-11-06 10:10:10.12345|141526861012345|0  |
|2014-11-07 10:10:10.12345|141535501012345|0  |
|2014-11-08 10:10:10.12345|141544141012345|0  |
+-------------------------+---------------+---+

Here is what happens for 12 hours. Why is the result all 1 now?

val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-12*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1  |
|2014-11-01 09:10:10.12345|141483301012345|1  |
|2014-11-01 10:10:10.12345|141483661012345|1  |
|2014-11-02 10:10:10.12345|141492301012345|1  |
|2014-11-03 10:10:10.12345|141500941012345|1  |
|2014-11-04 10:10:10.12345|141509581012345|1  |
|2014-11-05 10:10:10.12345|141518221012345|1  |
|2014-11-06 10:10:10.12345|141526861012345|1  |
|2014-11-07 10:10:10.12345|141535501012345|1  |
|2014-11-08 10:10:10.12345|141544141012345|1  |
+-------------------------+---------------+---+

What is going on here? It does not work correctly for any large rangeBetween value.

Edit: 9/11/2017

Is it related to this issue: [SPARK-19451][SQL] rangeBetween method should accept Long value as boundary #18540? Has it already been implemented in the latest version of Spark?

Upvotes: 2

Views: 2032

Answers (1)

user8913786

Reputation: 36

It is indeed related to the linked issue. 6 * hour is 2160000000, which is larger than Integer.MAX_VALUE (2147483647), and therefore results in an integer overflow:

scala> (6 * hour).toInt
res4: Int = -2134967296
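
Because the conversion to Int keeps only the low 32 bits of the Long, the two negative lower bounds used above wrap in different directions (a quick check along the same lines; the resN names are just what the shell prints):

scala> (-6 * hour).toInt
res5: Int = 2134967296

scala> (-12 * hour).toInt
res6: Int = -25032704

With -6 * hour the lower bound wraps to a large positive value, so the frame is empty and every count is 0; with -12 * hour it wraps to about -25 million, i.e. roughly 250 seconds in these 10^-5-second units, so only the current row falls inside the frame and every count is 1.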

The issue has been fixed on current master and will be released in Spark 2.3.
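
Until the fix is available, one possible workaround (a minimal sketch, not part of the original answer; the unixTimeSec column and the hourSec / wSec names are just illustrative) is to rescale the range column to a coarser unit such as whole seconds, so that even a multi-hour boundary stays far below Integer.MAX_VALUE:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, count}

// one hour in whole seconds; small enough that the Int conversion cannot overflow
val hourSec: Long = 60 * 60L

val wSec = Window.orderBy(col("unixTimeSec")).rangeBetween(-6 * hourSec, 0)

val df2 = df
  .withColumn("unixTimeSec", (col("unixTime") / 100000L).cast("long")) // 10^-5 s -> s
  .withColumn("cts", count(col("dateTime")).over(wSec))
  .orderBy(asc("unixTime"))

At one-second resolution a 6-hour boundary is only 21600, so the truncation to Int is harmless; the trade-off is that the sub-second precision of unixTime is ignored, which does not matter for events that are hours apart.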

Upvotes: 2
