Reputation: 2253
I have a text file which has the records as follows:
<BR>Datetime:2018.06.30^
Name:ABC^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
<BR>Datetime:2018.05.30-EDT^
Name:DEF^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
I am trying to read this into Spark and process the file so that the result would be:
Datetime    Name  Se  Machine  InnerTrace  AdditionalInfo
2018.06.30  ABC   4   XXXXXXX
2018.05.30  DEF   4   XXXXXXX
When I try reading the file into Spark with
sparkSession.read.csv("filename")
I get each line as a separate row, which makes it difficult to put all the rows between <BR> and <ER> back together. Is there an easy workaround for this?
I am doing this using PySpark 2.
Upvotes: 0
Views: 440
Reputation: 43504
This file format isn't something that's Spark-friendly. If you can't modify the file, you're going to have to do a fair amount of processing to get it into the shape you want.
Here is one approach that may work for you.
First, read the file so that each line becomes one row of a single-column DataFrame:
# quote='' turns off quote handling, so each physical line is read literally as its own row
df = spark.read.csv(path="filename", quote='')
df.show(truncate=False)
#+-----------------------------+
#|_c0 |
#+-----------------------------+
#|"<BR>Datetime:2018.06.30^ |
#|Name:ABC^ |
#|Se:4^ |
#|Machine:XXXXXXX^ |
#|InnerTrace:^ |
#|AdditionalInfo:^ |
#|<ER>" |
#|"<BR>Datetime:2018.05.30-EDT^|
#|Name:DEF^ |
#|Se:4^ |
#|Machine:XXXXXXX^ |
#|InnerTrace:^ |
#|AdditionalInfo:^ |
#|<ER>" |
#+-----------------------------+
import pyspark.sql.functions as f
from pyspark.sql import Window

# Window for a running sum over the rows in file order
w = Window.orderBy("id").rangeBetween(Window.unboundedPreceding, 0)
# Flag the first line of each record (the one that starts with <BR>) with a 1
df = df.withColumn("group", f.col("_c0").rlike('^"<BR>.+').cast("int"))
# Give every row an increasing id so the original file order is preserved
df = df.withColumn("id", f.monotonically_increasing_id())
# A cumulative sum of the flags turns them into a record number
df = df.withColumn("group", f.sum("group").over(w)).drop("id")
df.show(truncate=False)
#+-----------------------------+-----+
#|_c0 |group|
#+-----------------------------+-----+
#|"<BR>Datetime:2018.06.30^ |1 |
#|Name:ABC^ |1 |
#|Se:4^ |1 |
#|Machine:XXXXXXX^ |1 |
#|InnerTrace:^ |1 |
#|AdditionalInfo:^ |1 |
#|<ER>" |1 |
#|"<BR>Datetime:2018.05.30-EDT^|2 |
#|Name:DEF^ |2 |
#|Se:4^ |2 |
#|Machine:XXXXXXX^ |2 |
#|InnerTrace:^ |2 |
#|AdditionalInfo:^ |2 |
#|<ER>" |2 |
#+-----------------------------+-----+
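At this point every line carries the number of the record it belongs to. If you want to sanity-check the grouping before reshaping (an optional step, not part of the pipeline), a count per group should show seven lines for each record in the sample above:
# Optional check: each sample record spans 7 lines (the <BR> line through <ER>)
df.groupBy("group").count().show()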
# Strip the <BR>/<ER> record markers and the trailing ^ from every line,
# then drop the lines that are now empty (the bare <ER> lines)
df = df.select(
    "group",
    f.regexp_replace(pattern=r'(^"<BR>|<ER>"$|\^$)', replacement='', str="_c0").alias("col")
).where(f.col("col") != '')

# Split each remaining "key:value" line into a two-element array
df = df.select("group", f.split("col", ":").alias("split"))
df.show(truncate=False)
#+-----+--------------------------+
#|group|split |
#+-----+--------------------------+
#|1 |[Datetime, 2018.06.30] |
#|1 |[Name, ABC] |
#|1 |[Se, 4] |
#|1 |[Machine, XXXXXXX] |
#|1 |[InnerTrace, ] |
#|1 |[AdditionalInfo, ] |
#|2 |[Datetime, 2018.05.30-EDT]|
#|2 |[Name, DEF] |
#|2 |[Se, 4] |
#|2 |[Machine, XXXXXXX] |
#|2 |[InnerTrace, ] |
#|2 |[AdditionalInfo, ] |
#+-----+--------------------------+
# Pivot the key/value pairs into columns: one row per record, one column per key
df = df.select(
    "group",
    f.col("split").getItem(0).alias("key"),
    f.col("split").getItem(1).alias("value")
)\
    .groupBy("group").pivot("key").agg(f.first("value"))\
    .drop("group")
df.show(truncate=False)
#+--------------+--------------+----------+-------+----+---+
#|AdditionalInfo|Datetime |InnerTrace|Machine|Name|Se |
#+--------------+--------------+----------+-------+----+---+
#| |2018.06.30 | |XXXXXXX|ABC |4 |
#| |2018.05.30-EDT| |XXXXXXX|DEF |4 |
#+--------------+--------------+----------+-------+----+---+
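The pivot emits the columns in alphabetical order, as shown above. If you want them in the same order as your expected output, finish with a select over the generated column names (the names below are exactly the keys produced by the pivot):
# Reorder the pivoted columns to match the layout asked for in the question
df = df.select("Datetime", "Name", "Se", "Machine", "InnerTrace", "AdditionalInfo")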
Upvotes: 2