Ivan Lee

Reputation: 4261

Add a new column based on a condition and group

df = spark.createDataFrame(
    [
        ['A', '1', '3'],
        ['A', '2', '7'],
        ['A', '3', '1'],
        ['A', '1', '5'],
        ['A', '3', '4'],
        ['A', '5', '2'],
        ['B', '1', '8'],
        ['B', '2', '4'],
        ['B', '4', '2'],
        ['B', '6', '8']
    ],
    ['col1', 'col2', 'col3']
)
df.show()

I want to group by col1 and use the value of col2 as the condition for a new grouping column, so that col2 and col3 are collected into lists like this:

+----+------------+------------+
|col1|        col2|        col3|
+----+------------+------------+
|   A|   [1, 2, 3]|   [3, 7, 1]|
|   A|   [1, 3, 5]|   [5, 4, 2]|
|   B|[1, 2, 4, 6]|[8, 4, 2, 8]|
+----+------------+------------+

Edit: I changed the question and added a column, col4, to order the rows. If some values in this column are duplicated, the relative order of those rows does not matter:

df = spark.createDataFrame(
    [
        ['A', '1', '3','2'],
        ['A', '2', '7','2'],
        ['A', '3', '1','2'],
        ['A', '1', '5','3'],
        ['A', '3', '4','3'],
        ['A', '5', '2','4'],
        ['B', '1', '8','4'],
        ['B', '2', '4','5'],
        ['B', '4', '2','6'],
        ['B', '6', '8','7']
    ],
    ['col1', 'col2', 'col3', 'col4']
)
df.show()

Upvotes: 0

Views: 83

Answers (1)

Jonathan

Reputation: 2033

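Since you now have a column to sort by, you can take a running sum over a Window to build a group column. The sum counts the rows seen so far where col2 is "1", so the count goes up by one each time a new group starts: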

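from pyspark.sql import Window, functions as func

df = df.orderBy(['col1', 'col4', 'col2'])
# Running count of col2 == "1" per col1, ordered by col4 (ties on col4 share a count).
df = df.withColumn(
    'group',
    func.sum(func.when(func.col('col2') == "1", 1).otherwise(0)).over(
        Window.partitionBy(func.col('col1')).orderBy(func.asc(func.col('col4')))
    )
)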

df.show()
+----+----+----+----+-----+
|col1|col2|col3|col4|group|
+----+----+----+----+-----+
|   A|   1|   3|   2|    1|
|   A|   2|   7|   2|    1|
|   A|   3|   1|   2|    1|
|   A|   1|   5|   3|    2|
|   A|   3|   4|   3|    2|
|   A|   5|   2|   4|    2|
|   B|   1|   8|   4|    1|
|   B|   2|   4|   5|    1|
|   B|   4|   2|   6|    1|
|   B|   6|   8|   7|    1|
+----+----+----+----+-----+
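
If the running sum is hard to picture, here is a rough plain-Python sketch of the same idea, with no Spark involved. It assumes, as the output above suggests, that rows with the same col4 are treated as peers and all receive the same running total. The helper name `add_group` is made up for this illustration, and col4 is compared as a string, which only works here because the values are single digits.

```python
from itertools import groupby

# Rows from the question: (col1, col2, col3, col4)
rows = [
    ('A', '1', '3', '2'), ('A', '2', '7', '2'), ('A', '3', '1', '2'),
    ('A', '1', '5', '3'), ('A', '3', '4', '3'), ('A', '5', '2', '4'),
    ('B', '1', '8', '4'), ('B', '2', '4', '5'), ('B', '4', '2', '6'),
    ('B', '6', '8', '7'),
]


def add_group(rows):
    """Append a running count of col2 == '1' per col1, ordered by col4."""
    out = []
    # Order by col1, then col4; sorted() is stable, so ties keep input order.
    ordered = sorted(rows, key=lambda r: (r[0], r[3]))
    for _, part in groupby(ordered, key=lambda r: r[0]):
        running = 0
        # Rows sharing a col4 value are peers: count all of their flags
        # first, then give every peer the same running total.
        for _, peers in groupby(part, key=lambda r: r[3]):
            peers = list(peers)
            running += sum(1 for r in peers if r[1] == '1')
            out.extend(r + (running,) for r in peers)
    return out


groups = [r[4] for r in add_group(rows)]
print(groups)
```

Each row where col2 is "1" bumps the counter, so every row up to the next "1" ends up with the same group number.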

Then you can group by col1 and the new group column and collect the values into lists:

df\
    .groupby('col1', 'group')\
    .agg(
        func.collect_list('col2').alias('col2'),
        func.collect_list('col3').alias('col3')
    )\
    .drop('group')\
    .show(10, False)
+----+------------+------------+
|col1|col2        |col3        |
+----+------------+------------+
|A   |[1, 2, 3]   |[3, 7, 1]   |
|A   |[1, 3, 5]   |[5, 4, 2]   |
|B   |[1, 2, 4, 6]|[8, 4, 2, 8]|
+----+------------+------------+
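
One caveat: the Spark documentation notes that collect_list is non-deterministic, because the order of the collected values depends on the row order, which may change after a shuffle. If the order inside the lists matters, one option is to carry the sort key along with the values and sort each group before splitting the lists. Below is a plain-Python sketch of that idea, not Spark code. The helper name `collect_lists` and the shuffled input are made up for the illustration, and ties on col4 fall back to col2 here only because tuples compare field by field.

```python
from collections import defaultdict

# Rows after the group column is added, deliberately out of order:
# (col1, col2, col3, col4, group)
rows = [
    ('A', '3', '1', '2', 1), ('B', '6', '8', '7', 1), ('A', '5', '2', '4', 2),
    ('A', '1', '3', '2', 1), ('B', '2', '4', '5', 1), ('A', '3', '4', '3', 2),
    ('A', '2', '7', '2', 1), ('B', '1', '8', '4', 1), ('A', '1', '5', '3', 2),
    ('B', '4', '2', '6', 1),
]


def collect_lists(rows):
    """Collect col2 and col3 per (col1, group), ordered by col4."""
    buckets = defaultdict(list)
    for col1, col2, col3, col4, group in rows:
        # Keep the sort key first so plain tuple ordering sorts by col4.
        buckets[(col1, group)].append((col4, col2, col3))
    result = []
    for (col1, _), items in sorted(buckets.items()):
        items.sort()
        result.append((col1, [i[1] for i in items], [i[2] for i in items]))
    return result


collected = collect_lists(rows)
for row in collected:
    print(row)
```

In Spark, the analogous approach would be to collect a struct whose first field is col4 and sort the resulting array before extracting col2 and col3.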

Upvotes: 1
