alzinos

Reputation: 71

Add column to pyspark dataframe with non-unique ids by other key

Apologies for the title - don't know how to easily summarise my issue.

I have a pyspark dataframe with two columns, code and emp. Each unique code value has multiple emp values, as shown below. I want to add a column that, for each unique code value, assigns an incrementing number, e.g. the value column below. I've had a play with monotonically_increasing_id(), but I haven't managed to limit its id generation to one specific code key, and in any case the documentation says the generated ids aren't guaranteed to be consecutive.

+----+---+-----+
|code|emp|value|
+----+---+-----+
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
|   b| 12|    1|
...
+----+---+-----+

There will be at most 4 emp values per code value, if that has any impact on efficiency. The values should increment with the size of emp: the row with the lowest emp should get value 1 and the highest value n, where n is the number of records with that specific code.
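For reference, this is roughly what I tried (a minimal sketch, assuming my dataframe is called df): the generated ids are unique and increasing, but neither consecutive nor restarted per code, so they can't serve as the value column above.

from pyspark.sql.functions import monotonically_increasing_id

# ids are unique and increasing across the whole dataframe,
# but not consecutive and not scoped to each code group
df.withColumn("value", monotonically_increasing_id()).show()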

Upvotes: 1

Views: 2279

Answers (3)

pvy4917

Reputation: 1822

You can use row_number() with Window functions.

First, import Window and row_number:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

Assuming your scenario, with the following columns and values:

>>> cols1 = ['code', 'emp']
>>> vals1 = [
...     ('a', 14),
...     ('a', 22),
...     ('a', 35),
...     ('a', 64),
...     ('b', 12),
...     ('b', 35)
... ]
# Create a DataFrame
>>> df1 = spark.createDataFrame(vals1, cols1)

# Result of 'df1' table.
>>> df1.show()
+----+---+
|code|emp|
+----+---+
|   a| 14|
|   a| 22|
|   a| 35|
|   a| 64|
|   b| 12|
|   b| 35|
+----+---+

Apply row_number() over a window partitioned by code and ordered by emp.

>>> val = df1.withColumn("value", row_number().over(Window.partitionBy("code").orderBy("emp")))

>>> val.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
|   b| 12|    1|
|   b| 35|    2|
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
+----+---+-----+

Finally, order by column code to get the desired result.

>>> val.orderBy('code').show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
|   b| 12|    1|
|   b| 35|    2|
+----+---+-----+

  • partitionBy: Creates a WindowSpec with the partitioning defined.

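The WindowSpec can also be built once and reused (a small sketch using the same df1 as above):

>>> w = Window.partitionBy("code").orderBy("emp")
>>> df1.withColumn("value", row_number().over(w)).orderBy("code").show()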

Upvotes: 2

Tim

Reputation: 2843

You could create a temp view and use Spark SQL for this:

>>> df = spark.createDataFrame([('a', 14), ('a', 22), ('a', 35), ('a', 64), ('b', 12)], ['code', 'emp'])
>>> df.show()
+----+---+
|code|emp|
+----+---+
|   a| 14|
|   a| 22|
|   a| 35|
|   a| 64|
|   b| 12|
+----+---+
>>> df.createOrReplaceTempView("df")
>>> df2 = spark.sql("select code, emp, row_number() over(partition by code order by emp) as value from df order by code")
>>> df2.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
|   b| 12|    1|
+----+---+-----+

Upvotes: 0

M.Selman SEZGİN

Reputation: 313

For Scala, you can create a dataframe with an incremental index column like this (note that this numbers rows globally; to restart the count per key, as in the question, add a partition by clause to the window):

%scala
val rankedWordCount = sqlContext.sql(
  """select row_number() over (order by some_numeric_value desc) as index_col,
     lower(info) as info, some_numeric_value
     from information_table""")

Upvotes: 0
