I have a log file which I need to split into a PySpark DataFrame. Below is a sample of my log file:
20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, partition values :[20200523]
20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in stage 0.0 (TID 18),18994 bytes result sent to driver
From the log sample you can see that the first line has more detail than the second line.
I want Timestamp, Status, Message, Range, and Value columns for the first line; for the second line I can have only Timestamp, Status, and Message columns.
How do I apply regex functions to this kind of data? Please help me solve this issue. Thanks a lot!
Expected output:
+-----------------+------+--------------------+--------------+--------------------+
| time_val|status| log_message| range| value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...| range:0-27899| partition values...|
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|              |                    |
+-----------------+------+--------------------+--------------+--------------------+
You may first load a DataFrame with Timestamp, Status, and all the remaining text as a single message string:
import pyspark.sql.functions as F

input_df = spark.createDataFrame(
    sc.textFile("log_lines.log")
      .map(lambda x: (x[0:17], x[18:22], x[23:])),
    ["time_val", "status", "message"])
+-----------------+------+--------------------+
| time_val|status| message|
+-----------------+------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...|
|20/06/25 12:19:34| INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
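This relies on the fixed-width layout of the line prefix — characters 0–16 are the timestamp and 18–21 the status. A quick plain-Python check of those offsets against the second sample line (a sketch, independent of Spark):

```python
# Verify the fixed-width offsets used in the map() above on a sample line.
line = ("20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 "
        "in stage 0.0 (TID 18),18994 bytes result sent to driver")

time_val, status, message = line[0:17], line[18:22], line[23:]
print(time_val)  # 20/06/25 12:19:34
print(status)    # INFO
```

If your timestamp format ever changes width, these offsets break, which is one reason a regex-based split can be safer.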
Now, first handle the lines that have Message, Range, and Value, as below:
input_df.filter(F.col("message").startswith("datasources.FileScanRDD")) \
        .withColumn("log_message", F.split(F.col("message"), ",")[0]) \
        .withColumn("range", F.split(F.col("message"), ",")[1]) \
        .withColumn("value", F.split(F.col("message"), ",")[2]) \
        .drop("message").show()
+-----------------+------+--------------------+--------------+--------------------+
| time_val|status| log_message| range| value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+
Then you may handle the other line, which only has a message:
input_df.filter(F.col("message").startswith("executor")).show()
+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
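Since the question asks specifically about regex, both line shapes can also be handled in one pass with `F.regexp_extract` and a single pattern whose range/value groups are optional — `regexp_extract` returns an empty string when a group does not participate, which matches the blank cells in the expected output. The pattern below is my own sketch, not from the answer above; it can be verified in plain Python first (for this pattern, `re` syntax matches the Java regex syntax Spark uses):

```python
import re

# One pattern for both line shapes: groups 4 (range) and 5 (value) are
# optional, so lines without ", range:..." simply leave them unset.
LOG_RE = (r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) "  # 1: time_val
          r"(\w+) "                                    # 2: status
          r"(.*?)"                                     # 3: log_message (lazy)
          r"(?:, (range:[^,]*), (.*))?$")              # 4: range, 5: value

line1 = ("20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: "
         "hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/"
         "parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/"
         "part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, "
         "partition values :[20200523]")
line2 = ("20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in "
         "stage 0.0 (TID 18),18994 bytes result sent to driver")

m1, m2 = re.match(LOG_RE, line1), re.match(LOG_RE, line2)
print(m1.group(4))  # range:0-27899
print(m2.group(4))  # None -> regexp_extract would give "" here
```

In Spark this would become something like `df.select(F.regexp_extract("value", LOG_RE, 1).alias("time_val"), ..., F.regexp_extract("value", LOG_RE, 4).alias("range"), F.regexp_extract("value", LOG_RE, 5).alias("value"))` on the single-column DataFrame produced by `spark.read.text`, giving all five columns for both line shapes without a filter-and-split step.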