Murali
Murali

Reputation: 65

Spark filter multiple group of rows to a single row

I am trying to acheive the following,

Lets say I have a dataframe with the following columns

id  | name  | alias
-------------------
1   | abc   | short
1   | abc   | ailas-long-1
1   | abc   | another-long-alias
2   | xyz   | short_alias
2   | xyz   | same_length
3   | def   | alias_1

I want to groupby id and name and select the shorter alias,

The output I am expecting is

id  | name  | alias
-------------------
1   | abc   | short
2   | xyz   | short_alias
3   | def   | alias_1

I can achevie this using window and row_number, is there anyother efficient method to get the same result. In general, the thrid column filter condition can be anything in this case its the length of the field.

Any help would be much appreciated.

Thank you.

Upvotes: 0

Views: 1373

Answers (2)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

All you need to do is use length inbuilt function and use that in window function as

from pyspark.sql import functions as f
from pyspark.sql import Window

windowSpec = Window.partitionBy('id', 'name').orderBy('length')

df.withColumn('length', f.length('alias'))\
    .withColumn('length', f.row_number().over(windowSpec))\
    .filter(f.col('length') == 1)\
    .drop('length')\
    .show(truncate=False)

which should give you

+---+----+-----------+
|id |name|alias      |
+---+----+-----------+
|3  |def |alias_1    |
|1  |abc |short      |
|2  |xyz |short_alias|
+---+----+-----------+

Upvotes: 2

user3689574
user3689574

Reputation: 1676

A solution without window (Not very pretty..) and the easiest, in my opinion, rdd solution:

from pyspark.sql import functions as F
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)

rdd = sc.parallelize([(1   , "abc"   , "short-alias"),
                     (1   , "abc"   , "short"),
                         (1   , "abc"   , "ailas-long-1"),
                         (1   , "abc"   , "another-long-alias"),
                         (2   , "xyz"   , "same_length"),
                         (2   , "xyz"   , "same_length1"),
                         (3   , "def"   , "short_alias") ])

df = hiveCtx.createDataFrame(\
rdd, ["id", "name", "alias"])

len_df = df.groupBy(["id", "name"]).agg(F.min(F.length("alias")).alias("alias_len"))

df = df.withColumn("alias_len", F.length("alias"))

cond = ["alias_len", "id", "name"]

df.join(len_df, cond).show()

print rdd.map(lambda x: ((x[0], x[1]), x[2]))\
    .reduceByKey(lambda x,y: x if len(x) < len(y) else y ).collect()

Output:

+---------+---+----+-----------+
|alias_len| id|name|      alias|
+---------+---+----+-----------+
|       11|  3| def|short_alias|
|       11|  2| xyz|same_length|
|        5|  1| abc|      short|
+---------+---+----+-----------+

[((2, 'xyz'), 'same_length'), ((3, 'def'), 'short_alias'), ((1, 'abc'), 'short')]

Upvotes: 0

Related Questions