ghukill

Reputation: 1222

Pyspark DataFrame select rows with distinct values, and rows with non-distinct values

Suppose I have a pyspark DataFrame (DF):

-----------------------------
record_id | foo | bar
-----------------------------
1 | random text | random text
2 | random text | random text
3 | random text | random text
1 | random text | random text
2 | random text | random text
-----------------------------

My end goal is to write these rows to MySQL with .write.jdbc(), which I've been doing successfully. But now, before doing so, I'd like to add a new column, unique, based on the uniqueness of the record_id column.

I've made a bit of progress identifying the unique record_id values with something like this:

df.select('record_id').distinct().rdd.map(lambda r: r[0])

But unlike pandas DataFrames, I don't believe this has an index I can reuse; it appears to be just the values. I'm still fairly new to Spark/PySpark.

Would it make sense to try and figure out the following workflow?

  1. Identify rows with distinct record_id, and write to MySQL
  2. Then, identify the remaining rows, and write to MySQL

Or would it be possible to alter the original DF by adding a new column, unique, with some chained commands? Something like the following, which I could then write to MySQL wholesale:

----------------------------------
record_id | foo | bar | unique 
----------------------------------
1 | random text | random text | 0
2 | random text | random text | 0
3 | random text | random text | 1 # where 1 for boolean True
1 | random text | random text | 0
2 | random text | random text | 0
----------------------------------

Any suggestions or advice would be much appreciated!

Upvotes: 2

Views: 4663

Answers (1)

akuiper

Reputation: 215117

You can count the number of rows per record_id using a window partitioned by record_id. If a record_id has only one row, mark it as unique:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Count rows within each record_id partition; a count of 1 means the record_id is unique
w = Window.partitionBy("record_id")
df.withColumn("unique", (F.count("record_id").over(w) == 1).cast("integer")).show()
+---------+-----------+-----------+------+
|record_id|        foo|        bar|unique|
+---------+-----------+-----------+------+
|        3|random text|random text|     1|
|        1|random text|random text|     0|
|        1|random text|random text|     0|
|        2|random text|random text|     0|
|        2|random text|random text|     0|
+---------+-----------+-----------+------+

Upvotes: 3
