linog

Reputation: 6226

Apply window function in Spark with non constant frame size

My Problem

I am currently facing difficulties with Spark window functions. I am using Spark (through pyspark) version 1.6.3 (associated Python version 2.6.6). I run a pyspark shell instance that automatically initializes HiveContext as my sqlContext.

I want to compute a rolling sum with a window function. My problem is that the window frame is not fixed: it depends on the observation considered. More specifically, I order the data by a variable called rank_id and, for the observation at index $x$, want to sum over indexes $x+1$ to $2x-1$. Thus, the bounds of my rangeBetween must depend on the value of rank_id.
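To make the target explicit, here is a minimal pure-Python sketch of the sum I am after. It is only a reference for checking results on a tiny list, not a solution, since it needs the data in local memory. The function name rolling_sum_reference, the clipping of the upper bound at the last rank, and the choice of 0.0 for an empty frame are assumptions made for illustration.

```python
def rolling_sum_reference(y):
    """For each 1-based rank x, sum the values of y at ranks x+1 .. 2x-1.

    Hypothetical helper for illustration only: `y` is a plain list that is
    already sorted by rank_id, so the value at rank r is y[r - 1].
    """
    n = len(y)
    out = []
    for x in range(1, n + 1):
        lo = x + 1                # first rank included in the frame
        hi = min(2 * x - 1, n)    # last rank included, clipped at the last row
        # Ranks lo..hi map to the slice y[lo - 1:hi]; the slice is empty
        # when hi < lo, in which case the sum is 0.0 (an assumption).
        out.append(sum(y[lo - 1:hi], 0.0))
    return out


if __name__ == "__main__":
    print(rolling_sum_reference([1.0, 2.0, 3.0, 4.0, 5.0]))
```

The frame is empty for the first rank (x+1 is already past 2x-1) and shrinks again near the end of the data, where 2x-1 exceeds the number of rows.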

An important point is that I don't want to collect data, and thus cannot use anything like numpy (my data set has a very large number of observations).

Reproducible example

from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")

#df.show(2)
#+-------------------+------------------+                                        
#|                  x|                 y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows

# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+                             
#|                   x|                   y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458|      1|
#| 0.10943843181953838|  0.6478505849227775|      2|
#| 0.13916818312857027| 0.24165348228464578|      3|
#+--------------------+--------------------+-------+
#only showing top 3 rows

Fixed width cumulative sum: no problem

Using a window function, I am able to run a cumulative sum over a fixed number of indexes (I use rangeBetween here, but for this example rowsBetween could be used interchangeably).

w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+          
#|                   x|                   y|rank_id|             roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458|      1|0.9698521852602887|
#| 0.10943843181953838|  0.6478505849227775|      2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578|      3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows

Cumulative sum width not fixed

I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in the same way as for orderBy, which may be the problem), I get the following error:

# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot concatenate 'str' and 'int' objects

I tried something else, using a SQL statement:

# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
   FROM tempdf;
""")

which this time gives me the following error:

Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"

I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean that I am not passing the SQL statement correctly to Spark:

df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN -1 AND 1) AS cumsum
   FROM tempdf;
 """)

Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"

How could I solve my problem by using either window or SQL statement within Spark?

Upvotes: 3

Views: 3979

Answers (1)

Alper t. Turker

Reputation: 35249

How could I solve my problem by using either window or SQL statement within Spark?

TL;DR You cannot, or at least not in a scalable way, with current requirements. You can try something similar to sliding over RDD: How to transform data with sliding window over time series data in Pyspark

I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean that I am not passing the SQL statement correctly to Spark

It is incorrect. Each frame boundary requires a PRECEDING, FOLLOWING, or CURRENT ROW keyword. Also, there should be no trailing semicolon:

SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf

I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in the same way as for orderBy, which may be the problem), I get the following error ...

TypeError: cannot concatenate 'str' and 'int' objects

As the exception says, you cannot call + on a string and an integer. You probably wanted columns:

from pyspark.sql.functions import col

.rangeBetween(col('rank_id') + 1, 2 * col('rank_id') - 1)

but this is not supported. The range has to be of fixed size and cannot be defined in terms of column expressions.

An important point is that I don't want to collect data

Window definition without partitionBy:

w = Window.orderBy('rank_id').rangeBetween(-1,3)

is as bad as collect: without partitionBy, Spark moves all rows to a single partition to evaluate the window. So even if there are workarounds for the "dynamic frame" problem (with conditionals and an unbounded window), they won't help you here.
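To illustrate why a dynamic frame can in principle be reduced to fixed ones, note that the sum over ranks x+1 to 2x-1 equals the running total at rank 2x-1 minus the running total at rank x. The following is a minimal pure-Python sketch of that identity, not Spark code and not necessarily the workaround alluded to above. The function name rolling_sum_via_prefix and the clipping of 2x-1 at the last rank are assumptions made for illustration.

```python
def rolling_sum_via_prefix(y):
    """Sum of y over ranks x+1 .. 2x-1, computed from running totals.

    Hypothetical helper for illustration only: `y` is a plain list sorted
    by rank_id, with the value at 1-based rank r stored at y[r - 1].
    """
    n = len(y)
    # prefix[r] holds y_1 + ... + y_r, with prefix[0] = 0.0.
    prefix = [0.0]
    for value in y:
        prefix.append(prefix[-1] + value)

    out = []
    for x in range(1, n + 1):
        hi = min(2 * x - 1, n)  # upper rank, clipped at the last row (assumption)
        # Difference of two running totals gives the sum over ranks x+1 .. hi;
        # it is 0.0 when hi == x, i.e. when the frame is empty.
        out.append(prefix[hi] - prefix[x])
    return out


if __name__ == "__main__":
    print(rolling_sum_via_prefix([1.0, 2.0, 3.0, 4.0, 5.0]))
```

In Spark terms, the running total would correspond to a fixed frame from unbounded preceding to the current row, and the lookup at rank 2x-1 would presumably need a self-join on rank_id. Computing that running total over a window with no partitionBy would still pull all rows into a single partition, so the scalability concern above remains.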

Upvotes: 3
