Tilo Wiklund

Reputation: 821

Adding line numbers when parsing many CSV files with Spark

I am currently having Spark parse a large number of small CSV-files into one large dataframe. Something along the lines of

df = spark.read.format("csv").load("file*.csv")

Because of how the data set being parsed is structured, I need the line number of every row in df within its corresponding source CSV file. Is there some simple way of achieving this (preferably without resorting to reconstructing them afterwards by combining input_file_name() with zipWithIndex(); a sketch of that fallback follows the example below)?

For example if

# file1.csv
col1, col2
A, B
C, D

and

# file2.csv
col1, col2
E, F
G, H

I need a resulting data frame equivalent to

row, col1, col2
1, A, B
2, C, D
1, E, F
2, G, H
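For reference, a minimal sketch of the fallback mentioned above, in Scala to match the snippets below (the column names idx and row are illustrative, and it assumes zipWithIndex's read order matches line order within each file):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{input_file_name, row_number}
import org.apache.spark.sql.types.LongType

val base = spark.read.format("csv").load("file*.csv")
  .withColumn("file", input_file_name())

// zipWithIndex numbers rows in read order across the whole dataset
val indexed = spark.createDataFrame(
  base.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  base.schema.add("idx", LongType))

// turn the global index into a per-file line number
val w = Window.partitionBy("file").orderBy("idx")
val df = indexed.withColumn("row", row_number().over(w)).drop("file", "idx")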

Upvotes: 4

Views: 5494

Answers (1)

Avishek Bhattacharya

Reputation: 6994

If any arbitrary ordering of the row numbers in the dataframe is acceptable, you could use one of the following alternatives.

One alternative is to use the monotonically_increasing_id function if you are using Spark 2.x.

Something like this

import org.apache.spark.sql.functions.monotonically_increasing_id

// Each row gets a unique, monotonically increasing id; the ids are not
// consecutive and encode the partition number in their upper bits.
val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", monotonically_increasing_id())

The other alternative would be to use row_number(). But that only works if you have a column to partition the dataframe by.

Something like

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// row_number() requires an ordered window; orderBy("col2") is only a placeholder
val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", row_number().over(Window.partitionBy("col1").orderBy("col2")))

This will ensure the row number is populated per window partition (here, per distinct value of col1).
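For the per-file numbering the question asks about, one possible sketch (not from the snippets above, and relying on the practical but non-contractual fact that monotonically_increasing_id() follows read order within a single input split, which holds when each small file is read as one split) combines the two ideas:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{input_file_name, monotonically_increasing_id, row_number}

// Tag each row with its source file and a read-order id, then number rows
// within each file; the helper columns are dropped at the end.
val perFile = Window.partitionBy("file").orderBy("order")
val df = spark.read.format("csv").load("file*.csv")
  .withColumn("file", input_file_name())
  .withColumn("order", monotonically_increasing_id())
  .withColumn("row", row_number().over(perFile))
  .drop("file", "order")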

However, if you require exact ordering, I am afraid there is no "Sparky" way to do it. The reason is that once you read the data as a dataframe, it loses the ordering with which the data was persisted.

You could merge the CSV files using a Java program on a single machine and add the row numbers in that program.
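A minimal sketch of that idea (in Scala rather than Java, to match the snippets above; the file names and output path are illustrative):

import java.io.PrintWriter
import scala.io.Source

// Number lines per file (skipping each header) and write one merged CSV.
val files = Seq("file1.csv", "file2.csv")
val out = new PrintWriter("merged.csv")
out.println("row,col1,col2")
for (f <- files) {
  val src = Source.fromFile(f)
  for ((line, i) <- src.getLines().drop(1).zipWithIndex) // drop(1) skips the header
    out.println(s"${i + 1},$line")
  src.close()
}
out.close()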

Upvotes: 4
