Reputation: 821
I am currently having Spark parse a large number of small CSV files into one large dataframe. Something along the lines of
df = spark.read.format("csv").load("file*.csv")
Because of how the data set being parsed is structured, I need the line number of every row in df within its corresponding source CSV file. Is there some simple way of achieving this (preferably without resorting to reconstructing the numbers afterwards through a combination of grouping on input_file_name() and zipWithIndex(); a rough sketch of that workaround follows the example below)?
For example, if
# file1.csv
col1, col2
A, B
C, D
and
# file2.csv
col1, col2
E, F
G, H
I need a resulting dataframe equivalent to
row, col1, col2
1, A, B
2, C, D
1, E, F
2, G, H
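For reference, here is a rough sketch of the reconstruction I would like to avoid. It assumes the files have header rows and that zipWithIndex reflects the per-file read order, which I understand is not guaranteed in general; the column names src and readOrder are just illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{input_file_name, row_number}
import org.apache.spark.sql.types.LongType

// tag every row with its source file and its global read position
val raw = spark.read.format("csv").option("header", "true").load("file*.csv")
  .withColumn("src", input_file_name())
val indexed = spark.createDataFrame(
  raw.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  raw.schema.add("readOrder", LongType)
)

// rank rows within each source file by read position
val byFile = Window.partitionBy("src").orderBy("readOrder")
val df = indexed.withColumn("row", row_number().over(byFile)).drop("src", "readOrder")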
Upvotes: 4
Views: 5494
Reputation: 6994
If an arbitrary ordering of the row numbers is acceptable in your dataframe (rather than the exact per-file line numbers), you could use one of the following alternatives.
One alternative is to use the monotonically_increasing_id function if you are using Spark 2.x. Something like this:

import org.apache.spark.sql.functions.monotonically_increasing_id

val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", monotonically_increasing_id())
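Note that monotonically_increasing_id() guarantees values that are unique and monotonically increasing, but not consecutive: the partition ID is encoded in the upper 31 bits of the 64-bit result, so you will see large jumps in rowId between partitions.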
The other alternative would be using row_number(). But that works only if you can partition the dataframe by some column, and a window for row_number() must also be ordered, so an ordering column ("col2" here) is needed as well. Something like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", row_number().over(Window.partitionBy("col1").orderBy("col2")))

This will ensure the row number is populated per window partition.
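Here "col1" is just an illustrative partition key. For the per-file numbering asked about, input_file_name() would be the natural choice, though you still need a column reflecting the original line order to sort on. A sketch, assuming each small file fits in a single partition so that monotonically_increasing_id() approximates read order:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{input_file_name, monotonically_increasing_id, row_number}

val withCols = spark.read.format("csv").load("file*.csv")
  .withColumn("file", input_file_name())
  .withColumn("seq", monotonically_increasing_id())
val byFile = Window.partitionBy("file").orderBy("seq")
val df = withCols.withColumn("rowId", row_number().over(byFile)).drop("file", "seq")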
However, if you require the exact original ordering, I am afraid there is no "Sparky" way to do it. The reason is that once you read the data as a dataframe, it loses the ordering with which it was persisted.
You could merge the CSV files using a Java program on a single machine and add the row number in that program.
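A minimal single-machine sketch of that idea (in Scala rather than Java, to match the snippets above; the file list, output path, and header handling are assumptions):

import java.io.PrintWriter
import scala.io.Source

val files = Seq("file1.csv", "file2.csv") // hypothetical file list
val out = new PrintWriter("merged.csv")
out.println("row, col1, col2")
for (file <- files) {
  // skip the header line, then number the remaining lines per file
  for ((line, i) <- Source.fromFile(file).getLines().drop(1).zipWithIndex)
    out.println(s"${i + 1}, $line")
}
out.close()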
Upvotes: 4