Pyspark create new column extracting info with regex

I want to extract relevant information from the text column of a DataFrame using regular expressions.

If I have this DataFrame:

+-------------------------------------------+
|text                                       |
+-------------------------------------------+
|Hello this is a test +34666666666 677777777|
|Hello this a test 44442222M 33335555C      |
+-------------------------------------------+

I would like to have the following output:

+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                       |match                                                                                                                                               |
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Hello this is a test +34666666666 677777777|[(PHONE,u'34666666666',u'677777777')]|
|Hello this a test 44442222M 33335555C      |[(DNI, u'44442222M',u'33335555C')]                                                                                                      |
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

I've tried using this:

# Each entry is [label, regex].
# Raw strings hand the backslashes to the regex engine untouched, so there are
# no invalid string escapes such as "\d" or "\+". The character classes list
# their members directly: inside [...] a "|" is a literal pipe, not
# alternation, so "[9|6|7]" would also have accepted "|" as a digit/separator.
pat_list = [
    ['PHONE', r'\b((\+?34([ \t-])?)?[967]((\d{1}([ \t-])?[0-9]{3})|(\d{2}([ \t-])?'
              r'[0-9]{2}))([ \t-])?[0-9]{2}([ \t-])?[0-9]{2})\b'],
    ['DNI', r'\b(\d{8})([A-Z])\b'],
]

# Two sample rows: one containing phone numbers, one containing DNI codes.
l1 = [
    'Hello this is a test +34666666666 677777777',
    'Hello this a test 44442222M 33335555C',
]
# zip over a single list wraps every string in a 1-tuple: one column per row.
l = zip(l1)
df = spark.createDataFrame(l, ['text'])
# Turn each Row into a plain Python list.
rdd = df.rdd.map(list)

def parse_pat(row, col_number, patterns):
    """Return a copy of ``row`` with a flat list of pattern hits appended.

    ``patterns`` holds ``[name, regex]`` entries. For every entry whose regex
    is found (case-insensitively) in ``row[col_number]``, two items are added
    to the hit list: the pattern name, then a single space-separated string
    made of ``str()`` of each ``re.findall`` result. When the regex contains
    groups those results are tuples of groups, not the full match.
    """
    text = row[col_number]
    hits = []
    for entry in patterns:
        # Presence check ignores case ...
        if re.search(entry[1], text, re.IGNORECASE) is None:
            continue
        hits.append(entry[0])
        # ... while the extraction pass uses DOTALL only.
        found = re.findall(entry[1], text, re.DOTALL)
        hits.append(' '.join(map(str, found)))
    return list(row) + [hits]

# Run the matcher on every row (the text lives in column 0), then go back to a
# DataFrame with the extra column named 'match'.
rdd_parse_pat = rdd.map(lambda row: parse_pat(row, 0, pat_list))
df_out = rdd_parse_pat.toDF(['text', 'match'])

But I get this:

# truncate=False prints the full cell contents instead of shortening long values.
df_out.show(truncate=False)

+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                       |match                                                                                                                                               |
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Hello this is a test +34666666666 677777777|[PHONE, (u'34666666666', u'34', u'', u'6666', u'6666', u'', u'', u'', u'', u'') (u'677777777', u'', u'', u'7777', u'7777', u'', u'', u'', u'', u'')]|
|Hello this a test 44442222M 33335555C      |[DNI, (u'44442222', u'M') (u'33335555', u'C')]                                                                                                      |
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

Does anyone know how I could do it?

Upvotes: 1

Views: 912

Answers (1)

C. Gonzalez V
C. Gonzalez V

Reputation: 26

You are joining all the captured groups of each match instead of taking the full match... try this:

def parse_pat(row, col_number, patterns):
    """Return a copy of ``row`` with a dict of regex hits appended.

    Args:
        row: Sequence of column values for one record.
        col_number: Index in ``row`` of the text to scan.
        patterns: Iterable of ``[name, regex]`` entries. Each regex is
            compiled with ``re.IGNORECASE | re.DOTALL``.

    Returns:
        A new list holding the values of ``row`` followed by a dict that maps
        each pattern name with at least one hit to the list of full matched
        substrings (``group(0)``), in order of appearance. Names without a
        hit are absent from the dict.
    """
    column = row[col_number]
    hit_words = {}
    for patron in patterns:
        key = patron[0]  # Match name
        patron_comp = re.compile(patron[1], re.IGNORECASE | re.DOTALL)
        # finditer always moves forward, so a pattern that can match the empty
        # string cannot stall the scan the way a manual search(text, pos) loop
        # does when match.end() == pos.
        for match in patron_comp.finditer(column):
            value = match.group(0)  # Full match, not the individual groups
            if value:  # An empty match carries no extracted text; skip it
                hit_words.setdefault(key, []).append(value)
    myrow = list(row)
    myrow.append(hit_words)
    return myrow

It builds one dict per row, with an entry for each pattern that matched:
- Key: pattern's name (PHONE, DNI ...)
- Value: list with all matches.

Result:

+-------------------------------------------+--------------------------------------------------+
|text                                       |match                                             |
+-------------------------------------------+--------------------------------------------------+
|Hello this is a test +34666666666 677777777|Map(PHONE -> WrappedArray(34666666666, 677777777))|
|Hello this a test 44442222M 33335555C      |Map(DNI -> WrappedArray(44442222M, 33335555C))    |
+-------------------------------------------+--------------------------------------------------+

Upvotes: 1

Related Questions