Dataholic

Reputation: 123

Extract text between two strings if a third string is also present between them - PySpark

I have a Spark dataframe with two columns, message and time_stamp, as shown below:

Example Spark dataframe:

      message                                             time_stamp
irrelevant_text Startstring [ID: 1AB]                   2015-01-23 08:23:16
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB]                2015-01-23 08:25:32
some irrelevant text                                    2015-01-23 08:27:18
contributor XYZ_ABCD                                    2015-01-23 08:27:54
some irrelevant text                                    2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB]                     2015-01-23 08:30:47
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC]                   2015-01-23 10:05:16
some irrelevant text                                    2015-01-23 10:24:20
contributor LMN_EFG_X                                   2015-01-23 10:27:21
some irrelevant text                                    2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC]                     2015-01-23 10:30:47
some irrelevant text                                    2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE]                   2015-01-23 12:21:16
some irrelevant text                                    2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE]                2015-01-23 12:37:32
some irrelevant text                                    2015-01-23 12:45:18
contributor PQRS_STU_wtx                                2015-01-23 12:47:05
some irrelevant text                                    2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE]                     2015-01-23 12:59:47

I want to extract the contributor that appears between Startstring and endstring, but only if mandatorystring also occurs between that Startstring and endstring; if mandatorystring is absent from the block, the contributor should be discarded. There may be multiple such blocks in one day.

Expected Output:

time_stamp              contributor
2015-01-23 08:27:54     XYZ_ABCD
2015-01-23 12:47:05     PQRS_STU_wtx

To read the text file I used the following command:

df = spark.read.format("com.databricks.spark.csv") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .option("delimiter", "\t") \
    .load(fileName)
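
On Spark 2.x and later the CSV source is built in, so the same read can be written without the Databricks package name (a minimal equivalent sketch, assuming the same schema and fileName variables as above):

df = (spark.read
      .schema(schema)              # explicit schema, so inferSchema is unnecessary
      .option("delimiter", "\t")   # tab-separated file
      .csv(fileName))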

Upvotes: 0

Views: 1092

Answers (2)

s.polam

Reputation: 10362

Use window functions.

Try the code below.

Import the required libraries.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

Load the data into a dataframe.

df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
result = (
    df
    # Tag each relevant row: start/mandatory/end markers, or the contributor name
    .withColumn(
        "subMessage",
        F.when(F.col("message").contains("Startstring"), F.lit("start"))
         .when(F.col("message").contains("mandatorystring"), F.lit("mandatory"))
         .when(F.col("message").contains("contributor"),
               F.regexp_replace(F.col("message"), "contributor ", ""))
         .when(F.col("message").contains("endstring"), F.lit("end"))
    )
    # Keep only the tagged rows
    .filter(F.col("subMessage").isNotNull())
    # A contributor row is valid when the previous tagged row is "mandatory"
    # and the next tagged row is "end"
    .withColumn(
        "iscontributor",
        (F.lead("subMessage", 1).over(Window.orderBy(F.lit(1))) == "end")
        & (F.lag("subMessage", 1).over(Window.orderBy(F.lit(1))) == "mandatory")
    )
    .filter(F.col("iscontributor") == True)
)
result.show()

Final Output.

+--------------------+-------------------+------------+-------------+
|             message|         time_stamp|  subMessage|iscontributor|
+--------------------+-------------------+------------+-------------+
|contributor XYZ_ABCD|2015-01-23 08:27:54|    XYZ_ABCD|         true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx|         true|
+--------------------+-------------------+------------+-------------+
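
To match the two-column output expected in the question, you can finish with a select on the result above (a small addition, not part of the original answer):

result.select(
    "time_stamp",
    F.col("subMessage").alias("contributor")
).show()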

Upvotes: 1

mck

Reputation: 42352

Filter the groups of messages that are valid (those containing "mandatory"), and get the messages containing "contributor" from the valid message groups.

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'begin',
    # timestamp of the most recent Startstring row (ignoring nulls)
    F.last(
        F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp'))
).withColumn(
    'end',
    # timestamp of the next endstring row (ignoring nulls)
    F.first(
        F.when(F.col('message').rlike('endstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
    'mandatory',
    # number of mandatorystring rows within each (begin, end) block
    F.sum(
        F.col('message').rlike('mandatory').cast('int')
    ).over(Window.partitionBy('begin', 'end'))
).filter(
    "mandatory >= 1 and message rlike 'contributor'"
).select(
    'time_stamp',
    F.regexp_extract('message', r'contributor (\S+)', 1).alias('contributor')
)

df2.show()
+-------------------+------------+
|         time_stamp| contributor|
+-------------------+------------+
|2015-01-23 08:27:54|    XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
+-------------------+------------+
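
Note: both answers order rows by time_stamp, which the CSV reader loads as a string. For the yyyy-MM-dd HH:mm:ss format shown, string order happens to match chronological order, but casting to a real timestamp first is safer if the format ever varies (a hedged precaution, not part of the original answers):

df = df.withColumn("time_stamp", F.to_timestamp("time_stamp", "yyyy-MM-dd HH:mm:ss"))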

Upvotes: 1
