nickn

Reputation: 184

numPartitions for reduceByKey not affecting speed

I'm receiving time series data and want to persist/upsert only the most recent entry we get for a specific key. We used to aggregate things with mapWithState, and we were able to process about 1k/sec locally, around 6k/sec in our QA environment, and around 45k/sec in our beefiest environments.

I removed a lot of code because of some requirement changes, and I think the reason I'm seeing such slow behavior is reduceByKey. Here's a small amount of the code I have:

rawStream
    .map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
    // Select the last value for the key
    .reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
        if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
            accumulator
        } else {
            aggregationResult
        }
    }, numPartitions = numberOfReduceTasks)
    // Send back table name and aggregate
    .map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )

This takes about 3-5 minutes to process 500 data points locally, and it's pretty slow on small batch sizes in our QA environment. I understand there should be some slowdown, because before everything was one stage and now, because of the shuffling, it's broken into two stages, and shuffling takes a long time. Is there an ideal numPartitions value I should be using? Maybe each core should add X partitions, or each gigabyte of memory should add X more. I've been running it locally and trying to figure it out, but nothing gets me to a reasonable processing time for this.
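For what it's worth, a common starting point is roughly 2-3 partitions per available core. A minimal sketch of how that could be wired up, assuming the StreamingContext is available as ssc (defaultParallelism usually reflects the total executor cores, or the local thread count when running locally):

    // Rough sketch only: 2-3x the core count is a starting point, not a guaranteed optimum
    val totalCores = ssc.sparkContext.defaultParallelism
    val numberOfReduceTasks = totalCores * 2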

Upvotes: 1

Views: 633

Answers (1)

nedim

Reputation: 1857

I had a similar experience with Spark on a small cluster with around 2000 items in an RDD. Repartitioning to many different partition counts did not make a difference. Once I ran it with more items (around 4000, though I think this depends on the number of executors you have), it started behaving as expected. Try running it with more data points.
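As a rough way to check this, you could benchmark the same reduceByKey on a larger synthetic batch outside the streaming job. The sketch below uses a hypothetical TimeSeries case class with the fields from the question and assumes a plain SparkContext is available as sc:

    // Hypothetical stand-in for the question's time series type
    case class TimeSeries(key: String, firstTimeMillis: Long)

    // Generate a batch several times larger than the partition count
    val sampleData = (1 to 10000).map(i => TimeSeries(s"key-${i % 100}", i.toLong))

    val reduced = sc.parallelize(sampleData)
      .map(ts => (ts.key, ts))
      .reduceByKey((a, b) => if (a.firstTimeMillis > b.firstTimeMillis) a else b, numPartitions = 8)

    reduced.count()  // force evaluation so the shuffle actually runs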

Upvotes: 1
