twiz911

Reputation: 634

Spark Local Mode - all jobs only use one CPU core

We are running Spark Java in local mode on a single AWS EC2 instance using

"local[*]"

However, profiling with New Relic tools and a simple 'top' shows that only one CPU core of our 16-core machine is ever in use, for all three of the Java Spark jobs we've written (we've also tried different AWS instance types, but only one core is ever used).

Runtime.getRuntime().availableProcessors() reports 16 processors and sparkContext.defaultParallelism() reports 16 as well.
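For reference, here is a minimal sketch of our setup (the class and app names are illustrative, not our actual code):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LocalModeCheck {
        public static void main(String[] args) {
            // "local[*]" should schedule tasks on every available core.
            SparkConf conf = new SparkConf()
                    .setAppName("local-mode-check")
                    .setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Both of these print 16 on our machine.
            System.out.println(Runtime.getRuntime().availableProcessors());
            System.out.println(sc.defaultParallelism());

            sc.stop();
        }
    }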

I've looked at various Stack Overflow questions about local mode, but none of them seem to have resolved the issue.

Any advice much appreciated.

Thanks

EDIT: Process

1) Use sqlContext with com.databricks.spark.csv to read gzipped CSV file 1 from S3 into DataFrame DF1.

2) Use sqlContext with com.databricks.spark.csv to read gzipped CSV file 2 from S3 into DataFrame DF2.

3) Call DF1.toJavaRDD().mapToPair() with a mapping function that returns a Tuple2<Key, List<Value>>, giving RDD1.

4) Call DF2.toJavaRDD().mapToPair() with a mapping function that returns a Tuple2<Key, List<Value>>, giving RDD2.

5) Call union on the RDDs

6) Call reduceByKey() on the unioned RDD to "merge by key", leaving only one Tuple2<Key, List<Value>> per key (the same key appears in both RDD1 and RDD2).

7) Call .values().map() with a mapping Function that iterates over all items in the provided List and merges them as required, returning a List of the same or smaller length.

8) Call .flatMap() to get an RDD<DomainClass>.

9) Use sqlContext to create a DataFrame of type DomainClass from the flattened RDD.

10) Use DF.coalesce(1).write() to write the DF as gzipped CSV to S3.
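For concreteness, a condensed sketch of the pipeline above (DomainClass is our own type; toKeyedList, concatLists, mergeItems, the key/value types and the S3 paths are illustrative placeholders, not our actual code):

    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.DataFrame;

    // Steps 1-2: each gzipped CSV is read into a DataFrame.
    DataFrame df1 = sqlContext.read()
            .format("com.databricks.spark.csv")
            .load("s3n://bucket/file1.csv.gz");
    DataFrame df2 = sqlContext.read()
            .format("com.databricks.spark.csv")
            .load("s3n://bucket/file2.csv.gz");

    // Steps 3-4: map each row to a (key, list-of-values) pair.
    JavaPairRDD<String, List<DomainClass>> rdd1 =
            df1.toJavaRDD().mapToPair(row -> toKeyedList(row));
    JavaPairRDD<String, List<DomainClass>> rdd2 =
            df2.toJavaRDD().mapToPair(row -> toKeyedList(row));

    JavaRDD<DomainClass> flat = rdd1
            .union(rdd2)                                // step 5
            .reduceByKey((a, b) -> concatLists(a, b))   // step 6: one entry per key
            .values()
            .map(list -> mergeItems(list))              // step 7: same or shorter List
            .flatMap(list -> list);                     // step 8 (Spark 1.x returns Iterable)

    // Steps 9-10: back to a DataFrame, then a single gzipped CSV on S3.
    DataFrame out = sqlContext.createDataFrame(flat, DomainClass.class);
    out.coalesce(1).write()
            .format("com.databricks.spark.csv")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .save("s3n://bucket/output");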

Upvotes: 6

Views: 4770

Answers (1)

Tim

Reputation: 3725

I think your problem is that your CSV files are gzipped. When Spark reads files, it loads them in parallel, but it can only do this if the file's codec is splittable*. Plain (non-gzipped) text and Parquet are splittable, as is the bgzip codec used in genomics (my field). Each of your files is therefore ending up entirely in one partition.

Try decompressing the csv.gz files and running this again. I think you'll see much better results!

  • Splittable formats mean that, given an arbitrary file offset at which to start reading, you can find the beginning of the next record in your block and interpret it from there. Gzipped files are not splittable.

Edit: I replicated this behavior on my machine. Using sc.textFile on a 3G gzipped text file produced 1 partition.
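You can verify this yourself, and if you must keep the inputs gzipped, a common workaround is to repartition immediately after the read so that only the decompression itself runs on one core (the paths here are illustrative):

    JavaRDD<String> gz = sc.textFile("s3n://bucket/file1.csv.gz");
    System.out.println(gz.partitions().size());     // 1: gzip is not splittable

    JavaRDD<String> plain = sc.textFile("s3n://bucket/file1.csv");
    System.out.println(plain.partitions().size());  // one partition per input split

    // Pay the single-core decompression cost once, then fan out to all cores.
    JavaRDD<String> spread = gz.repartition(sc.defaultParallelism());

The same trick applies to the DataFrames coming out of spark-csv, e.g. DF1.repartition(sc.defaultParallelism()) right after the load.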

Upvotes: 4
