Consider the example below:
>>> l = [("US","City1",125),("US","City2",123),("Europe","CityX",23),("Europe","CityY",17)]
>>> print l
[('US', 'City1', 125), ('US', 'City2', 123), ('Europe', 'CityX', 23), ('Europe', 'CityY', 17)]
>>> sc = SparkContext(appName="N")
>>> sqlsc = SQLContext(sc)
>>> df = sqlsc.createDataFrame(l)
>>> df.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: long (nullable = true)
>>> df.registerTempTable("t1")
>>> rdf = sqlsc.sql("Select _1,sum(_3) from t1 group by _1")
>>> rdf.show()
+------+---+
| _1|_c1|
+------+---+
| US|248|
|Europe| 40|
+------+---+
>>> rdf.printSchema()
root
|-- _1: string (nullable = true)
|-- _c1: long (nullable = true)
>>> rdf.registerTempTable("t2")
>>> sqlsc.sql("Select * from t2 where _c1 > 200").show()
+---+---+
| _1|_c1|
+---+---+
| US|248|
+---+---+
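(For reference, the _1/_2/_3 column names are just the defaults createDataFrame assigns when no schema is given; explicit names could be passed instead. The labels country/city/population here are purely illustrative, not part of the original data:)
>>> df = sqlsc.createDataFrame(l, ["country", "city", "population"])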
So basically, I am trying to find all the _3 values (which could be, say, the population subscribed to some service) that are above a threshold in each country. In the approach above, an additional dataframe (rdf) gets created. How do I eliminate the rdf dataframe and embed the complete query within the df dataframe itself?
I tried, but pyspark throws an error:
>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1").show()
+------+---+
| _1|_c1|
+------+---+
| US|248|
|Europe| 40|
+------+---+
>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1 where _c1 > 200").show()
Traceback (most recent call last):
File "/ghostcache/kimanjun/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.sql.
: java.lang.RuntimeException: [1.39] failure: ``union'' expected but `where' found
Here is a solution that does not need any temp tables:
# Import sum under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import sum as _sum
gDf = df.groupBy(df._1).agg(_sum(df._3).alias('sum'))
gDf.filter(gDf.sum > 200).show()
This groups and aggregates with a sum in a single chain. To make sure the aliased sum column does not cause any ambiguity, it is better to apply the filter on a separate object.
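If the attribute-style access gDf.sum ever collides with a DataFrame method name, you can reference the aliased column by name with col() instead; a minimal equivalent of the filter above:
from pyspark.sql.functions import col

# Reference the aliased 'sum' column by name rather than by attribute access
gDf.filter(col('sum') > 200).show()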
I also recommend this link for some useful techniques that are much more powerful than running raw SQL against the dataframe.
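For completeness, the query can also stay in plain SQL: a condition on an aggregate belongs in a HAVING clause after GROUP BY, not in WHERE. A sketch against the t1 temp table from the question (this is standard SQL and should be accepted by Spark SQL, though I have not verified it against the 1.6 parser specifically):
# HAVING filters on the aggregate after grouping, which WHERE cannot do
sqlsc.sql("SELECT _1, SUM(_3) AS total FROM t1 GROUP BY _1 HAVING SUM(_3) > 200").show()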