Reputation: 1844
I have been trying to filter out a header line that begins with #Id and add that id value
as a column to each row of the file being processed. Below are sample files to be processed.
File 1:
#sample first line
#Id: abcdef
col1,col2,col3
1,2,3
2,3,3
4,5,6
File 2:
#sample first line
#Id: ghjklo
col1,col2,col3
5,1,3
2,5,8
8,0,4
When I try to construct the dataframe and print the results I am able to add the filename as a column using the below snippet.
import pyspark.sql.functions as func

par_df = spark.read.schema(schema) \
    .option("header", "true") \
    .format("com.databricks.spark.csv") \
    .option("mode", "DROPMALFORMED") \
    .csv("s3a://" + bucket + "/" + prefix + "/") \
    .withColumn("FileName", func.input_file_name())
This filters out the header lines, and below is the snippet to print the result.
parsed_diff_df = par_df.select(
    par_df['col1'],
    par_df['col2'],
    par_df['col3'],
    par_df['FileName'])
parsed_diff_df.registerTempTable("parsed_diff_df_table")
results = sqlContext.sql("select col1, col2, col3, FileName from "
                         "parsed_diff_df_table").collect()
This is the result I get, but I am unable to append the Id column because the #Id line has already been filtered out.
1,2,3,File1
2,3,3,File1
4,5,6,File1
5,1,3,File2
2,5,8,File2
8,0,4,File2
Intended result is below.
1,2,3,abcdef,File1
2,3,3,abcdef,File1
4,5,6,abcdef,File1
5,1,3,ghjklo,File2
2,5,8,ghjklo,File2
8,0,4,ghjklo,File2
I have also tried the following, but no luck.
rdd = sc.textFile("s3a://" + bucket + "/" + prefix + "/") \
    .flatMap(lambda line: line.split("\n")) \
    .filter(lambda line: '#' in line)
results = rdd.collect()
for row in results:
    print(row)
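Filtering on '#' matches both comment lines, and even narrowing it down to the #Id lines (a minimal sketch, assuming the same "#Id: " prefix as in the samples) gives me the ids but no way to tie them back to the rows of each file:

ids = sc.textFile("s3a://" + bucket + "/" + prefix + "/") \
    .filter(lambda line: line.startswith('#Id:')) \
    .map(lambda line: line.split(':')[1].strip())
print(ids.collect())  # ['abcdef', 'ghjklo'], but no link back to the data rows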
Upvotes: 0
Views: 1002
Reputation: 5834
Instead of using the csv loader, implement the steps below to achieve this.
I am a Java developer and not very hands-on with Python, but something similar might help you:
from pyspark.sql import Row

pairRdd = sc.wholeTextFiles('<path>')

# It won't work exactly as-is; make the required changes:
def appendId(record):
    splits = record.splitlines()
    # '#Id: abcdef' is the second line of each file
    id = splits[1].split(':')[1].strip()
    output = []
    # skip the two comment lines and the column header
    for s in xrange(3, len(splits)):
        output.append(splits[s] + ',' + id)
    return output

objRDD = pairRdd.flatMapValues(appendId) \
    .map(lambda kv: kv[1].split(',') + [kv[0]]) \
    .map(lambda p: Row(col1=int(p[0]), col2=int(p[1]), col3=int(p[2]), Id=p[3], FileName=p[4]))
dataframe = spark.createDataFrame(objRDD)
.....
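Once the dataframe is built you can select the columns in the intended order; a rough sketch, assuming the column names above:

result = dataframe.select('col1', 'col2', 'col3', 'Id', 'FileName')
result.show()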
Equivalent Java:
JavaPairRDD<String, String> inputRdd = sparkContext.wholeTextFiles("<xyz path>");
JavaRDD<String[]> rows = inputRdd.flatMapValues(new Function<String, Iterable<String>>() {
    @Override
    public Iterable<String> call(String v1) throws Exception {
        String[] splits = v1.split(System.getProperty("line.separator"));
        // '#Id: abcdef' is the second line of each file
        String id = splits[1].split(":")[1].trim();
        List<String> values = new ArrayList<String>();
        // skip the two comment lines and the column header
        for (int i = 3; i < splits.length; i++) {
            values.add(String.format("%s,%s", splits[i], id));
        }
        return values;
    }
}).map(s -> s._2().split(","));
Upvotes: 1
Reputation: 10096
You can map the FileName of each file to its id:
Let's write a function to extract the id value:
import re

# Extract the id value from a file's full text content
def extract_id(content):
    return re.search('#Id: ([a-z]+)\n', content).group(1)
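For example, on the contents of File 1:

extract_id('#sample first line\n#Id: abcdef\ncol1,col2,col3\n1,2,3\n')  # returns 'abcdef'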
Let's read the files as RDDs:
file_id = sc.wholeTextFiles("/user/at967214/test.csv") \
    .filter(lambda l: l[1][0] == '#') \
    .map(lambda l: [l[0], extract_id(l[1])])
And now the dataframe:
file_id_df = spark.createDataFrame(file_id, ["FileName", "id"])
Now you can join it with your first dataframe:
par_df.join(file_id_df, "FileName", "inner")
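For instance, to reproduce the intended output (assuming input_file_name() and wholeTextFiles report file paths in the same format):

results = par_df.join(file_id_df, "FileName", "inner") \
    .select("col1", "col2", "col3", "id", "FileName")
results.show()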
Upvotes: 2