Reputation: 35
I am trying to show, for every day, the 5 tickers with the highest 'prct_chng' on that day. How can I write code that does this?
Sample data:
+------+-------------------+-----+-----+-----+-----+-------+-------+-------+-------------+
|TICKER| DATE| OPEN| HIGH| LOW|CLOSE| VOL| SMA50| SMA100| prct_chng|
+------+-------------------+-----+-----+-----+-----+-------+-------+-------+-------------+
| IAG|2020-07-22 00:00:00| 0.74| 0.74|0.685|0.725| 23873| 0.405| 0.308| 0.986|
| ITM|2020-07-22 00:00:00|0.153|0.153|0.145| 0.15| 9230| 0.230| 0.290| 1.000|
| ATG|2020-07-22 00:00:00| 4.96| 4.96| 4.91| 4.92| 1576| 4.552| 4.192| 1.002|
| ALL|2020-07-22 00:00:00| 8.2| 8.66| 8.02| 8.4| 14009| 7.077| 6.684| 1.017|
| SUW|2020-07-22 00:00:00| 17.0| 17.0| 16.8| 16.8| 35| 14.997| 13.481| 0.988|
| SPH|2020-07-22 00:00:00| 14.0| 14.0| 14.0| 14.0| 8| 14.195| 15.620| 1.037|
| CEZ|2020-07-22 00:00:00| 80.5| 80.5| 79.8| 79.9| 301| 78.593| 75.613| 0.988|
| IMP|2020-07-22 00:00:00| 1.2| 1.27| 1.19| 1.27| 55998| 1.030| 0.945| 1.104|
| 4FM|2020-07-22 00:00:00| 4.82| 4.82| 4.57| 4.59| 255| 4.702| 4.397| 0.998|
| ALR|2020-07-23 00:00:00|17.45| 17.6| 16.9|16.91| 327103| 17.006| 15.735| 0.971|
| CCC|2020-07-23 00:00:00| 65.0| 65.1|62.94| 63.7| 156646| 63.383| 54.450| 0.986|
| INK|2020-07-23 00:00:00|19.45|19.45|19.25|19.45| 861| 18.533| 16.939| 1.010|
| WPL|2020-07-23 00:00:00| 75.6| 75.6| 71.6| 71.6| 3231| 70.950| 69.491| 0.957|
| ECH|2020-07-23 00:00:00| 3.94| 3.98| 3.92| 3.95| 155636| 3.919| 3.984| 1.003|
| TEN|2020-07-23 00:00:00|550.0|560.0|540.0|541.0| 17752|542.903|490.941| 0.975|
+------+-------------------+-----+-----+-----+-----+-------+-------+-------+-------------+
The answer is probably close to the reply in this post: Top N items from a Spark DataFrame/RDD. However, that solution does not partition by date.
Upvotes: 1
Views: 60
Reputation: 1405
You can use the window function `row_number`: partition by date and order by `prct_chng` in descending order, then keep the top 5 tickers per partition.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
#InputDF
# +------+-------------------+---------+
# |TICKER| DATE|prct_chng|
# +------+-------------------+---------+
# | IAG|2020-07-22 00:00:00| 0.986|
# | ITM|2020-07-22 00:00:00| 1.0|
# | ATG|2020-07-22 00:00:00| 1.002|
# | ALL|2020-07-22 00:00:00| 1.017|
# | SUW|2020-07-22 00:00:00| 0.988|
# | SPH|2020-07-22 00:00:00| 1.037|
# | CEZ|2020-07-22 00:00:00| 0.988|
# | IMP|2020-07-22 00:00:00| 1.104|
# | 4FM|2020-07-22 00:00:00| 0.998|
# | ALR|2020-07-23 00:00:00| 0.971|
# | CCC|2020-07-23 00:00:00| 0.986|
# | INK|2020-07-23 00:00:00| 1.01|
# | WPL|2020-07-23 00:00:00| 0.957|
# | ECH|2020-07-23 00:00:00| 1.003|
# | TEN|2020-07-23 00:00:00| 0.975|
# +------+-------------------+---------+
# Partition per day and rank rows by prct_chng, highest first
w = Window.partitionBy(F.to_timestamp("DATE")).orderBy(F.desc("prct_chng"))

# Keep only the top 5 rows of each partition, then drop the helper column
df.withColumn("r_no", F.row_number().over(w)).filter(F.col("r_no") <= 5).drop("r_no").show()
# +------+-------------------+---------+
# |TICKER| DATE|prct_chng|
# +------+-------------------+---------+
# | IMP|2020-07-22 00:00:00| 1.104|
# | SPH|2020-07-22 00:00:00| 1.037|
# | ALL|2020-07-22 00:00:00| 1.017|
# | ATG|2020-07-22 00:00:00| 1.002|
# | ITM|2020-07-22 00:00:00| 1.0|
# | INK|2020-07-23 00:00:00| 1.01|
# | ECH|2020-07-23 00:00:00| 1.003|
# | CCC|2020-07-23 00:00:00| 0.986|
# | TEN|2020-07-23 00:00:00| 0.975|
# | ALR|2020-07-23 00:00:00| 0.971|
# +------+-------------------+---------+
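The same top-N-per-group logic can be sketched in plain Python (no Spark session needed) to sanity-check the expected output. This is only an illustration of what the window does, using the sample data above; the names `rows`, `by_date`, and `top5` are mine, not from the original post:

```python
from collections import defaultdict

# Sample data from the question: (TICKER, DATE, prct_chng)
rows = [
    ("IAG", "2020-07-22", 0.986), ("ITM", "2020-07-22", 1.000),
    ("ATG", "2020-07-22", 1.002), ("ALL", "2020-07-22", 1.017),
    ("SUW", "2020-07-22", 0.988), ("SPH", "2020-07-22", 1.037),
    ("CEZ", "2020-07-22", 0.988), ("IMP", "2020-07-22", 1.104),
    ("4FM", "2020-07-22", 0.998), ("ALR", "2020-07-23", 0.971),
    ("CCC", "2020-07-23", 0.986), ("INK", "2020-07-23", 1.010),
    ("WPL", "2020-07-23", 0.957), ("ECH", "2020-07-23", 1.003),
    ("TEN", "2020-07-23", 0.975),
]

# Group by date -- the equivalent of Window.partitionBy
by_date = defaultdict(list)
for ticker, date, chng in rows:
    by_date[date].append((ticker, chng))

# Sort each group by prct_chng descending and keep the first 5 --
# the equivalent of orderBy(desc) + row_number() <= 5
top5 = {
    date: sorted(group, key=lambda t: t[1], reverse=True)[:5]
    for date, group in sorted(by_date.items())
}

for date, group in top5.items():
    print(date, group)
```

This reproduces the Spark output: for 2020-07-23, WPL (0.957) is the only row dropped, since six tickers exist for that day.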
Upvotes: 1