Reputation: 184
I'm receiving time series data and want to persist/upsert only the most recent entry we get for a given key. We used to aggregate with mapWithState, and we were able to process about 1k/sec locally, around 6k/sec in our QA environment, and around 45k/sec on our beefiest environments.
I removed a lot of code because of some requirement changes, and I think the reason I'm seeing such slow behavior is reduceByKey. Here's the relevant code:
rawStream
  .map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
  // Keep the most recent value for each key
  .reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
    if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
      accumulator
    } else {
      aggregationResult
    }
  }, numPartitions = numberOfReduceTasks)
  // Send back table name and aggregate
  .map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2))
This takes about 3-5 minutes to process 500 data points locally, and it's also slow on small batch sizes in our QA environment. I understand there should be some slowdown: before, everything was one stage, and now, because of the shuffle, it's broken into two stages, and shuffling takes a long time. Is there an ideal numPartitions
value I should be using? For example, should each core add X partitions, or should each gig of memory add X more? I've been running it locally and trying to figure this out, but nothing gets me to a reasonable processing time.
Upvotes: 1
Views: 633
Reputation: 1857
I had a similar experience with Spark on a small cluster with around 2000 items in an RDD. Repartitioning to various partition counts made no difference. Once I ran it with more items (around 4000, though I think this depends on the number of executors you have), it started behaving as expected. Try running it with more data points.
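As for the direct question about an ideal numPartitions: the usual starting point ties partitions to total CPU cores rather than to memory or data volume (Spark's tuning guide suggests roughly 2-3 tasks per core). A minimal sketch, assuming that rule of thumb; `suggestedPartitions` is a hypothetical helper, not part of any Spark API:

```scala
// Hypothetical helper illustrating the common rule of thumb of
// 2-3 tasks per CPU core across the cluster; memory size does not
// normally enter the formula.
def suggestedPartitions(totalCores: Int, tasksPerCore: Int = 2): Int =
  math.max(1, totalCores * tasksPerCore)

// For example, with 8 cores available to executors:
val numberOfReduceTasks = suggestedPartitions(8)  // 16
```

Note that with only 500 data points per batch, the per-partition scheduling overhead will dominate regardless of the exact count, which matches what I saw on small RDDs.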
Upvotes: 1