Saikat
Saikat

Reputation: 443

How can I convert a input string into dictionary for each rows of a column in pyspark

I have a column values of a dataframe where I am receiving a string input like below where startIndex is the index of beginning of each character, end index is the end of occurrence of that character in the string and flag is the character itself.

    +---+------------------+
    | id|    Values        |
    +---+------------------+
    |01 |  AABBBAA         |
    |02 |  SSSAAAA         |
    +---+------------------+

Now I want to convert the string into dictionary for each rows as depicted below:

    +---+--------------------+
    | id|    Values          |
    +---+--------------------+
    |01 |  [{"startIndex":0, |
    |   |    "endIndex" : 1, | 
    |   |    "flag" : A },   |
    |   |   {"startIndex":2, |
    |   |    "endIndex" : 4, |
    |   |    "flag" : B },   |
    |   |   {"startIndex":5, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    |02 |  [{"startIndex":0, |
    |   |    "endIndex" : 2, |
    |   |    "flag" : S },   |
    |   |   {"startIndex":3, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    +---+--------------------+-

I have the pseudo code to frame the dictionary but not sure how to apply it to all the rows at one go without using loops. Also the problem with such approach is only the last framed dictionary is getting overwritten in all the rows


        import re
        x = "aaabbbbccaa"
        xs = re.findall(r"((.)\2*)", x)
        print(xs)
        start = 0
        output = '' 
        for item in xs:
            end = start + (len(item[0])-1)
            startIndex = start
            endIndex = end
            qualityFlag = item[1]
            print(startIndex, endIndex, qualityFlag)
            start = end+

Upvotes: 0

Views: 766

Answers (1)

jxc
jxc

Reputation: 13998

Using udf() to wrap up the code logic and to_json() to convert the array of structs into string:

from pyspark.sql.functions import udf, to_json
import re

df = spark.createDataFrame([
      ('01', 'AABBBAA')
    , ('02', 'SSSAAAA')
  ] , ['id', 'Values']
)

# argument `x` is a StringType() over the udf function
# return `row` as a list of dicts
@udf('array<struct<startIndex:long,endIndex:long,flag:string>>')
def set_fields(x):
    row = []
    for m in re.finditer(r'(.)\1*', x):
        row.append({
            'startIndex': m.start()
          , 'endIndex': m.end()-1
          , 'flag': m.group(1)
        })
    return row

df.select('id', to_json(set_fields('Values')).alias('Values')).show(truncate=False)
+---+----------------------------------------------------------------------------------------------------------------------------+
|id |Values                                                                                                                      |
+---+----------------------------------------------------------------------------------------------------------------------------+
|01 |[{"startIndex":0,"endIndex":1,"flag":"A"},{"startIndex":2,"endIndex":4,"flag":"B"},{"startIndex":5,"endIndex":6,"flag":"A"}]|
|02 |[{"startIndex":0,"endIndex":2,"flag":"S"},{"startIndex":3,"endIndex":6,"flag":"A"}]                                         |
+---+----------------------------------------------------------------------------------------------------------------------------+

Upvotes: 1

Related Questions