Leyth G

Reputation: 1143

How to throttle Spark Streaming?

This question follows on from my other question about managing AmazonDynamoDBClient throttles and retries. However, I think the solution might exist before I even get to the Dynamo call.

My high-level process is as follows: I have a Scala application that uses Apache Spark to read large CSV files, perform some aggregations on them, and then write the results to DynamoDB. I deploy this to EMR for scalability. The issue is that once aggregation is complete, we have millions of records ready to go into DynamoDB, but the table has a write capacity limit. The records don't need to be inserted immediately, but it would be nice to control how many go in per second so we can fine-tune the rate for our use case.

Here is a code sample of what I have so far:

import org.apache.spark.sql.{ForeachWriter, Row}

val foreach = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(value: Row): Unit = {
    // write to dynamo here
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

import org.apache.spark.sql.streaming.OutputMode

val query = dataGrouped
  .writeStream
  .queryName("DynamoOutput")
  .foreach(foreach) // the foreach sink takes the place of format("console")
  .outputMode(OutputMode.Complete())
  .start()

query.awaitTermination()
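For concreteness, here is the shape of throttling I'm imagining: a sketch that rate-limits inside the ForeachWriter using Guava's RateLimiter. The Guava dependency, the 100 records/sec figure, and the Dynamo write are placeholders, not a working solution:

import com.google.common.util.concurrent.RateLimiter
import org.apache.spark.sql.{ForeachWriter, Row}

val throttledForeach = new ForeachWriter[Row] {
  // One limiter per partition, created on the executor in open(),
  // so the effective cluster-wide rate is roughly
  // (records/sec per partition) * (number of partitions).
  private var limiter: RateLimiter = _

  override def open(partitionId: Long, version: Long): Boolean = {
    limiter = RateLimiter.create(100.0) // placeholder: 100 records/sec
    true
  }

  override def process(value: Row): Unit = {
    limiter.acquire() // blocks until a permit is available
    // write to dynamo here
  }

  override def close(errorOrNull: Throwable): Unit = {}
}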

Does anyone have any recommendations for how to solve this problem?

Upvotes: 3

Views: 2897

Answers (1)

Vidya

Reputation: 30320

You should look into the spark.streaming.backpressure.enabled configuration. From the documentation:

Setting the max receiving rate - If cluster resources are not large enough for the streaming application to process data as fast as it is being received, the receivers can be rate limited by setting a maximum rate limit in terms of records/sec. See the configuration parameters spark.streaming.receiver.maxRate for receivers and spark.streaming.kafka.maxRatePerPartition for the Direct Kafka approach. In Spark 1.5, we have introduced a feature called backpressure that eliminates the need to set this rate limit, as Spark Streaming automatically figures out the rate limits and dynamically adjusts them if the processing conditions change. This backpressure can be enabled by setting the configuration parameter spark.streaming.backpressure.enabled to true.
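For example, a minimal sketch of turning this on via the SparkConf for a DStream-based Spark Streaming job (the app name, batch interval, and numeric caps below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ThrottledStream") // placeholder name
  // Let Spark Streaming adapt the receiving rate to processing conditions.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional hard caps in records/sec; placeholder values.
  .set("spark.streaming.receiver.maxRate", "10000")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = new StreamingContext(conf, Seconds(1)) // placeholder batch interval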

Upvotes: 4
