Reputation: 549
I have inserted 50 records into a Kafka topic with a 1 second delay between each record, so 50 records over 50+ seconds.
While consuming records from this topic, as you can see in the code below, I am keeping the batch interval at 1 second, so ideally I should be getting ~50 RDDs. I am using foreachRDD to get the data in batches and then processing each RDD. Each time the "call" method is invoked, I count the records in that RDD. Surprisingly, the first RDD shows all 50 records, and subsequent RDDs obviously show 0 records. I don't understand this behaviour; ideally it should be 1 record per RDD.
Can anyone point out where my understanding is wrong?
```
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<K, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<K, String>> rdd) {
        System.out.println("Number of elements in RDD: " + rdd.count());

        // processData(record) returns a List<Row>; reduce merges the lists from all records
        List<Row> rows = rdd.map(record -> processData(record))
                            .reduce((rows1, rows2) -> {
                                rows1.addAll(rows2);
                                return rows1;
                            });

        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = ss.createDataFrame(rows, schema);
        ds.createOrReplaceTempView("trades");
        ds.show();
    }
});
```
Upvotes: 2
Views: 2277
Reputation: 519
What you are seeing is the expected behavior of Spark. Had you started your streaming job first and then populated the Kafka topic with data (say, 1 record every second), your batch sizes would have matched your expectation (not quite exactly, because of Kafka topic partitions).
But it is possible to achieve what you want by using the Spark Streaming parameter spark.streaming.kafka.maxRatePerPartition.
You have to keep in mind that it works per Kafka partition.
Example: if your Kafka topic has 3 partitions, your batch interval is 1 second, and maxRatePerPartition = 1, then with the configuration below you will get 3 records per RDD/batch.
Sample Streaming Context:
```
val sparkConf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", inputParam.maxRatePerPartition)

val ssc = new StreamingContext(sparkConf, inputParam.batchDuration)
```
If your Kafka topic has 1 partition, then you will get exactly what you want.
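Since your code uses the Java API, here is a minimal Java sketch of the same idea; the app name, rate limit of 1, and 1 second batch duration are illustrative assumptions matching your scenario, not values from your code:
```
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Limit each Kafka partition to 1 record per second so an existing backlog
// is spread across batches instead of landing entirely in the first RDD.
SparkConf sparkConf = new SparkConf()
        .setAppName("KafkaRateLimitedStream")
        .set("spark.streaming.kafka.maxRatePerPartition", "1");

// 1 second batch interval, as in the question
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
```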
Upvotes: 2
Reputation: 37435
Spark Streaming uses "micro batching": during each batch interval, data arrives and is collected.
At each batch interval deadline, the data that arrived up to that moment is sent to Spark for processing. If all the data has already been received on Kafka when the streaming process starts, all of it will be processed at once in the first interval.
Any new data arriving after that moment will be processed in subsequent intervals.
Upvotes: 0