Lekshmi

Reputation: 81

Splitting an input log file in Pyspark dataframe

I have a log file which I need to split using a PySpark DataFrame. Below is my sample log file:

20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, partition values :[20200523]
20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in stage 0.0 (TID 18),18994 bytes result sent to driver

From the log sample you can see that the first line has more details than the second. For the first line I want Timestamp, Status, Message, Range, and Value columns; for the second line I can have only Timestamp, Status, and Message columns.

How do I apply regex functions to this kind of data? Please help me solve this issue. Thanks a lot!

Expected output:

    +-----------------+------+--------------------+--------------+--------------------+
    |         time_val|status|         log_message|         range|               value|
    +-----------------+------+--------------------+--------------+--------------------+
    |20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
    |20/06/25 12:19:34|  INFO|executor.EXECUTOR...|              |                    |
    +-----------------+------+--------------------+--------------+--------------------+

Upvotes: 0

Views: 692

Answers (1)

suresiva

Reputation: 3173

You may first load a DataFrame with the timestamp, the status, and all the remaining text as strings; since the timestamp and status sit at fixed offsets, simple slicing is enough:

# chars 0-16 hold the timestamp, 18-21 the status, 23 onward the message
input_df = spark.createDataFrame(
    sc.textFile("log_lines.log").map(lambda x: (x[0:17], x[18:22], x[23:])),
    ["time_val", "status", "message"])

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...|
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
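If you prefer to stay in the DataFrame API, the same fixed-offset load can be done with spark.read.text and substring; a minimal sketch, assuming the same log_lines.log file (note that substring is 1-based):

    import pyspark.sql.functions as F

    # substring() is 1-based: chars 1-17 hold the timestamp, 19-22 the status;
    # the SQL form substring(value, 24) takes everything from position 24 to the end
    input_df = (spark.read.text("log_lines.log")
        .select(
            F.substring("value", 1, 17).alias("time_val"),
            F.substring("value", 19, 4).alias("status"),
            F.expr("substring(value, 24)").alias("message")))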

Now, first handle the lines that carry Message, Range, and Value by splitting the message on commas:

import pyspark.sql.functions as F

# split the message on commas into log_message, range and value,
# then drop the original message column
input_df.filter(F.col("message").startswith("datasources.FileScanRDD")) \
    .withColumn("log_message", F.split(F.col("message"), ",")[0]) \
    .withColumn("range", F.split(F.col("message"), ",")[1]) \
    .withColumn("value", F.split(F.col("message"), ",")[2]) \
    .drop("message").show()


+-----------------+------+--------------------+--------------+--------------------+
|         time_val|status|         log_message|         range|               value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+
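Since the question asks about regex, the same three columns can also be pulled out with regexp_extract instead of split; a minimal sketch over the already-loaded message column, with a pattern written against the sample lines shown above:

    import pyspark.sql.functions as F

    # log_message runs to the first comma; range and value follow after commas
    pattern = r"^([^,]+),( range:\S+),( partition values.*)$"

    input_df.filter(F.col("message").startswith("datasources.FileScanRDD")) \
        .withColumn("log_message", F.regexp_extract("message", pattern, 1)) \
        .withColumn("range", F.regexp_extract("message", pattern, 2)) \
        .withColumn("value", F.regexp_extract("message", pattern, 3)) \
        .drop("message").show()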

Then you may handle the other line, which has only a message:

# keep the lines that are not FileScanRDD entries; these carry only a message
input_df.filter(~F.col("message").startswith("datasources.FileScanRDD")).show()

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
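If you want both kinds of lines in the single table shown in the expected output, you could also split conditionally with when/otherwise instead of filtering twice; a minimal sketch that leaves range and value empty for lines without them:

    import pyspark.sql.functions as F

    # FileScanRDD lines split into three comma-separated pieces;
    # all other lines keep the whole message and get empty range/value
    is_scan = F.col("message").startswith("datasources.FileScanRDD")
    parts = F.split(F.col("message"), ",")

    combined_df = (input_df
        .withColumn("log_message", F.when(is_scan, parts[0]).otherwise(F.col("message")))
        .withColumn("range", F.when(is_scan, parts[1]).otherwise(F.lit("")))
        .withColumn("value", F.when(is_scan, parts[2]).otherwise(F.lit("")))
        .drop("message"))
    combined_df.show()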

Upvotes: 1
