Yustas

Reputation: 99

Processing several files one by one, separately, with Spark

I need help implementing a workflow with Apache Spark. My task is as follows:

  1. I have several CSV files as source data. Note: these files could have different layouts.
  2. I have metadata with info on how to parse each file (this is not a problem).
  3. Main goal: the result is the source file with several additional columns. I have to update each source file without joining them into one output set. For example: 10 source files -> 10 result files, and each result file has data only from the corresponding source file.

As far as I know, Spark can open many files by mask:

JavaRDD<String> source = sc.textFile("/source/data*.gz");

But in this case I can't tell which file a given line came from. If I get a list of source files and try to process them with the following scenario:

JavaSparkContext sc = new JavaSparkContext(...);
List<String> files = new ArrayList<>(); // list of source file full names
for(String f : files)
{
   JavaRDD<String> data = sc.textFile(f);
   // process this file with Spark; outRdd is the processed result built from data
   outRdd.coalesce(1, true).saveAsTextFile(f + "_out"); 
}

But in this case all files will be processed sequentially.

My question is: how can I process many files in parallel? For example: one file per executor?

Thank you very much for helping me!

Upvotes: 4

Views: 6015

Answers (2)

Igor Berman

Reputation: 1532

You can open a regular Java fixed-size thread pool (say, 10 threads) and submit each Spark job, i.e. your saveAsTextFile, from a Callable/Runnable. This will submit 10 parallel jobs, and if you have enough resources in your Spark cluster, they will be executed in parallel. Something like the following:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Test {

    public static void main(String[] argv) throws Exception {
        final JavaSparkContext sc = new JavaSparkContext(...);
        List<String> files = new ArrayList<>(); // list of source file full names
        ExecutorService pool = Executors.newFixedThreadPool(10);
        List<Future<?>> futures = new ArrayList<>();
        for (final String f : files)
        {
            // each Runnable submits an independent Spark job for one file
            Future<?> fut = pool.submit(new Runnable() {

                @Override
                public void run() {
                    JavaRDD<String> data = sc.textFile(f);
                    // process this file with Spark and build outRdd from data
                    JavaRDD<String> outRdd = data; // placeholder for the real processing
                    outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
                }
            });
            futures.add(fut);
        }
        // wait for all tasks to finish
        for (Future<?> fut : futures) {
            fut.get();
        }
        pool.shutdown();
    }
}

Upvotes: 0

Ramzy

Reputation: 7138

Here are the steps:

  1. Use sparkContext.wholeTextFiles("/path/to/folder/containing/all/files")
  2. The above returns an RDD where the key is the path of the file and the value is the content of the file
  3. rdd.map(lambda x: x[1]) - this gives you an RDD with only the file contents
  4. rdd.map(lambda x: customFunctionToProcessFileContent(x))
  5. Since the map function works in parallel, any operations you do would be faster and not sequential - as long as your tasks don't depend on each other, which is the main criterion for parallelism (see the Java sketch after this list)
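
Since the question uses the Java API, here is a rough Java sketch of the same idea. processFileContent is a hypothetical helper standing in for your metadata-driven parsing, and the paths are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WholeFilesSketch {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("per-file-processing"));

        // key = full file path, value = entire file content as one string
        JavaPairRDD<String, String> files =
                sc.wholeTextFiles("/path/to/folder/containing/all/files");

        // keep the path, so every record still knows which source file it came from
        JavaPairRDD<String, String> processed = files.mapToPair(pathAndContent ->
                new Tuple2<>(pathAndContent._1(),
                        processFileContent(pathAndContent._1(), pathAndContent._2())));

        processed.saveAsTextFile("/path/to/output");
        sc.stop();
    }

    // hypothetical helper: apply the per-file metadata and append the extra columns
    private static String processFileContent(String path, String content) {
        return content; // placeholder for the real transformation
    }
}

Because each record is a whole file, Spark can process different files in different tasks at the same time.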

This works with the default partitioning, though, so you might not get an output file count equal to the input file count (the number of output files is the number of partitions).

You can repartition the RDD based on the file count or any other unique value in your data, so you end up with an output file count equal to the input count. This approach gives you parallelism, but not the performance achieved with an optimal number of partitions.
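
A minimal illustration of that repartitioning idea, assuming the processed pair RDD from the sketch above:

// one record per input file, so repartitioning to this count makes the number
// of output part files match the number of input files
long inputFileCount = processed.count();
processed.repartition((int) inputFileCount)
         .saveAsTextFile("/path/to/output");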

Upvotes: 2
