Reputation: 123
I have a Spark DataFrame with two columns (time_stamp and message), as shown below:
Example Spark DataFrame:
message time_stamp
irrelevant_text Startstring [ID: 1AB] 2015-01-23 08:23:16
some irrelevant text 2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB] 2015-01-23 08:25:32
some irrelevant text 2015-01-23 08:27:18
contributor XYZ_ABCD 2015-01-23 08:27:54
some irrelevant text 2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB] 2015-01-23 08:30:47
some irrelevant text 2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC] 2015-01-23 10:05:16
some irrelevant text 2015-01-23 10:24:20
contributor LMN_EFG_X 2015-01-23 10:27:21
some irrelevant text 2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC] 2015-01-23 10:30:47
some irrelevant text 2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE] 2015-01-23 12:21:16
some irrelevant text 2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE] 2015-01-23 12:37:32
some irrelevant text 2015-01-23 12:45:18
contributor PQRS_STU_wtx 2015-01-23 12:47:05
some irrelevant text 2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE] 2015-01-23 12:59:47
I want to extract the contributor that appears between Startstring and endstring, but only if mandatorystring also occurs between that Startstring and endstring; if a block has no mandatorystring, its contributor should be discarded. There may be multiple such blocks in one day.
Expected Output:
time_stamp contributor
2015-01-23 08:27:54 XYZ_ABCD
2015-01-23 12:47:05 PQRS_STU_wtx
For reading the text file I used the following command:
df = spark.read.format("com.databricks.spark.csv") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .option("delimiter", "\t") \
    .load('{}'.format(fileName))
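Here schema is just the two example columns; a minimal sketch of what it amounts to, assuming both columns are read as plain strings:

from pyspark.sql.types import StructType, StructField, StringType

# Illustrative sketch only: both columns read as strings
schema = StructType([
    StructField("message", StringType(), True),
    StructField("time_stamp", StringType(), True),
])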
Upvotes: 0
Views: 1092
Reputation: 10362
Use window functions. Try the code below.
Import the required libraries:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
Load the data into a DataFrame:
df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
(
    df
    .withColumn(
        "subMessage",
        # Tag the rows of interest; contributor rows keep the bare name
        F.when(F.col("message").contains("Startstring"), F.lit("start"))
        .when(F.col("message").contains("mandatorystring"), F.lit("mandatory"))
        .when(F.col("message").contains("contributor"), F.regexp_replace(F.col("message"), "contributor ", ""))
        .when(F.col("message").contains("endstring"), F.lit("end"))
    )
    .filter(F.col("subMessage").isNotNull())
    .withColumn(
        "iscontributor",
        # A contributor row is valid when the previous tagged row is "mandatory"
        # and the next tagged row is "end"
        (F.lead(F.col("subMessage"), 1).over(Window.orderBy(F.lit(1))) == "end")
        & (F.lag(F.col("subMessage"), 1).over(Window.orderBy(F.lit(1))) == "mandatory")
    )
    .filter(F.col("iscontributor") == True)
    .show()
)
Final output:
+--------------------+-------------------+------------+-------------+
| message| time_stamp| subMessage|iscontributor|
+--------------------+-------------------+------------+-------------+
|contributor XYZ_ABCD|2015-01-23 08:27:54| XYZ_ABCD| true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx| true|
+--------------------+-------------------+------------+-------------+
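Note that Window.orderBy(F.lit(1)) pulls all rows into a single partition and relies on them staying in their incoming order, so this approach suits modest input sizes. To reduce the result to the expected two columns, assign the pipeline above to a variable first (matched is a name used only in this sketch) and select from it:

# `matched` is assumed to hold the filtered DataFrame built above;
# subMessage already contains the bare contributor name
matched.select(
    F.col("time_stamp"),
    F.col("subMessage").alias("contributor"),
).show()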
Upvotes: 1
Reputation: 42352
Filter the groups of messages that are valid (those containing "mandatory"), and get the messages containing "contributor" from the valid message groups.
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
    'begin',
    # Back-fill the timestamp of the most recent Startstring row
    F.last(
        F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp'))
).withColumn(
    'end',
    # Forward-fill the timestamp of the next endstring row
    F.first(
        F.when(F.col('message').rlike('endstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
    'mandatory',
    # Count the mandatory lines inside each (begin, end) block
    F.sum(
        F.col('message').rlike('mandatory').cast('int')
    ).over(Window.partitionBy('begin', 'end'))
).filter(
    "mandatory >= 1 and message rlike 'contributor'"
).select(
    'time_stamp',
    F.regexp_extract('message', r'contributor (\S+)', 1).alias('contributor')
)
df2.show()
+-------------------+------------+
| time_stamp| contributor|
+-------------------+------------+
|2015-01-23 08:27:54| XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
+-------------------+------------+
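Here F.last with ignorenulls=True over the default unbounded-preceding frame back-fills the most recent Startstring timestamp into begin, while F.first over a current-row-to-unbounded-following frame forward-fills the next endstring timestamp into end. Every row inside a block therefore shares the same (begin, end) pair, and the per-block count of mandatory matches decides whether that block's contributor rows survive the filter.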
Upvotes: 1