Raghu
Raghu

Reputation: 1712

Running subqueries in pyspark using where or filter statement

I am trying to run a subquery in pyspark. I see that it is possible using SQL statements. But is there any inherent support using "where" or "filter" operations?

Consider the test data frame :

from pyspark.sql import SparkSession
sqlContext = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate() 
tst = sqlContext.createDataFrame([(1,2),(4,3),(1,4),(1,5),(1,6)],schema=['sample','time'])
tst_sub = sqlContext.createDataFrame([(1,2),(4,3),(1,4)],schema=['sample','time'])
#%% using where to query the df
tst.where(F.col('time')>4).show()
+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+

Here you can see that the where function is working fine. When I try to do the same using a subquery , like this:

#%% using where with subquery
tst.where(F.col('time')>F.max(tst_sub.select('time'))).show()

I get this error:

AttributeError Traceback (most recent call last) in ----> 1 tst.where(F.col('time')>F.max(tst_sub.select('time'))).show()

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/pyspark/sql/functions.py in _(col) 42 def _(col): 43 sc = SparkContext._active_spark_context ---> 44 jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) 45 return Column(jc) 46 _.name = name

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args) 1246 1247 def call(self, *args): -> 1248 args_command, temp_args = self._build_args(*args) 1249 1250 command = proto.CALL_COMMAND_NAME +\

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args) 1216 1217 args_command = "".join( -> 1218 [get_command_part(arg, self.pool) for arg in new_args]) 1219 1220 return args_command, temp_args

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in (.0) 1216 1217 args_command = "".join( -> 1218 [get_command_part(arg, self.pool) for arg in new_args]) 1219 1220 return args_command, temp_args

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool) 296 command_part += ";" + interface 297 else: --> 298 command_part = REFERENCE_TYPE + parameter._get_object_id() 299 300 command_part += "\n"

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/pyspark/sql/dataframe.py in getattr(self, name) 1298 if name not in self.columns: 1299 raise AttributeError( -> 1300 "'%s' object has no attribute '%s'" % (self.class.name, name)) 1301 jc = self._jdf.apply(name) 1302 return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

When I register the dataframes as table and perform a sql query, it works fine:

tst.createOrReplaceTempView("tst")
tst_sub.createOrReplaceTempView("tst_sub")
sqlContext.sql("SELECT * FROM tst WHERE time>(SELECT(max(time)) FROM tst_sub)").show()

Is there any method to perform a subquery in pyspark on the dataframes directly using filter, where or any other methods?

Upvotes: 2

Views: 4130

Answers (1)

mck
mck

Reputation: 42422

You need to collect the max time into a numerical variable in Python before putting it in the filter:

tst.where(F.col('time') > tst_sub.select(F.max('time')).head()[0]).show()
+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+

Upvotes: 1

Related Questions