Sawan S

Reputation: 97

PySpark: auto-increment an ID column when appending DataFrames

I have a PySpark DataFrame in the format below.

Table A:

+----+--------+------+-------------+
| ID |  date  | type | description |
+----+--------+------+-------------+
|  1 | 201905 | A    | descA       |
|  2 | 202006 | B    | descB       |
|  3 | 201503 | C    | descC       |
|  4 | 201507 | D    | descD       |
|  5 | 201601 | E    | descE       |
|  6 | 201809 | F    | descF       |
|  7 | 201011 | G    | descG       |
+----+--------+------+-------------+

I have another table, B, which I need to append to table A. Table B does not have the ID column.

Table B:

+--------+------+-------------+
|  Date  | Type | description |
+--------+------+-------------+
| 201001 | H    | descH       |
| 201507 | I    | descI       |
| 201907 | J    | descJ       |
+--------+------+-------------+

Table B needs to be appended to table A, and the ID column must continue to auto-increment by 1 for each appended row, as shown below.

Output table:

+----+--------+------+-------------+
| ID |  date  | type | description |
+----+--------+------+-------------+
|  1 | 201905 | A    | descA       |
|  2 | 202006 | B    | descB       |
|  3 | 201503 | C    | descC       |
|  4 | 201507 | D    | descD       |
|  5 | 201601 | E    | descE       |
|  6 | 201809 | F    | descF       |
|  7 | 201011 | G    | descG       |
|  8 | 201001 | H    | descH       |
|  9 | 201507 | I    | descI       |
| 10 | 201907 | J    | descJ       |
+----+--------+------+-------------+

Can you please tell me how I can do this using PySpark?
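
For reference, the two DataFrames can be created as follows (a minimal sketch; it assumes an active SparkSession named spark, and storing the dates as strings is my assumption):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Table A: already has the ID column
df_a = spark.createDataFrame(
    [(1, "201905", "A", "descA"), (2, "202006", "B", "descB"),
     (3, "201503", "C", "descC"), (4, "201507", "D", "descD"),
     (5, "201601", "E", "descE"), (6, "201809", "F", "descF"),
     (7, "201011", "G", "descG")],
    ["ID", "date", "type", "description"],
)

# Table B: no ID column, and its headers are capitalized
df_b = spark.createDataFrame(
    [("201001", "H", "descH"), ("201507", "I", "descI"),
     ("201907", "J", "descJ")],
    ["Date", "Type", "description"],
)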

Thanks.

Upvotes: 0

Views: 393

Answers (1)

Cena

Reputation: 3419

You can assign row numbers to Table B starting from 1 and then add the max of the ID column from Table A to it. So the first row of Table B becomes 8 (1+7).

For the union, use unionByName, since the column order differs between the two DataFrames; it unions DataFrames by column name rather than by position. Note that name matching follows spark.sql.caseSensitive (false by default), so Table B's Date/Type resolve against Table A's date/type.
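
If your session runs with case-sensitive resolution, you can normalize Table B's column names first (an optional step I'm adding, not part of the original answer):

df_b = df_b.withColumnRenamed("Date", "date").withColumnRenamed("Type", "type")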

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Largest existing ID in Table A (7 in this example)
max_id = df_a.agg({"ID": "max"}).collect()[0][0]

# Window with no partitionBy: fine for a small Table B, but note that
# it moves all of Table B's rows to a single partition for numbering
w = Window.orderBy("date", "type")

# Offset Table B's row numbers by max_id, then union by column name
df_a.unionByName(
    df_b.withColumn("ID", (max_id + F.row_number().over(w)).cast("int"))
).show()

+---+------+----+-----------+
| ID|  date|type|description|
+---+------+----+-----------+
|  1|201905|   A|      descA|
|  2|202006|   B|      descB|
|  3|201503|   C|      descC|
|  4|201507|   D|      descD|
|  5|201601|   E|      descE|
|  6|201809|   F|      descF|
|  7|201011|   G|      descG|
|  8|201001|   H|      descH|
|  9|201507|   I|      descI|
| 10|201907|   J|      descJ|
+---+------+----+-----------+
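
If Table B is large, the single-partition window above can become a bottleneck. One possible alternative (a sketch reusing the df_a, df_b, and max_id from above, not from the original answer) is to number rows with zipWithIndex on the underlying RDD. Unlike the window version, this numbers rows in their existing order rather than by date and type:

# zipWithIndex assigns 0-based row indexes without collapsing the
# data to one partition; offset them by max_id + 1 to continue the IDs
df_b_with_id = (
    df_b.rdd.zipWithIndex()
        .map(lambda pair: pair[0] + (max_id + pair[1] + 1,))
        .toDF(df_b.columns + ["ID"])
)
df_a.unionByName(df_b_with_id).show()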

Upvotes: 2
