Shivam Gupta
Shivam Gupta

Reputation: 193

Column value length validation in pyspark

I am having a data frame like:

| a    | b   | c    | d  |
|-------------------|----|
| abcd | gs  | abmj | a  |
| hgj  | vxb | basn | f  |
| hg   | hj  | ndks | dd |

i want to filter which column have wong validation like if all columns length like: length a = 3, b <= 2, c <= 4, d = 1

i want final data frame as :

| a    | b   | c    | d  |error|
|-------------------|----|-----|
| abcd | gs  | abmj | a  | a   |
| hgj  | vxb | basn | f  | b   |
| hg   | hj  | ndks | dd | a,d |

here error column is which column having wrong data.

how to validate this please help me out. Thanks.

Upvotes: 1

Views: 1121

Answers (1)

samkart
samkart

Reputation: 6654

you can use case when (when().otherwise()) to check the conditions and return the column name. this can be generalized (sped up) if you create a dict with the columns and their conditions.

see below example.

# dict with columns and their conditions
cond_dict = {
    'a': '= 3',
    'b': '<= 2',
    'c': '<= 4',
    'd': '= 1'
}

import pyspark.sql.functions as func

# list comprehension to create case whens for each column condition 
# that returns the column name if condition is not met
conditions = [func.when(func.expr('length({0}) {1}'.format(k, v)) == False, func.lit(k))
              for k, v in cond_dict.items()]

# [Column<'CASE WHEN ((length(a) = 3) = false) THEN a END'>,
#  Column<'CASE WHEN ((length(b) <= 2) = false) THEN b END'>,
#  Column<'CASE WHEN ((length(c) <= 4) = false) THEN c END'>,
#  Column<'CASE WHEN ((length(d) = 1) = false) THEN d END'>]

# use the list of case whens within a `concat_ws` with "," delimiter
data_sdf. \
    withColumn('error', func.concat_ws(',', *conditions)). \
    show()

# +----+---+----+---+-----+
# |   a|  b|   c|  d|error|
# +----+---+----+---+-----+
# |abcd| gs|abmj|  a|    a|
# | hgj|vxb|basn|  f|    b|
# |  hg| hj|ndks| dd|  a,d|
# +----+---+----+---+-----+

Upvotes: 3

Related Questions