Reputation: 4261
df = spark.createDataFrame(
[
['A', '1', '3'],
['A', '2', '7'],
['A', '3', '1'],
['A', '1', '5'],
['A', '3', '4'],
['A', '5', '2'],
['B', '1', '8'],
['B', '2', '4'],
['B', '4', '2'],
['B', '6', '8']
],
['col1', 'col2', 'col3']
)
df.show()
I want to group by col1, using the value of col2 as the condition for splitting the rows into sub-groups, to get:
+----+------------+------------+
|col1| col2| col3|
+----+------------+------------+
| A| [1, 2, 3]| [3, 7, 1]|
| A| [1, 3, 5]| [5, 4, 2]|
| B|[1, 2, 4, 6]|[8, 4, 2, 8]|
+----+------------+------------+
Edit: I have changed the question and added one column (col4) to order the rows. If values in this column are duplicated, the order among those duplicated rows does not matter:
df = spark.createDataFrame(
[
['A', '1', '3','2'],
['A', '2', '7','2'],
['A', '3', '1','2'],
['A', '1', '5','3'],
['A', '3', '4','3'],
['A', '5', '2','4'],
['B', '1', '8','4'],
['B', '2', '4','5'],
['B', '4', '2','6'],
['B', '6', '8','7']
],
['col1', 'col2', 'col3', 'col4']
)
df.show()
Upvotes: 0
Views: 83
Reputation: 2033
As you now have a column for sorting, you can use func.sum() over a Window to create a group column:
from pyspark.sql import functions as func
from pyspark.sql.window import Window

df = df.orderBy(['col1', 'col4', 'col2'])
# Each row where col2 == '1' starts a new group within its col1 partition
df = df.withColumn(
    'group',
    func.sum(func.when(func.col('col2') == '1', 1).otherwise(0))
        .over(Window.partitionBy('col1').orderBy(func.asc('col4')))
)
df.show()
+----+----+----+----+-----+
|col1|col2|col3|col4|group|
+----+----+----+----+-----+
| A| 1| 3| 2| 1|
| A| 2| 7| 2| 1|
| A| 3| 1| 2| 1|
| A| 1| 5| 3| 2|
| A| 3| 4| 3| 2|
| A| 5| 2| 4| 2|
| B| 1| 8| 4| 1|
| B| 2| 4| 5| 1|
| B| 4| 2| 6| 1|
| B| 6| 8| 7| 1|
+----+----+----+----+-----+
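Why do the three A-rows with col4 = 2 all get group 1, even though only one of them has col2 == '1'? With an ordered window, Spark's default frame is RANGE from unbounded preceding to the current row, so rows tied on the ordering key are peers and share the same running sum. A minimal plain-Python sketch of that tie behaviour (the data is the A partition above; the variable names are mine, not part of any API):

```python
from itertools import groupby

# (col4, flag) pairs for the A partition, flag = 1 where col2 == '1'
rows = [('2', 1), ('2', 0), ('2', 0), ('3', 1), ('3', 0), ('4', 0)]

running, frame_sum = 0, []
for col4, tier in groupby(rows, key=lambda r: r[0]):
    tier = list(tier)
    # RANGE frame: all rows tied on col4 see the same running total
    running += sum(flag for _, flag in tier)
    frame_sum += [running] * len(tier)

print(frame_sum)  # [1, 1, 1, 2, 2, 2] -- matches the group column for A
```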
Then you can group by col1 and the group column, and collect the lists:
df.groupby('col1', 'group').agg(
    func.collect_list('col2').alias('col2'),
    func.collect_list('col3').alias('col3')
).drop('group').show(10, False)
+----+------------+------------+
|col1|col2 |col3 |
+----+------------+------------+
|A |[1, 2, 3] |[3, 7, 1] |
|A |[1, 3, 5] |[5, 4, 2] |
|B |[1, 2, 4, 6]|[8, 4, 2, 8]|
+----+------------+------------+
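If you want to sanity-check the logic without a Spark session, here is a hedged pure-Python re-implementation of the same two steps (sort, tier-wise cumulative count of col2 == '1', then collect per group). All names here are mine and only sketch the window semantics, not the answer's actual code:

```python
from itertools import groupby

rows = [
    ('A', '1', '3', '2'), ('A', '2', '7', '2'), ('A', '3', '1', '2'),
    ('A', '1', '5', '3'), ('A', '3', '4', '3'), ('A', '5', '2', '4'),
    ('B', '1', '8', '4'), ('B', '2', '4', '5'), ('B', '4', '2', '6'),
    ('B', '6', '8', '7'),
]

# Step 1: order by col1, col4, col2 (same as df.orderBy above)
rows.sort(key=lambda r: (r[0], int(r[3]), int(r[1])))

# Step 2: per col1 partition, bump the group id once per col4 tier
# containing a col2 == '1' (mimicking the RANGE-framed window sum),
# then collect col2/col3 lists per (col1, group)
collected = {}
for col1, part in groupby(rows, key=lambda r: r[0]):
    group = 0
    for col4, tier in groupby(part, key=lambda r: r[3]):
        tier = list(tier)
        group += sum(1 for r in tier if r[1] == '1')
        c2, c3 = collected.setdefault((col1, group), ([], []))
        for r in tier:
            c2.append(r[1])
            c3.append(r[2])

for key in sorted(collected):
    print(key, collected[key])
# ('A', 1) (['1', '2', '3'], ['3', '7', '1'])
# ('A', 2) (['1', '3', '5'], ['5', '4', '2'])
# ('B', 1) (['1', '2', '4', '6'], ['8', '4', '2', '8'])
```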
Upvotes: 1