Reputation: 253
I have a set of custom log files that I need to parse. I am currently working in Azure Databricks, but am quite new to PySpark. The log files are hosted in an Azure Blob Storage account, which is mounted to our Azure Databricks instance.
Example of an input log file:
Value_x: 1
Value_y: "Station"
col1;col2;col3;col4;
A1;B1;C1;D1;
A2;B2;C2;D2;
A3;B3;C3;D3;
The desired output is a list of strings, but I could also work with a list of lists:
['A1;B1;C1;D1;1;station',
'A2;B2;C2;D2;1;station',
'A3;B3;C3;D3;1;station']
The snippet of code I use to apply these transformations:
def custom_parser(file, content):
    # Strip quotes and carriage returns, then split into non-empty lines
    content_ = content.replace('"', '').replace('\r', '').split('\n')
    content_ = [line for line in content_ if len(line) > 0]
    # Extract the header values (Value_x on the first line, Value_y on the second)
    x = content_[0].split('Value_x:')[-1].strip()
    y = content_[1].split('Value_y:')[-1].strip()
    # Skip the two header lines and the column names, then append x and y to every row
    content_ = content_[3:]
    content_ = [line + ';'.join([x, y]) for line in content_]
    return content_
from pyspark import SparkConf
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf())
files = sc.wholeTextFiles('spam/eggs/').collect()
parsed_content = []
for file, content in files:
    parsed_content += custom_parser(file, content)
I have developed a custom_parser function to handle the content of these log files, but I am left with a question: how can I apply custom_parser to the file contents in parallel on the cluster, instead of collecting everything and parsing it on the driver?
Upvotes: 0
Views: 806
Reputation: 707
You cannot apply your custom_parser directly to sc.wholeTextFiles, but you can use custom_parser as a map function. Read your files to get an RDD[String, String] of (path, content) pairs, apply custom_parser with rdd.map(custom_parser), and then write the result wherever you need it. That way the work runs in parallel on the executors, instead of everything happening on the driver as it does now.
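As a minimal sketch, assuming the custom_parser and the 'spam/eggs/' path from the question, it could look like this. Note that flatMap is used instead of map so the per-file lists of lines are flattened into a single RDD of strings:

from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf())

# RDD of (path, content) pairs; nothing is collected on the driver here
files_rdd = sc.wholeTextFiles('spam/eggs/')

# Run the parser on the executors; flatMap flattens each file's list of lines
parsed_rdd = files_rdd.flatMap(lambda fc: custom_parser(fc[0], fc[1]))

# Only bring results back to the driver (or write them out) at the very end
parsed_content = parsed_rdd.collect()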
Upvotes: 1