Reputation: 897
I have the following data in a dataframe
col1  col2   col3  col4
1     desc1  v1    v3
2     desc2  v4    v2
1     desc1  v4    v2
2     desc2  v1    v3
I need only the first row for each unique combination of col1 and col2, as shown below.
Expected Output:
col1  col2   col3  col4
1     desc1  v1    v3
2     desc2  v4    v2
How can I achieve this in PySpark (version 1.3.1)?
I managed to get this result by converting the DataFrame into an RDD, applying map and reduceByKey, and then converting the resulting RDD back into a DataFrame. Is there another way to perform this operation using DataFrame functions?
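Roughly, my workaround looked like the sketch below (a simplified version; df and sqlContext stand in for my actual variables):

from pyspark.sql import Row

# Key each row by (col1, col2), keep whichever value pair the reduce sees first,
# then rebuild a DataFrame from the surviving rows.
pairs = df.rdd.map(lambda r: ((r.col1, r.col2), (r.col3, r.col4)))
kept = pairs.reduceByKey(lambda a, b: a)
rows = kept.map(lambda kv: Row(col1=kv[0][0], col2=kv[0][1],
                               col3=kv[1][0], col4=kv[1][1]))
df_first = sqlContext.createDataFrame(rows)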
Upvotes: 1
Views: 1512
Reputation: 330173
If you want an arbitrary row you can try to use first or last, but it is far from pretty and I would seriously consider upgrading Spark:
from pyspark.sql.functions import col, first

df = sc.parallelize([
    (1, "desc1", "v1", "v3"), (2, "desc2", "v4", "v2"),
    (1, "desc1", "v4", "v2"), (2, "desc2", "v1", "v3")
]).toDF(["col1", "col2", "col3", "col4"])

keys = ["col1", "col2"]
values = ["col3", "col4"]

# Pack the value columns into a single struct so first() picks them together,
# then unpack the struct fields (col1, col2, ...) back into the original names.
agg_exprs = [first(c).alias(c) for c in keys + ["vs_"]]
select_exprs = keys + [
    "vs_.col{0} AS {1}".format(i + 1, v) for (i, v) in enumerate(values)]

df_not_so_first = (df
    .selectExpr("struct({}) AS vs_".format(",".join(values)), *keys)
    .groupBy(*keys)
    .agg(*agg_exprs)
    .selectExpr(*select_exprs))
Note that in this particular context first doesn't choose any specific row and the results may not be deterministic. Moreover, depending on the Spark version, individual aggregations can be scheduled separately. It means that

df.groupBy("col1", "col2").agg(first("col3"), first("col4"))

doesn't guarantee col3 and col4 will be selected from the same row.
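For what it's worth, if you do upgrade, later Spark versions (1.4+, if I remember correctly) provide dropDuplicates with a column subset, which keeps one (still arbitrary) row per key without the struct juggling:

# Keeps an arbitrary row for each (col1, col2) combination; requires Spark 1.4+.
df_deduped = df.dropDuplicates(["col1", "col2"])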
Upvotes: 2