Reputation: 83
I have a DataFrame of the following shape:
1 2
5 9
How can I convert it to (row_num, col_num, value) format?
0 0 1
0 1 2
1 0 5
1 1 9
Is there a way to apply some function or mapper to do this? Thanks in advance.
Upvotes: 2
Views: 190
Reputation: 1712
In PySpark, row_number() and posexplode() will be helpful. Try this:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

tst = spark.createDataFrame(
    [(1, 7, 80), (1, 8, 40), (1, 5, 100), (5, 8, 90), (7, 6, 50), (0, 3, 60)],
    schema=['col1', 'col2', 'col3'])
# Assign a 0-based row number. Window.orderBy(F.lit(1)) imposes no meaningful
# ordering, so the numbering is arbitrary.
tst1 = tst.withColumn("row_number", F.row_number().over(Window.orderBy(F.lit(1))) - 1)
# Pack the original columns into an array, then explode it with positions.
tst_arr = tst1.withColumn("arr", F.array(*tst.columns))
tst_new = tst_arr.select('row_number', F.posexplode('arr'))
Result:
tst_new.show()
+----------+---+---+
|row_number|pos|col|
+----------+---+---+
| 0| 0| 1|
| 0| 1| 7|
| 0| 2| 80|
| 1| 0| 1|
| 1| 1| 8|
| 1| 2| 40|
| 2| 0| 1|
| 2| 1| 5|
| 2| 2|100|
| 3| 0| 5|
| 3| 1| 8|
| 3| 2| 90|
| 4| 0| 7|
| 4| 1| 6|
| 4| 2| 50|
| 5| 0| 0|
| 5| 1| 3|
| 5| 2| 60|
+----------+---+---+
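If you need the exact (row_num, col_num, value) column names from the question, a rename on the result should do it (a minimal follow-up sketch; the target names are just illustrative):
tst_final = tst_new.withColumnRenamed('row_number', 'row_num') \
                   .withColumnRenamed('pos', 'col_num') \
                   .withColumnRenamed('col', 'value')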
Upvotes: 0
Reputation: 5536
You can do it by unpivoting the data with stack:
from pyspark.sql.functions import expr, lit, row_number
from pyspark.sql import Window

df = spark.createDataFrame([(1, 2), (5, 9)], ['col1', 'col2'])

# Rename the columns to their positional index ('0', '1', ...).
df = df.toDF(*[str(i) for i in range(len(df.columns))])

# Build the stack() argument list: each column's index followed by its value.
col_list = ','.join([f'{i},`{i}`' for i in df.columns])
n_cols = len(df.columns)

# Unpivot: stack() emits one (col_id, col_value) row per input column.
df.withColumn('row_id', row_number().over(Window.orderBy(lit(1))) - 1) \
  .select('row_id', expr(f'stack({n_cols},{col_list}) as (col_id,col_value)')) \
  .show()
+------+------+---------+
|row_id|col_id|col_value|
+------+------+---------+
| 0| 0| 1|
| 0| 1| 2|
| 1| 0| 5|
| 1| 1| 9|
+------+------+---------+
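For reference, with the two renamed columns above, col_list evaluates to 0,`0`,1,`1`, so the select is equivalent to this hand-written version (a sketch assuming the same df and imports as above):
df.withColumn('row_id', row_number().over(Window.orderBy(lit(1))) - 1) \
  .select('row_id', expr('stack(2, 0, `0`, 1, `1`) as (col_id,col_value)')) \
  .show()
stack(n, k1, v1, k2, v2, ...) emits n rows per input row, which is what makes the unpivot work here.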
Upvotes: 0
Reputation: 10382
Check the code below (a spark-shell session):
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._
scala> val df = Seq((1,2),(5,9)).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala> // One (col_name, value) struct per column, collected into an array.
scala> val colExpr = array(df.columns.zipWithIndex.map(c => struct(lit(c._2).as("col_name"),col(c._1).as("value"))):_*)
colExpr: org.apache.spark.sql.Column = array(named_struct(col_name, 0 AS `col_name`, NamePlaceholder(), a AS `value`), named_struct(col_name, 1 AS `col_name`, NamePlaceholder(), b AS `value`))
scala> df.withColumn("row_number",row_number().over(Window.orderBy(lit(1)))-1).withColumn("data",explode(colExpr)).select($"row_number",$"data.*").show(false)
+----------+--------+-----+
|row_number|col_name|value|
+----------+--------+-----+
|0 |0 |1 |
|0 |1 |2 |
|1 |0 |5 |
|1 |1 |9 |
+----------+--------+-----+
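For completeness, a rough PySpark port of the same struct-and-explode idea (a sketch, assuming a SparkSession named spark; the column names a and b mirror the Scala example):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame([(1, 2), (5, 9)], ['a', 'b'])
# One (col_name, value) struct per column, collected into an array.
col_expr = F.array(*[
    F.struct(F.lit(i).alias('col_name'), F.col(c).alias('value'))
    for i, c in enumerate(df.columns)
])
(df.withColumn('row_number', F.row_number().over(Window.orderBy(F.lit(1))) - 1)
   .withColumn('data', F.explode(col_expr))
   .select('row_number', 'data.*')
   .show())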
Upvotes: 1