Reputation: 99
I need help implementing one workflow with Apache Spark. My task is the following:
As far as I know, Spark can open many files at once using a glob pattern:
JavaRDD<String> source = sc.textFile("/source/data*.gz");
But in this case I can't tell which file a given line came from. If I get the list of source files and try to process them with the following scenario:
JavaSparkContext sc = new JavaSparkContext(...);
List<String> files = new ArrayList<>(); // list of the source files' full names
for (String f : files) {
    JavaRDD<String> data = sc.textFile(f);
    // process this file with Spark, producing outRdd
    outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
}
But in this case all files will be processed sequentially.
My question is: how can I process many files in parallel? For example: one file per executor?
Thank you very much for helping me!
Upvotes: 4
Views: 6015
Reputation: 1532
You can create a regular Java fixed-size thread pool (say, 10 threads) and submit the Spark job, your saveAsTextFile, from a Callable/Runnable. This will submit 10 parallel jobs, and if you have enough resources in your Spark cluster, they will be executed in parallel. Something like the following:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Test {
    public static void main(String[] argv) throws Exception {
        final JavaSparkContext sc = new JavaSparkContext(...);
        List<String> files = new ArrayList<>(); // list of the source files' full names

        ExecutorService pool = Executors.newFixedThreadPool(10);
        List<Future<?>> futures = new ArrayList<>();
        for (final String f : files) {
            // each submitted task triggers its own Spark job
            Future<?> fut = pool.submit(new Runnable() {
                @Override
                public void run() {
                    JavaRDD<String> data = sc.textFile(f);
                    // process this file with Spark, producing outRdd
                    JavaRDD<String> outRdd = data; // placeholder for your actual processing
                    outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
                }
            });
            futures.add(fut);
        }
        // wait for all tasks to finish
        for (Future<?> fut : futures) {
            fut.get();
        }
        pool.shutdown();
    }
}
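One caveat: within a single SparkContext, jobs submitted from different threads are scheduled FIFO by default, so the first jobs can occupy the whole cluster while later ones wait. Switching to the FAIR scheduler lets the concurrent jobs share resources. A minimal sketch of that configuration (the app name is just an example):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("parallel-files")
        // let jobs submitted from different threads share executors
        // instead of running strictly first-in-first-out
        .set("spark.scheduler.mode", "FAIR");
JavaSparkContext sc = new JavaSparkContext(conf);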
Upvotes: 0
Reputation: 7138
Here are the steps:
1. Read all the files with a single glob pattern (as in your first snippet) so they form one RDD.
2. Apply your processing to that RDD; Spark works on its partitions in parallel across the executors.
3. Save the result with saveAsTextFile.
The above works with the default partitioning though, so you might not get an output file count equal to the input file count (the number of output files is the number of partitions).
You can repartition the RDD based on the input file count, or on some other unique value in your data, so that you end up with as many output files as input files. This approach gives you the parallelism, but not the performance you would get with an optimal number of partitions.
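For illustration, a minimal sketch of that approach; the glob, the output path, and the input file count of 10 are assumptions based on the question:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("repartition-example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // one RDD over all matching files; its partitions are
        // processed in parallel across the executors
        JavaRDD<String> source = sc.textFile("/source/data*.gz");

        // ...transform "source" here as needed...

        // repartition so that the number of output files equals the
        // number of input files (an assumed count of 10 here)
        source.repartition(10).saveAsTextFile("/source/data_out");

        sc.stop();
    }
}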
Upvotes: 2