user3058703

Reputation: 591

Iterating over multiple CSVs and joining with Spark SQL

I have several CSV files with the same headers and the same IDs. I am trying to iterate over them and merge them all, up to the one indexed '31'. In my while loop, I'm trying to initialise the merged dataset so it can be used for the remainder of the loop. On the last line, the compiler tells me that the 'local variable merged may not have been initialised'. How should I be doing this instead?

SparkSession spark = SparkSession.builder().appName("testSql")
            .master("local[*]")
            .config("spark.sql.warehouse.dir", "file:///c:tmp")
            .getOrCreate();

Dataset<Row> first = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<Row> second = spark.read().option("header", true).csv("mypath/02.csv");
    
IntStream.range(3, 31)
    .forEach(i -> {
        while (i == 3) {
            Dataset<Row> merged = first.join(second, first.col("customer_id").equalTo(second.col("customer_id")));
        }
        Dataset<Row> next = spark.read().option("header", true).csv("mypath/" + i + ".csv");
        Dataset<Row> merged = merged.join(next, merged.col("customer_id").equalTo(next.col("customer_id")));
    });

Upvotes: 1

Views: 275

Answers (1)

mazaneicha

Reputation: 9427

EDITED based on feedback in the comments.

Following your pattern, something like this would work:

Dataset<Row> ds1 = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<?>[] result = {ds1};
IntStream.range(2, 31)
    .forEach(i -> {
        // zero-pad the index so the path matches file names like 02.csv
        Dataset<Row> next = spark.read().option("header", true)
                .csv(String.format("mypath/%02d.csv", i));
        result[0] = result[0].join(next, "customer_id");
    });

We wrap the Dataset in a one-element array to work around the rule that local variables captured by a lambda must be final or effectively final: the array reference itself stays effectively final, while its element can be reassigned on every iteration.
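If the array feels hacky, an AtomicReference works as the mutable holder for the same reason (the reference itself stays effectively final). A minimal sketch, assuming the SparkSession spark and the zero-padded file names from the question:

import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// the holder is effectively final; only its contents are swapped
AtomicReference<Dataset<Row>> acc = new AtomicReference<>(
        spark.read().option("header", true).csv("mypath/01.csv"));
IntStream.range(2, 31)
    .forEach(i -> {
        Dataset<Row> next = spark.read().option("header", true)
                .csv(String.format("mypath/%02d.csv", i));
        acc.set(acc.get().join(next, "customer_id"));
    });
Dataset<Row> merged = acc.get();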

The more straightforward way, for this particular case, is to simply use a for-loop rather than stream.forEach:

Dataset<Row> result = spark.read().option("header", true).csv("mypath/01.csv");
for (int i = 2; i < 31; i++) {
    Dataset<Row> next = spark.read().option("header", true)
            .csv(String.format("mypath/%02d.csv", i));
    result = result.join(next, "customer_id");
}
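If you'd rather stay with streams, you can also fold the per-file Datasets with reduce and avoid any mutable holder. A sketch under the same assumptions (files 01.csv through 30.csv, joined on customer_id):

Dataset<Row> result = IntStream.rangeClosed(1, 30)
        .mapToObj(i -> spark.read().option("header", true)
                .csv(String.format("mypath/%02d.csv", i)))
        .reduce((left, right) -> left.join(right, "customer_id"))
        .orElseThrow(() -> new IllegalStateException("no input files"));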

Upvotes: 1
