nobody

Reputation: 11080

pyspark - TypeError: count() takes exactly 1 argument (2 given)

I am concatenating a bunch of columns and counting the result. Can't I count on an alias?

df.select(F.col("_c21"),F.concat(F.col("id1"),F.lit("|"),F.col("id2"),F.lit("|"),F.col("id3"),F.lit("|"),F.col("id4").alias("ids")))
df.repartition(col("_c21")).count("ids").over(Window.partitionBy("_c21"))

Data looks like this

+--------------------+--------------------------------------------+
|                _c21|concat(id1, |, id2, |, id3, |, id4 AS `ids`)|
+--------------------+--------------------------------------------+
|          roBMSlo...|                        US|WA|98115|Centu...|
|          3Vzlfim...|                        FR|56|56130|SFR.....|
|          rgBdftS...|                        CA|NB|E1A|Bell Ca...|
+--------------------+--------------------------------------------+

Upvotes: 0

Views: 972

Answers (2)

mck

Reputation: 42332

Use F.count, not the DataFrame count method (which takes no arguments and counts the total number of rows). There is also no need to repartition, because the window does the partitioning anyway. Finally, the closing bracket for the alias is misplaced: the alias should apply to the whole concat, not to id4.

import pyspark.sql.functions as F
from pyspark.sql import Window

df1 = df.select(
    F.col("_c21"),
    F.concat(
        F.col("id1"),F.lit("|"),F.col("id2"),F.lit("|"),F.col("id3"),F.lit("|"),F.col("id4")
    ).alias("ids")    # alias applied to the whole concat here, not to id4
)

df2 = df1.select(F.count("ids").over(Window.partitionBy("_c21")))

# or if you want an additional column, use 
df2 = df1.withColumn("count_id", F.count("ids").over(Window.partitionBy("_c21")))
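
For reference, a minimal sketch of the difference between the two counts, using the df1 defined above: DataFrame.count() is an action that takes no column argument and returns the total number of rows, while F.count(...) is a column expression you can use in select, agg, or over a window.

total_rows = df1.count()   # action: returns an int; passing a column name here
                           # is what raises the TypeError in the question title

df3 = df1.groupBy("_c21").agg(
    F.count("ids").alias("count_id")   # aggregate: counts non-null ids per _c21
)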

In fact, concat_ws is more appropriate:

df1 = df.select(
    F.col("_c21"),
    F.concat_ws(
        "|",
        F.col("id1"), F.col("id2"), F.col("id3"), F.col("id4")
    ).alias("ids")
)
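
Another point in favour of concat_ws (standard Spark behaviour, not specific to this question): concat returns NULL as soon as any input is NULL, whereas concat_ws simply skips NULL inputs. A small sketch with made-up literals, assuming an active spark session:

spark.range(1).select(
    F.concat(F.lit("US"), F.lit("|"), F.lit(None).cast("string")).alias("c"),              # NULL
    F.concat_ws("|", F.lit("US"), F.lit(None).cast("string"), F.lit("98115")).alias("cw")  # US|98115
).show()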

Upvotes: 1

stack0114106

Reputation: 8711

Go with spark-sql. It improves readability and portability, and it is also easier to debug.

Sample input:

df = spark.sql(""" with t1 as (
 select 'roBMSlo' c1, 'US' c2, 'WA' c3, '98115' c4, 'Centuy' c5 union all
 select '3Vzlfim' c1, 'FR' c2, '56' c3, '56130' c4, 'SFR' c5 union all
 select 'rgBdftS' c1, 'CA' c2, 'NB' c3, 'E1A' c4, 'Bell Ca' c5
 ) select c1 _c21, c2 id1, c3 id2, c4 id3, c5 id4 from t1
""")
df.show(truncate=False)
df.createOrReplaceTempView("df")

+-------+---+---+-----+-------+
|_c21   |id1|id2|id3  |id4    |
+-------+---+---+-----+-------+
|roBMSlo|US |WA |98115|Centuy |
|3Vzlfim|FR |56 |56130|SFR    |
|rgBdftS|CA |NB |E1A  |Bell Ca|
+-------+---+---+-----+-------+

spark.sql("""
select _c21, ids, count(ids) over(partition by _c21) cw from (
select _c21, concat(id1,id2,id3,id4) ids  from df )
""").show()

+-------+---------------+---+
|   _c21|            ids| cw|
+-------+---------------+---+
|roBMSlo|USWA98115Centuy|  1|
|3Vzlfim|   FR5656130SFR|  1|
|rgBdftS| CANBE1ABell Ca|  1|
+-------+---------------+---+

If you want to join the id columns with a separator:

spark.sql("""
select _c21, ids, count(ids) over(partition by _c21) cw from (
select _c21, concat_ws("|",id1,id2,id3,id4) ids  from df )
""").show()

Upvotes: 1
