Reputation: 11080
I am concatenating a bunch of columns and counting them. Can't I count on an alias?
df.select(F.col("_c21"),F.concat(F.col("id1"),F.lit("|"),F.col("id2"),F.lit("|"),F.col("id3"),F.lit("|"),F.col("id4").alias("ids")))
df.repartition(col("_c21"])).count("ids").over(Window.partitionBy("_c21"))
The data looks like this:
+--------------------+--------------------------------------------+
| _c21|concat(id1, |, id2, |, id3, |, id4 AS `ids`)|
+--------------------+--------------------------------------------+
|roBMSlo...| US|WA|98115|Centu...|
|3Vzlfim...| FR|56|56130|SFR.....|
|rgBdftS...| CA|NB|E1A|Bell Ca...|
Upvotes: 0
Views: 972
Reputation: 42332
Use F.count, not the count method of the DataFrame (which counts the total number of rows). There is also no need to repartition, because the window will do the partitioning anyway. Finally, you misplaced a bracket for the alias.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df1 = df.select(
    F.col("_c21"),
    F.concat(
        F.col("id1"), F.lit("|"), F.col("id2"), F.lit("|"), F.col("id3"), F.lit("|"), F.col("id4")
    ).alias("ids")  # the alias goes on the whole concat, not on id4
)
df2 = df1.select(F.count("ids").over(Window.partitionBy("_c21")))
# or, if you want the count as an additional column:
df2 = df1.withColumn("count_id", F.count("ids").over(Window.partitionBy("_c21")))
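If a single count per _c21 is enough (rather than the count repeated on every row), a groupBy aggregation avoids the window entirely. A minimal sketch, using the df1 defined above:
df1.groupBy("_c21").agg(F.count("ids").alias("count_id")).show()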
In fact, concat_ws is more appropriate:
df1 = df.select(
    F.col("_c21"),
    F.concat_ws(
        "|",
        F.col("id1"), F.col("id2"), F.col("id3"), F.col("id4")
    ).alias("ids")
)
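As a side note (standard Spark behavior, not something shown in the question): concat returns null if any input column is null, whereas concat_ws skips nulls, which is another reason to prefer it here. A minimal sketch, assuming an existing SparkSession named spark:
demo = spark.createDataFrame(
    [("US", "WA", "98115"), ("FR", None, "56130")],
    ["id1", "id2", "id3"],
)
demo.select(
    F.concat("id1", "id2", "id3").alias("c"),           # second row is null, because id2 is null
    F.concat_ws("|", "id1", "id2", "id3").alias("cw"),  # second row is "FR|56130", the null is skipped
).show()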
Upvotes: 1
Reputation: 8711
Go with spark-sql. It improves readability and portability, and it is also easier to debug.
Sample input:
df = spark.sql(""" with t1 (
select 'roBMSlo' c1, 'US' c2, 'WA' c3, '98115' c4, 'Centuy' c5 union all
select '3Vzlfim' c1, 'FR' c2, '56' c3, '56130' c4, 'SFR' c5 union all
select 'rgBdftS' c1, 'CA' c2, 'NB' c3, 'E1A' c4, 'Bell Ca' c5
) select c1 _c21 , c2 id1 , c3 id2 , c4 id3 , c5 id4 from t1
""")
df.show(truncate=False)
df.createOrReplaceTempView("df")
+-------+---+---+-----+-------+
|_c21 |id1|id2|id3 |id4 |
+-------+---+---+-----+-------+
|roBMSlo|US |WA |98115|Centuy |
|3Vzlfim|FR |56 |56130|SFR |
|rgBdftS|CA |NB |E1A |Bell Ca|
+-------+---+---+-----+-------+
spark.sql("""
select _c21, ids, count(ids) over(partition by _c21) cw from (
select _c21, concat(id1,id2,id3,id4) ids from df )
""").show()
+-------+---------------+---+
| _c21| ids| cw|
+-------+---------------+---+
|roBMSlo|USWA98115Centuy| 1|
|3Vzlfim| FR5656130SFR| 1|
|rgBdftS| CANBE1ABell Ca| 1|
+-------+---------------+---+
If you want to join with a separator, use concat_ws:
spark.sql("""
select _c21, ids, count(ids) over(partition by _c21) cw from (
select _c21, concat_ws("|",id1,id2,id3,id4) ids from df )
""").show()
Upvotes: 1