ZZzzZZzz

Reputation: 1844

How to append a header value from a file as an extra column in a CSV file using PySpark, for 1000 files

I have been trying to pick up the header line that starts with #Id and add that id value as an extra column, alongside the file name, for each file being processed. Below are sample files to be processed:

File 1:

#sample first line
#Id: abcdef
col1,col2,col3
1,2,3
2,3,3
4,5,6

File 2:

#sample first line
#Id: ghjklo
col1,col2,col3
5,1,3
2,5,8
8,0,4

When I construct the dataframe and print the results, I am able to add the filename as a column using the snippet below.

par_df = spark.read.schema(schema) \
                    .option("header", "true") \
                    .format("com.databricks.spark.csv") \
                    .option("mode", "DROPMALFORMED") \
                    .csv("s3a://" + bucket + "/" + prefix + "/") \
                    .withColumn("FileName", func.input_file_name())

This filters out the header info; below is the snippet to print the result.

parsed_diff_df = par_df.select(
    par_df['col1'],
    par_df['col2'],
    par_df['col3'],
    par_df['FileName'])
parsed_diff_df.registerTempTable("parsed_diff_df_table")
results = sqlContext.sql("select col1, col2, col3, FileName from "
                             "parsed_diff_df_table").collect()

This is the result I get, but I am unable to append the Id column because it has already been filtered out.

1,2,3,File1
2,3,3,File1
4,5,6,File1
5,1,3,File2
2,5,8,File2
8,0,4,File2

Intended result is below.

1,2,3,abcdef,File1
2,3,3,abcdef,File1
4,5,6,abcdef,File1
5,1,3,ghjklo,File2
2,5,8,ghjklo,File2
8,0,4,ghjklo,File2

I have also tried this but no luck.

rdd = sc.textFile("s3a://" + bucket + "/" + prefix + "/") \
        .flatMap(lambda line: line.split("\n")) \
        .filter(lambda line: '#' in line)

results = rdd.collect()
for row in results:
    print(row)

Upvotes: 0

Views: 1002

Answers (2)

Rahul Sharma

Reputation: 5834

Instead of using the CSV loader, implement the steps below to achieve this:

  • Load the data into a pair RDD using sparkContext.wholeTextFiles.
  • Apply a flatMapValues function:
    1. Split each record on the newline character '\n'.
    2. Get the id from the '#Id:' line (the second line in the sample files): split it on ':' and take the second part as the id.
    3. Skip the column-header line, since the schema is predefined.
    4. Append the id to each remaining data line.
  • Apply a map function: split each value into individual columns on ',' and keep the key (the file name) as an extra column.
  • Convert the RDD to a dataset with columns 'col1, col2, col3' (plus the appended Id and FileName).

I am a Java developer, not very hands-on with Python, but something similar might help you:

from pyspark.sql import Row

pairRdd = sc.wholeTextFiles('<path>')

# it won't work exactly as-is, make the required changes:
def appendId(record):
    splits = record.splitlines()
    # '#Id: abcdef' is the second line of each sample file
    file_id = splits[1].split(':')[1].strip()
    output = []
    # data rows start after the two comment lines and the header line
    for s in range(3, len(splits)):
        output.append(splits[s] + ',' + file_id)
    return output

objRdd = pairRdd.flatMapValues(appendId) \
    .map(lambda kv: kv[1].split(',') + [kv[0]]) \
    .map(lambda p: Row(col1=int(p[0]), col2=int(p[1]), col3=int(p[2]), Id=p[3], FileName=p[4]))
dataframe = spark.createDataFrame(objRdd)
.....

Equivalent Java:

JavaPairRDD<String, String> inputRdd = sparkContext.wholeTextFiles("<xyz path>");
JavaRDD<String[]> rows = inputRdd.flatMapValues(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String v1) throws Exception {
                String[] splits = v1.split(System.getProperty("line.separator"));
                // '#Id: abcdef' is the second line of each sample file
                String id = splits[1].split(":")[1].trim();
                List<String> values = new ArrayList<String>();
                // data rows start after the two comment lines and the header line
                for (int i = 3; i < splits.length; i++) {
                    values.add(String.format("%s,%s", splits[i], id));
                }
                return values;
            }
        })
        .map(s -> s._2().split(","));

Upvotes: 1

MaFF

Reputation: 10096

You can map the FileName of each file to its id:

Let's write a function to extract the id value:

import re

def extract_id(l):
    return re.search('#Id: ([a-z]+)\\n', l).group(1)
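
For example, applied to the contents of File 1 from the question it returns the id value:

extract_id("#sample first line\n#Id: abcdef\ncol1,col2,col3\n1,2,3\n")  # returns 'abcdef'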

Let's read the files as RDDs:

file_id = sc.wholeTextFiles("/user/at967214/test.csv") \
    .filter(lambda l: l[1][0] == '#') \
    .map(lambda l: [l[0], extract_id(l[1])])

And now the dataframe:

file_id_df = spark.createDataFrame(file_id, ["FileName", "id"])

Now you can join it with your first dataframe:

par_df.join(file_id_df, "FileName", "inner")
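
For completeness, a minimal sketch of the final step, assuming par_df was built as in the question (with the FileName column from input_file_name()) and that the paths returned by input_file_name() and wholeTextFiles match:

joined_df = par_df.join(file_id_df, "FileName", "inner")
joined_df.select("col1", "col2", "col3", "id", "FileName").show()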

Upvotes: 2
