Gayatri

Reputation: 2253

PySpark 2 - Combine Records from multiple rows

I have a text file which has the records as follows:

<BR>Datetime:2018.06.30^
Name:ABC^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
<BR>Datetime:2018.05.30-EDT^
Name:DEF^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>

I am trying to read this into spark and process the file so that the result would be:

Datetime     Name  Se  Machine  InnerTrace  AdditionalInfo
2018.06.30   ABC   4   XXXXXXX      
2018.05.30   DEF   4   XXXXXXX

When I try reading the file into spark with

sparkSession.read.csv("filename") 

I get each row as a separate record, which makes it difficult to put all the rows between <BR> and <ER> together. Is there an easy way to work around this?

I am doing this using PySpark 2.

Upvotes: 0

Views: 440

Answers (1)

pault

Reputation: 43504

This file format isn't something that's Spark-friendly. If you can't modify the file, you're going to have to do a fair amount of processing to get it into the shape you want.

Here is one approach that may work for you:

Read the File

Suppose reading the file gives you the following DataFrame:

# quote='' turns off quoting, so any quote characters are kept as part of the data
df = spark.read.csv(path="filename", quote='')
df.show(truncate=False)
#+-----------------------------+
#|_c0                          |
#+-----------------------------+
#|"<BR>Datetime:2018.06.30^    |
#|Name:ABC^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#|"<BR>Datetime:2018.05.30-EDT^|
#|Name:DEF^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#+-----------------------------+

Add a column to separate rows into groups of records

import pyspark.sql.functions as f
from pyspark.sql import Window

# running total over all rows up to and including the current one, in id order
w = Window.orderBy("id").rangeBetween(Window.unboundedPreceding, 0)
# flag the first line of each record (it starts with "<BR>) as 1, every other line as 0
df = df.withColumn("group", f.col("_c0").rlike('^"<BR>.+').cast("int"))
# a monotonically increasing id preserves the original line order for the window
df = df.withColumn("id", f.monotonically_increasing_id())
# the cumulative sum of the flags numbers the records 1, 2, 3, ...
df = df.withColumn("group", f.sum("group").over(w)).drop("id")
df.show(truncate=False)
#+-----------------------------+-----+
#|_c0                          |group|
#+-----------------------------+-----+
#|"<BR>Datetime:2018.06.30^    |1    |
#|Name:ABC^                    |1    |
#|Se:4^                        |1    |
#|Machine:XXXXXXX^             |1    |
#|InnerTrace:^                 |1    |
#|AdditionalInfo:^             |1    |
#|<ER>"                        |1    |
#|"<BR>Datetime:2018.05.30-EDT^|2    |
#|Name:DEF^                    |2    |
#|Se:4^                        |2    |
#|Machine:XXXXXXX^             |2    |
#|InnerTrace:^                 |2    |
#|AdditionalInfo:^             |2    |
#|<ER>"                        |2    |
#+-----------------------------+-----+
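
To see why this works: the rlike flag is 1 only on the first line of each record, and the running sum carries it forward, so every line up to the next <BR> line gets the same group number. Here is a minimal sketch on toy data (the values are made up purely for illustration, reusing the window w and the functions import from above):

# toy data: a 1 marks the start of a record, a 0 marks a continuation line
toy = spark.createDataFrame(
    [(0, 1), (1, 0), (2, 0), (3, 1), (4, 0)],
    ["id", "flag"]
)
toy.withColumn("group", f.sum("flag").over(w)).show()
# rows with id 0-2 end up in group 1, rows with id 3-4 in group 2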

Use Regex to clean and split strings

df = df.select(
    "group",
    # strip the leading "<BR>, the trailing <ER>" and the trailing ^ from each line
    f.regexp_replace(pattern=r'(^"<BR>|<ER>"$|\^$)', replacement='', str="_c0").alias("col")
).where(f.col("col") != '')  # drop rows that contained only the <ER> marker
# split the remaining key:value strings on the colon
df = df.select("group", f.split("col", ":").alias("split"))

df.show(truncate=False)
#+-----+--------------------------+
#|group|split                     |
#+-----+--------------------------+
#|1    |[Datetime, 2018.06.30]    |
#|1    |[Name, ABC]               |
#|1    |[Se, 4]                   |
#|1    |[Machine, XXXXXXX]        |
#|1    |[InnerTrace, ]            |
#|1    |[AdditionalInfo, ]        |
#|2    |[Datetime, 2018.05.30-EDT]|
#|2    |[Name, DEF]               |
#|2    |[Se, 4]                   |
#|2    |[Machine, XXXXXXX]        |
#|2    |[InnerTrace, ]            |
#|2    |[AdditionalInfo, ]        |
#+-----+--------------------------+
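
One caveat with f.split: it splits on every colon, so if a value could itself contain a colon (say a Datetime with a time-of-day component), it would be chopped into extra pieces. If that is a concern, you could instead replace the split/getItem steps with regexp_extract applied to the cleaned col column, for example:

# alternative to f.split (only needed if values may themselves contain ':'):
# everything before the first colon is the key, everything after it is the value
df = df.select(
    "group",
    f.regexp_extract("col", r'^([^:]+):(.*)$', 1).alias("key"),
    f.regexp_extract("col", r'^([^:]+):(.*)$', 2).alias("value")
)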

Extract elements from array, groupby, and pivot

df = df.select(
        "group",
        f.col("split").getItem(0).alias("key"),    # field name, e.g. Datetime
        f.col("split").getItem(1).alias("value")   # field value, e.g. 2018.06.30
    )\
    .groupBy("group").pivot("key").agg(f.first("value"))\
    .drop("group")
df.show(truncate=False)
#+--------------+--------------+----------+-------+----+---+
#|AdditionalInfo|Datetime      |InnerTrace|Machine|Name|Se |
#+--------------+--------------+----------+-------+----+---+
#|              |2018.06.30    |          |XXXXXXX|ABC |4  |
#|              |2018.05.30-EDT|          |XXXXXXX|DEF |4  |
#+--------------+--------------+----------+-------+----+---+
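
Finally, if you want the columns in the same order as the desired output in the question, a plain select on the pivoted result takes care of that:

# reorder the pivoted columns to match the requested layout
df = df.select("Datetime", "Name", "Se", "Machine", "InnerTrace", "AdditionalInfo")
df.show(truncate=False)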

Upvotes: 2
